Skip to content

Commit 8d2c11e

Browse files
Dmitry Konstantinovnetudima
authored andcommitted
Fix cleanup of old incremental repair sessions in case of owned token range changes or a table deleting
Patch by Dmitry Konstantinov; reviewed by Marcus Eriksson, Jaydeepkumar Chovatia for CASSANDRA-20877
1 parent 3f079b2 commit 8d2c11e

File tree

6 files changed

+249
-2
lines changed

6 files changed

+249
-2
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
4.0.20
2+
* Fix cleanup of old incremental repair sessions in case of owned token range changes or a table deleting (CASSANDRA-20877)
23
* Fix memory leak in BufferPoolAllocator when a capacity needs to be extended (CASSANDRA-20753)
34
* Leveled Compaction doesn't validate maxBytesForLevel when the table is altered/created (CASSANDRA-20570)
45
* Updated dtest-api to 0.0.18 and removed JMX-related classes that now live in the dtest-api (CASSANDRA-20884)

src/java/org/apache/cassandra/dht/Range.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,21 @@ public static <T extends RingPosition<T>> Set<Range<T>> subtract(Collection<Rang
352352
return result;
353353
}
354354

355+
public static <T extends RingPosition<T>> List<Range<T>> intersect(Collection<Range<T>> ranges1, Collection<Range<T>> ranges2)
356+
{
357+
Set<Range<T>> result = new HashSet<>();
358+
// note: O(n^2), simple but not very efficient
359+
for (Range<T> range1 : ranges1)
360+
{
361+
for (Range<T> range2 : ranges2)
362+
{
363+
result.addAll(range1.intersectionWith(range2));
364+
}
365+
}
366+
return normalize(result);
367+
}
368+
369+
355370
/**
356371
* Calculate set of the difference ranges of given two ranges
357372
* (as current (A, B] and rhs is (C, D])

src/java/org/apache/cassandra/repair/consistent/LocalSessions.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import org.apache.cassandra.repair.messages.StatusRequest;
9393
import org.apache.cassandra.repair.messages.StatusResponse;
9494
import org.apache.cassandra.schema.TableId;
95+
import org.apache.cassandra.schema.TableMetadata;
9596
import org.apache.cassandra.service.ActiveRepairService;
9697
import org.apache.cassandra.service.StorageService;
9798
import org.apache.cassandra.utils.FBUtilities;
@@ -239,21 +240,46 @@ private boolean shouldStoreSession(LocalSession session)
239240
*/
240241
private boolean isSuperseded(LocalSession session)
241242
{
243+
// to reduce overheads of intersect calculation for tables within the same keyspace
244+
Map<String, Collection<Range<Token>>> rangesPerKeyspaceCache = new HashMap<>();
242245
for (TableId tid : session.tableIds)
243246
{
244-
RepairedState state = repairedStates.get(tid);
247+
TableMetadata tableMetadata = getTableMetadata(tid);
248+
if (tableMetadata == null) // if a table was removed - ignore it
249+
continue;
245250

251+
RepairedState state = repairedStates.get(tid);
246252
if (state == null)
247253
return false;
248254

249-
long minRepaired = state.minRepairedAt(session.ranges);
255+
Collection<Range<Token>> actualRanges = rangesPerKeyspaceCache.computeIfAbsent(tableMetadata.keyspace, (keyspace) -> {
256+
List<Range<Token>> localRanges = getLocalRanges(tableMetadata.keyspace);
257+
if (localRanges.isEmpty()) // to handle the case when we run before the information about owned ranges is properly populated
258+
return session.ranges;
259+
260+
// ignore token ranges which were moved to other nodes and not owned by the current one anymore
261+
return Range.intersect(session.ranges, localRanges);
262+
});
263+
long minRepaired = state.minRepairedAt(actualRanges);
250264
if (minRepaired <= session.repairedAt)
251265
return false;
252266
}
253267

254268
return true;
255269
}
256270

