Skip to content

Commit 7f59280

Browse files
committed
Merge branch 'cassandra-4.0' into cassandra-4.1
* cassandra-4.0: Fix cleanup of old incremental repair sessions in case of owned token range changes or a table deleting
2 parents ed7d584 + 8d2c11e commit 7f59280

File tree

6 files changed

+250
-2
lines changed

6 files changed

+250
-2
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* ReadCommandController should close fast to avoid deadlock when building secondary index (CASSANDRA-19564)
33
* Redact security-sensitive information in system_views.settings (CASSANDRA-20856)
44
Merged from 4.0:
5+
* Fix cleanup of old incremental repair sessions in case of owned token range changes or a table deleting (CASSANDRA-20877)
56
* Fix memory leak in BufferPoolAllocator when a capacity needs to be extended (CASSANDRA-20753)
67
* Leveled Compaction doesn't validate maxBytesForLevel when the table is altered/created (CASSANDRA-20570)
78
* Updated dtest-api to 0.0.18 and removed JMX-related classes that now live in the dtest-api (CASSANDRA-20884)

src/java/org/apache/cassandra/dht/Range.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,21 @@ public static <T extends RingPosition<T>> Set<Range<T>> subtract(Collection<Rang
352352
return result;
353353
}
354354

355+
public static <T extends RingPosition<T>> List<Range<T>> intersect(Collection<Range<T>> ranges1, Collection<Range<T>> ranges2)
356+
{
357+
Set<Range<T>> result = new HashSet<>();
358+
// note: O(n^2), simple but not very efficient
359+
for (Range<T> range1 : ranges1)
360+
{
361+
for (Range<T> range2 : ranges2)
362+
{
363+
result.addAll(range1.intersectionWith(range2));
364+
}
365+
}
366+
return normalize(result);
367+
}
368+
369+
355370
/**
356371
* Calculate set of the difference ranges of given two ranges
357372
* (as current (A, B] and rhs is (C, D])

src/java/org/apache/cassandra/repair/consistent/LocalSessions.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.apache.cassandra.repair.messages.StatusRequest;
9090
import org.apache.cassandra.repair.messages.StatusResponse;
9191
import org.apache.cassandra.schema.TableId;
92+
import org.apache.cassandra.schema.TableMetadata;
9293
import org.apache.cassandra.service.ActiveRepairService;
9394
import org.apache.cassandra.repair.NoSuchRepairSessionException;
9495
import org.apache.cassandra.service.StorageService;
@@ -241,21 +242,46 @@ private boolean shouldStoreSession(LocalSession session)
241242
*/
242243
private boolean isSuperseded(LocalSession session)
243244
{
245+
// to reduce overheads of intersect calculation for tables within the same keyspace
246+
Map<String, Collection<Range<Token>>> rangesPerKeyspaceCache = new HashMap<>();
244247
for (TableId tid : session.tableIds)
245248
{
246-
RepairedState state = repairedStates.get(tid);
249+
TableMetadata tableMetadata = getTableMetadata(tid);
250+
if (tableMetadata == null) // if a table was removed - ignore it
251+
continue;
247252

253+
RepairedState state = repairedStates.get(tid);
248254
if (state == null)
249255
return false;
250256

251-
long minRepaired = state.minRepairedAt(session.ranges);
257+
Collection<Range<Token>> actualRanges = rangesPerKeyspaceCache.computeIfAbsent(tableMetadata.keyspace, (keyspace) -> {
258+
List<Range<Token>> localRanges = getLocalRanges(tableMetadata.keyspace);
259+
if (localRanges.isEmpty()) // to handle the case when we run before the information about owned ranges is properly populated
260+
return session.ranges;
261+
262+
// ignore token ranges which were moved to other nodes and not owned by the current one anymore
263+
return Range.intersect(session.ranges, localRanges);
264+
});
265+
long minRepaired = state.minRepairedAt(actualRanges);
252266
if (minRepaired <= session.repairedAt)
253267
return false;
254268
}
255269

256270
return true;
257271
}
258272