271+
@VisibleForTesting
272+
protected TableMetadata getTableMetadata(TableId tableId)
273+
{
274+
return Schema.instance.getTableMetadata(tableId);
275+
}
276+
277+
@VisibleForTesting
278+
protected List<Range<Token>> getLocalRanges(String keyspace)
279+
{
280+
return StorageService.instance.getLocalAndPendingRanges(keyspace);
281+
}
282+
257283
public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges)
258284
{
259285
RepairedState state = repairedStates.get(tid);
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import com.google.common.collect.ImmutableList;
25+
import com.google.common.collect.ImmutableMap;
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
29+
import org.apache.cassandra.distributed.Cluster;
30+
import org.apache.cassandra.distributed.api.ConsistencyLevel;
31+
import org.apache.cassandra.distributed.api.ICluster;
32+
import org.apache.cassandra.distributed.api.IInstanceConfig;
33+
import org.apache.cassandra.distributed.api.IInvokableInstance;
34+
import org.apache.cassandra.distributed.api.TokenSupplier;
35+
import org.apache.cassandra.distributed.shared.NetworkTopology;
36+
import org.apache.cassandra.distributed.shared.WithProperties;
37+
import org.apache.cassandra.repair.consistent.ConsistentSession;
38+
import org.apache.cassandra.service.ActiveRepairService;
39+
import org.apache.cassandra.service.StorageService;
40+
import org.apache.cassandra.utils.concurrent.SimpleCondition;
41+
import org.apache.cassandra.utils.progress.ProgressEventType;
42+
43+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
44+
import static java.util.concurrent.TimeUnit.MINUTES;
45+
import static java.util.concurrent.TimeUnit.SECONDS;
46+
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
47+
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
48+
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
49+
import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
50+
import static org.apache.cassandra.repair.consistent.LocalSessionInfo.STATE;
51+
import static org.apache.cassandra.repair.messages.RepairOption.INCREMENTAL_KEY;
52+
import static org.awaitility.Awaitility.await;
53+
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
54+
55+
public class IncrementalRepairCleanupAfterNodeAddingTest extends TestBaseImpl
56+
{
57+
@Test
58+
public void test() throws Exception
59+
{
60+
int originalNodeCount = 3;
61+
try (WithProperties withProperties = new WithProperties())
62+
{
63+
withProperties.setProperty("cassandra.repair_delete_timeout_seconds", "0");
64+
try (Cluster cluster = builder().withNodes(originalNodeCount)
65+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(originalNodeCount + 1, 1))
66+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
67+
.withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
68+
.start())
69+
{
70+
populate(cluster, 0, 100);
71+
72+
repair(cluster, KEYSPACE, ImmutableMap.of(INCREMENTAL_KEY, "true"));
73+
74+
Thread.sleep(1); // to ensure that we crossed LocalSessions.AUTO_DELETE_TIMEOUT
75+
76+
// to check that the session is still here (it is not superseded yet)
77+
cluster.get(1).runOnInstance(rethrow(() -> {
78+
ActiveRepairService.instance.consistent.local.cleanup();
79+
List<Map<String, String>> sessions = ActiveRepairService.instance.getSessions(true, null);
80+
Assert.assertThat(sessions, hasSize(1));
81+
}));
82+
83+
addNode(cluster);
84+
85+
repair(cluster, KEYSPACE, ImmutableMap.of(INCREMENTAL_KEY, "true"));
86+
87+
Thread.sleep(1); // to ensure that we crossed LocalSessions.AUTO_DELETE_TIMEOUT
88+
89+
cluster.get(1).runOnInstance(rethrow(() -> {
90+
ActiveRepairService.instance.consistent.local.cleanup();
91+
List<Map<String, String>> sessions = ActiveRepairService.instance.getSessions(true, null);
92+
Assert.assertThat(sessions, hasSize(1));
93+
}));
94+
}
95+
}
96+
}
97+
98+
protected void addNode(Cluster cluster)
99+
{
100+
IInstanceConfig config = cluster.newInstanceConfig();
101+
config.set("auto_bootstrap", true);
102+
IInvokableInstance newInstance = cluster.bootstrap(config);
103+
newInstance.startup(cluster);
104+
}
105+
106+
public static void populate(ICluster cluster, int from, int to)
107+
{
108+
populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
109+
}
110+
111+
public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
112+
{
113+
cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"));
114+
cluster.schemaChange(withKeyspace("CREATE TABLE IF NOT EXISTS %s.repair_add_node_test (pk int, ck int, v int, PRIMARY KEY (pk, ck))"));
115+
for (int i = from; i < to; i++)
116+
{
117+
cluster.coordinator(coord).execute(withKeyspace("INSERT INTO %s.repair_add_node_test (pk, ck, v) VALUES (?, ?, ?)"),
118+
cl, i, i, i);
119+
}
120+
}
121+
122+
static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<String, String> options)
123+
{
124+
cluster.get(1).runOnInstance(rethrow(() -> {
125+
SimpleCondition await = new SimpleCondition();
126+
StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> {
127+
if (event.getType() == ProgressEventType.COMPLETE)
128+
await.signalAll();
129+
})).right.get();
130+
await.await(1L, MINUTES);
131+
132+
// local sessions finalization happens asynchronously
133+
// so to avoid race condition and flakiness for the test we wait explicitly for local sessions to finalize
134+
await().pollInterval(10, MILLISECONDS)
135+
.atMost(60, SECONDS)
136+
.until(() -> {
137+
List<Map<String, String>> sessions = ActiveRepairService.instance.getSessions(true, null);
138+
for (Map<String, String> sessionInfo : sessions)
139+
if (!sessionInfo.get(STATE).equals(ConsistentSession.State.FINALIZED.toString()))
140+
return false;
141+
return true;
142+
});
143+
}));
144+
}
145+
}

test/unit/org/apache/cassandra/dht/RangeTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,4 +737,48 @@ public void testGroupSubtract()
737737
assertEquals(ranges, Range.subtract(ranges, asList(r(6, 7), r(20, 25))));
738738
assertEquals(Sets.newHashSet(r(1, 4), r(11, 15)), Range.subtract(ranges, asList(r(4, 7), r(8, 11))));
739739
}
740+
741+
@Test
742+
public void testGroupIntersection()
743+
{
744+
assertEquals(Collections.emptyList(),
745+
Range.intersect(asList(r(1, 5), r(10, 15)),
746+
asList(r(6, 7), r(20, 25))
747+
));
748+
749+
assertEquals(asList(r(5, 6)),
750+
Range.intersect(asList(r(1, 6), r(10, 15)),
751+
asList(r(5, 10))
752+
));
753+
754+
assertEquals(asList(r(5, 6), r(10, 11)),
755+
Range.intersect(asList(r(1, 6), r(10, 15)),
756+
asList(r(5, 11))
757+
));
758+
759+
assertEquals(asList(r(5, 6), r(10, 11)),
760+
Range.intersect(asList(r(1, 6), r(10, 15)),
761+
asList(r(5, 11))
762+
));
763+
764+
assertEquals(asList(r(5, 6), r(10, 11), r(12, 15)),
765+
Range.intersect(asList(r(1, 6), r(10, 15)),
766+
asList(r(5, 11), r(12, 20))
767+
));
768+
769+
assertEquals(asList(r(5, 6), r(10, 15)),
770+
Range.intersect(asList(r(1, 6), r(10, 15)),
771+
asList(r(5, 11), r(11, 20))
772+
));
773+
774+
assertEquals(Collections.emptyList(),
775+
Range.intersect(Collections.emptyList(),
776+
asList(r(5, 11), r(11, 20))
777+
));
778+
779+
assertEquals(Collections.emptyList(),
780+
Range.intersect(asList(r(1, 6), r(10, 15)),
781+
Collections.emptyList()
782+
));
783+
}
740784
}

test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.cassandra.repair.consistent;
2020

2121
import java.util.ArrayList;
22+
import java.util.Arrays;
2223
import java.util.Collection;
2324
import java.util.HashMap;
2425
import java.util.HashSet;
@@ -44,11 +45,14 @@
4445
import org.apache.cassandra.cql3.QueryProcessor;
4546
import org.apache.cassandra.cql3.UntypedResultSet;
4647
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
48+
import org.apache.cassandra.dht.Range;
49+
import org.apache.cassandra.dht.Token;
4750
import org.apache.cassandra.locator.RangesAtEndpoint;
4851
import org.apache.cassandra.net.Message;
4952
import org.apache.cassandra.repair.AbstractRepairTest;
5053
import org.apache.cassandra.locator.InetAddressAndPort;
5154
import org.apache.cassandra.repair.KeyspaceRepairManager;
55+
import org.apache.cassandra.schema.TableId;
5256
import org.apache.cassandra.schema.TableMetadata;
5357
import org.apache.cassandra.schema.Schema;
5458
import org.apache.cassandra.schema.SchemaConstants;
@@ -206,6 +210,18 @@ protected boolean sessionHasData(LocalSession session)
206210
{
207211
return sessionHasData;
208212
}
213+
214+
@Override
215+
protected TableMetadata getTableMetadata(TableId tableId)
216+
{
217+
return cfm;
218+
}
219+
220+
@Override
221+
protected List<Range<Token>> getLocalRanges(String keyspace)
222+
{
223+
return Arrays.asList(RANGE1, RANGE2, RANGE3);
224+
}
209225
}
210226

211227
private static TableMetadata cfm;

0 commit comments

Comments
 (0)