273+
@VisibleForTesting
274+
protected TableMetadata getTableMetadata(TableId tableId)
275+
{
276+
return Schema.instance.getTableMetadata(tableId);
277+
}
278+
279+
@VisibleForTesting
280+
protected List<Range<Token>> getLocalRanges(String keyspace)
281+
{
282+
return StorageService.instance.getLocalAndPendingRanges(keyspace);
283+
}
284+
259285
public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges)
260286
{
261287
RepairedState state = repairedStates.get(tid);
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import com.google.common.collect.ImmutableList;
25+
import com.google.common.collect.ImmutableMap;
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
29+
import org.apache.cassandra.distributed.Cluster;
30+
import org.apache.cassandra.distributed.api.ConsistencyLevel;
31+
import org.apache.cassandra.distributed.api.ICluster;
32+
import org.apache.cassandra.distributed.api.IInstanceConfig;
33+
import org.apache.cassandra.distributed.api.IInvokableInstance;
34+
import org.apache.cassandra.distributed.api.TokenSupplier;
35+
import org.apache.cassandra.distributed.shared.NetworkTopology;
36+
import org.apache.cassandra.distributed.shared.WithProperties;
37+
import org.apache.cassandra.repair.consistent.ConsistentSession;
38+
import org.apache.cassandra.service.ActiveRepairService;
39+
import org.apache.cassandra.service.StorageService;
40+
import org.apache.cassandra.utils.concurrent.Condition;
41+
import org.apache.cassandra.utils.progress.ProgressEventType;
42+
43+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
44+
import static java.util.concurrent.TimeUnit.MINUTES;
45+
import static java.util.concurrent.TimeUnit.SECONDS;
46+
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
47+
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
48+
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
49+
import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
50+
import static org.apache.cassandra.repair.consistent.LocalSessionInfo.STATE;
51+
import static org.apache.cassandra.repair.messages.RepairOption.INCREMENTAL_KEY;
52+
import static org.awaitility.Awaitility.await;
53+
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
54+
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
55+
56+
public class IncrementalRepairCleanupAfterNodeAddingTest extends TestBaseImpl
57+
{
58+
@Test
59+
public void test() throws Exception
60+
{
61+
int originalNodeCount = 3;
62+
try (WithProperties withProperties = new WithProperties())
63+
{
64+
withProperties.setProperty("cassandra.repair_delete_timeout_seconds", "0");
65+
try (Cluster cluster = builder().withNodes(originalNodeCount)
66+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(originalNodeCount + 1, 1))
67+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
68+
.withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
69+
.start())
70+
{
71+
populate(cluster, 0, 100);
72+
73+
repair(cluster, KEYSPACE, ImmutableMap.of(INCREMENTAL_KEY, "true"));
74+
75+
Thread.sleep(1); // to ensure that we crossed LocalSessions.AUTO_DELETE_TIMEOUT
76+
77+
// to check that the session is still here (it is not superseded yet)
78+
cluster.get(1).runOnInstance(rethrow(() -> {
79+
ActiveRepairService.instance.consistent.local.cleanup();
80+
List<Map<String, String>> sessions = ActiveRepairService.instance.getSessions(true, null);
81+
Assert.assertThat(sessions, hasSize(1));
82+
}));
83+
84+
addNode(cluster);
85+
86+
repair(cluster, KEYSPACE, ImmutableMap.of(INCREMENTAL_KEY, "true"));
87+
88+
Thread.sleep(1); // to ensure that we crossed LocalSessions.AUTO_DELETE_TIMEOUT
89+
90+
cluster.get(1).runOnInstance(rethrow(() -> {
91+
ActiveRepairService.instance.consistent.local.cleanup();
92+
List<Map<String, String>> sessions = ActiveRepairService.instance.getSessions(true, null);
93+
Assert.assertThat(sessions, hasSize(1));
94+
}));
95+
}
96+
}
97+
}
98+
99+
protected void addNode(Cluster cluster)
100+
{
101+
IInstanceConfig config = cluster.newInstanceConfig();
102+
config.set("auto_bootstrap", true);
103+
IInvokableInstance newInstance = cluster.bootstrap(config);
104+
newInstance.startup(cluster);
105+
}
106+
107+
public static void populate(ICluster cluster, int from, int to)
108+
{
109+
populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
110+
}
111+
112+
public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
113+
{
114+
cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"));
115+
cluster.schemaChange(withKeyspace("CREATE TABLE IF NOT EXISTS %s.repair_add_node_test (pk int, ck int, v int, PRIMARY KEY (pk, ck))"));
116+
for (int i = from; i < to; i++)
117+
{
118+
cluster.coordinator(coord).execute(withKeyspace("INSERT INTO %s.repair_add_node_test (pk, ck, v) VALUES (?, ?, ?)"),
119+
cl, i, i, i);
120+
}
121+
}
122+
123+
static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<String, String> options)
124+
{
125+
cluster.get(1).runOnInstance(rethrow(() -> {
126+
Condition await = newOneTimeCondition();
127+
StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> {
128+
if (event.getType() == ProgressEventType.COMPLETE)
129+
await.signalAll();
130+
})).right.get();
131+
await.await(1L, MINUTES);
132+
133+
// local sessions finalization happens asynchronously
134+
// so to avoid race condition and flakiness for the test we wait explicitly for local sessions to finalize
135+
await().pollInterval(10, MILLISECONDS)
136+
.atMost(60, SECONDS)
137+
.until(() -> {
138+
List<Map<String, String>> sessions = ActiveRepairService.instance.getSessions(true, null);
139+
for (Map<String, String> sessionInfo : sessions)
140+
if (!sessionInfo.get(STATE).equals(ConsistentSession.State.FINALIZED.toString()))
141+
return false;
142+
return true;
143+
});
144+
}));
145+
}
146+
}

test/unit/org/apache/cassandra/dht/RangeTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,4 +736,48 @@ public void testGroupSubtract()
736736
assertEquals(ranges, Range.subtract(ranges, asList(r(6, 7), r(20, 25))));
737737
assertEquals(Sets.newHashSet(r(1, 4), r(11, 15)), Range.subtract(ranges, asList(r(4, 7), r(8, 11))));
738738
}
739+
740+
@Test
741+
public void testGroupIntersection()
742+
{
743+
assertEquals(Collections.emptyList(),
744+
Range.intersect(asList(r(1, 5), r(10, 15)),
745+
asList(r(6, 7), r(20, 25))
746+
));
747+
748+
assertEquals(asList(r(5, 6)),
749+
Range.intersect(asList(r(1, 6), r(10, 15)),
750+
asList(r(5, 10))
751+
));
752+
753+
assertEquals(asList(r(5, 6), r(10, 11)),
754+
Range.intersect(asList(r(1, 6), r(10, 15)),
755+
asList(r(5, 11))
756+
));
757+
758+
assertEquals(asList(r(5, 6), r(10, 11)),
759+
Range.intersect(asList(r(1, 6), r(10, 15)),
760+
asList(r(5, 11))
761+
));
762+
763+
assertEquals(asList(r(5, 6), r(10, 11), r(12, 15)),
764+
Range.intersect(asList(r(1, 6), r(10, 15)),
765+
asList(r(5, 11), r(12, 20))
766+
));
767+
768+
assertEquals(asList(r(5, 6), r(10, 15)),
769+
Range.intersect(asList(r(1, 6), r(10, 15)),
770+
asList(r(5, 11), r(11, 20))
771+
));
772+
773+
assertEquals(Collections.emptyList(),
774+
Range.intersect(Collections.emptyList(),
775+
asList(r(5, 11), r(11, 20))
776+
));
777+
778+
assertEquals(Collections.emptyList(),
779+
Range.intersect(asList(r(1, 6), r(10, 15)),
780+
Collections.emptyList()
781+
));
782+
}
739783
}

test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.cassandra.repair.consistent;
2020

2121
import java.util.ArrayList;
22+
import java.util.Arrays;
2223
import java.util.Collection;
2324
import java.util.HashMap;
2425
import java.util.HashSet;
@@ -42,11 +43,14 @@
4243
import org.apache.cassandra.cql3.QueryProcessor;
4344
import org.apache.cassandra.cql3.UntypedResultSet;
4445
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
46+
import org.apache.cassandra.dht.Range;
47+
import org.apache.cassandra.dht.Token;
4548
import org.apache.cassandra.locator.RangesAtEndpoint;
4649
import org.apache.cassandra.net.Message;
4750
import org.apache.cassandra.repair.AbstractRepairTest;
4851
import org.apache.cassandra.locator.InetAddressAndPort;
4952
import org.apache.cassandra.repair.KeyspaceRepairManager;
53+
import org.apache.cassandra.schema.TableId;
5054
import org.apache.cassandra.schema.TableMetadata;
5155
import org.apache.cassandra.schema.Schema;
5256
import org.apache.cassandra.schema.SchemaConstants;
@@ -208,6 +212,18 @@ protected boolean sessionHasData(LocalSession session)
208212
{
209213
return sessionHasData;
210214
}
215+
216+
@Override
217+
protected TableMetadata getTableMetadata(TableId tableId)
218+
{
219+
return cfm;
220+
}
221+
222+
@Override
223+
protected List<Range<Token>> getLocalRanges(String keyspace)
224+
{
225+
return Arrays.asList(RANGE1, RANGE2, RANGE3);
226+
}
211227
}
212228

213229
private static TableMetadata cfm;

0 commit comments

Comments
 (0)