Skip to content

Commit c086665

Browse files
committed
When updating a multi cell collection element, if the update is rejected then the shared Row.Builder is not freed causing all future mutations to be rejected
patch by David Capwell; reviewed by Bernardo Botella Corbi, Caleb Rackliffe, Dmitry Konstantinov for CASSANDRA-21055
1 parent c4e1e2c commit c086665

File tree

23 files changed

+476
-93
lines changed

23 files changed

+476
-93
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* When updating a multi cell collection element, if the update is rejected then the shared Row.Builder is not freed causing all future mutations to be rejected (CASSANDRA-21055)
23
* Schema annotations escape validation on CREATE and ALTER DDL statements (CASSANDRA-21046)
34
* Calculate once and cache the result of ModificationStatement#requiresRead as a perf optimization (CASSANDRA-21040)
45
* Update system schema tables with new distributed keyspace on upgrade (CASSANDRA-20872)

src/java/org/apache/cassandra/cql3/UpdateParameters.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ public class UpdateParameters
6262
// Holds data for operations that require a read-before-write. Will be null otherwise.
6363
private final Map<DecoratedKey, Partition> prefetchedRows;
6464

65-
private Row.Builder staticBuilder;
66-
private Row.Builder regularBuilder;
67-
6865
// The builder currently in use. Will alias either staticBuilder or regularBuilder, which are themselves built lazily.
6966
private Row.Builder builder;
7067

@@ -108,20 +105,8 @@ public <V> void newRow(Clustering<V> clustering) throws InvalidRequestException
108105
throw new InvalidRequestException("Invalid empty or null value for column " + metadata.clusteringColumns().get(0).name);
109106
}
110107
}
111-
112-
if (clustering == Clustering.STATIC_CLUSTERING)
113-
{
114-
if (staticBuilder == null)
115-
staticBuilder = BTreeRow.pooledUnsortedBuilder();
116-
builder = staticBuilder;
117-
}
118-
else
119-
{
120-
if (regularBuilder == null)
121-
regularBuilder = BTreeRow.pooledUnsortedBuilder();
122-
builder = regularBuilder;
123-
}
124-
108+
assert builder == null : "newRow called without building the previous row";
109+
builder = BTreeRow.pooledUnsortedBuilder();
125110
builder.newRow(clustering);
126111
}
127112

src/java/org/apache/cassandra/utils/caching/TinyThreadLocalPool.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
package org.apache.cassandra.utils.caching;
2020

21+
import accord.utils.Invariants;
2122
import io.netty.util.concurrent.FastThreadLocal;
2223

2324
public class TinyThreadLocalPool<V> extends FastThreadLocal<TinyThreadLocalPool.TinyPool<V>>
2425
{
26+
private static final boolean DEBUG = Invariants.debug();
2527
protected TinyPool<V> initialValue()
2628
{
2729
return new TinyPool<>();
@@ -46,10 +48,23 @@ public void offer(V value)
4648
}
4749
private void offerSafe(V value)
4850
{
51+
if (DEBUG)
52+
checkOfferSafe(value);
4953
if (val1 == null) val1 = value;
5054
else if (val2 == null) val2 = value;
5155
else if (val3 == null) val3 = value;
5256
}
57+
58+
private void checkOfferSafe(V value)
59+
{
60+
if (val1 == value)
61+
throw new IllegalStateException("Double offer");
62+
if (val2 == value)
63+
throw new IllegalStateException("Double offer");
64+
if (val3 == value)
65+
throw new IllegalStateException("Double offer");
66+
}
67+
5368
public V poll()
5469
{
5570
Object result;

test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@
9494
// checkstyle: suppress below 'blockSystemPropertyUsage'
9595
public class TestBaseImpl extends DistributedTestBase
9696
{
97+
static
98+
{
99+
System.setProperty("accord.debug", "true"); // checkstyle: suppress nearby 'blockSystemPropertyUsage'
100+
}
101+
97102
private static final Logger logger = LoggerFactory.getLogger(TestBaseImpl.class);
98103

99104
public static final Object[][] EMPTY_ROWS = new Object[0][];

test/distributed/org/apache/cassandra/distributed/test/cql3/AccordInteropMultiNodeTableWalkBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ protected void createTable(TableMetadata metadata)
117117
return super.command(rs, mutation, annotate);
118118
}
119119

120+
@Override
121+
protected boolean allowListElementAccessForUpdateSet()
122+
{
123+
// See CASSANDRA-20828
124+
return false;
125+
}
126+
120127
@Override
121128
protected boolean allowUsingTimestamp()
122129
{

test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ private State(RandomSource rs, Cluster cluster)
7575
super(rs, cluster);
7676
}
7777

78+
@Override
79+
protected boolean allowUsingTimestamp()
80+
{
81+
// Paxos doesn't allow USING TIMESTAMP
82+
return false;
83+
}
84+
7885
@Override
7986
protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder mutationGenBuilder)
8087
{

test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ public State(RandomSource rs, Cluster cluster)
447447
ASTGenerators.MutationGenBuilder mutationGenBuilder = new ASTGenerators.MutationGenBuilder(metadata)
448448
.withTxnSafe()
449449
.withColumnExpressions(e -> e.withOperators(Generators.fromGen(BOOLEAN_DISTRIBUTION.next(rs))))
450+
.withListElementAccessForUpdateSet(allowListElementAccessForUpdateSet())
450451
.withIgnoreIssues(IGNORED_ISSUES);
451452

452453
// Run the test with and without bound partitions

test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ class State extends CommonState
386386
this.mutationGen = toGen(new ASTGenerators.MutationGenBuilder(metadata)
387387
.withTxnSafe()
388388
.withPartitions(SourceDSL.arbitrary().pick(uniquePartitions))
389+
.withListElementAccessForUpdateSet(allowListElementAccessForUpdateSet())
389390
.withIgnoreIssues(IGNORED_ISSUES)
390391
.build());
391392
}

test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.function.BiConsumer;
2929
import java.util.function.Consumer;
30+
import java.util.function.Supplier;
3031
import java.util.stream.Collectors;
3132
import java.util.stream.Stream;
3233
import javax.annotation.Nullable;
3334

3435
import com.google.common.collect.ImmutableList;
36+
import org.assertj.core.api.ThrowableAssert;
3537
import org.slf4j.Logger;
3638

3739
import accord.utils.Gen;
@@ -105,6 +107,11 @@ public class StatefulASTBase extends TestBaseImpl
105107
*/
106108
protected static boolean CQL_DEBUG_APPLY_OPERATOR = false;
107109

110+
/**
111+
* Allows for overriding the CQL format logic, default is none (single line), but can create custom ones to help with the history
112+
*/
113+
protected static Supplier<CQLFormatter> CQL_FORMATTER = () -> CQLFormatter.None.instance;
114+
108115
protected static final Gen<Gen<Boolean>> BOOL_DISTRIBUTION = Gens.bools().mixedDistribution();
109116
protected static final Gen<Gen<Conditional.Where.Inequality>> LESS_THAN_DISTRO = Gens.mixedDistribution(Stream.of(Conditional.Where.Inequality.values())
110117
.filter(i -> i == Conditional.Where.Inequality.LESS_THAN || i == Conditional.Where.Inequality.LESS_THAN_EQ)
@@ -196,7 +203,7 @@ protected <S extends BaseState> Property.StatefulSuccess<S, Void> onSuccess(Logg
196203
protected static <S extends BaseState> Property.Command<S, Void, ?> compactTable(RandomSource rs, S state)
197204
{
198205
return new Property.SimpleCommand<>("nodetool compact " + state.metadata.keyspace + ' ' + state.metadata.name, s2 -> {
199-
state.cluster.forEach(i -> i.nodetoolResult("compact", s2.metadata.keyspace, s2.metadata.name).asserts().success());
206+
s2.cluster.forEach(i -> i.nodetoolResult("compact", s2.metadata.keyspace, s2.metadata.name).asserts().success());
200207
s2.compact();
201208
});
202209
}
@@ -475,6 +482,12 @@ private String createKeyspaceCQL(String ks)
475482
return command(rs, select, null);
476483
}
477484

485+
protected boolean allowListElementAccessForUpdateSet()
486+
{
487+
// this requires a read at the same CL, but the model is global level and not per-node level, so can't handle
488+
return mutationCl() != ConsistencyLevel.NODE_LOCAL;
489+
}
490+
478491
protected boolean allowRepair()
479492
{
480493
return false;
@@ -586,18 +599,24 @@ protected ConsistencyLevel mutationCl()
586599
{
587600
var inst = selectInstance(rs);
588601
String postfix = "on " + inst;
589-
if (mutation.isCas())
590-
{
602+
@Nullable
603+
Consumer<ThrowableAssert.ThrowingCallable> shouldRaiseThrowable = model.shouldReject(mutation);
604+
if (shouldRaiseThrowable != null)
605+
postfix += ", should reject";
606+
else if (mutation.isCas())
591607
postfix += ", would apply " + model.shouldApply(mutation);
592-
// CAS doesn't allow timestamps
593-
mutation = mutation.withoutTimestamp();
594-
}
608+
595609
if (annotate == null) annotate = postfix;
596610
else annotate += ", " + postfix;
597-
Mutation finalMutation = mutation;
598611
return new Property.SimpleCommand<>(humanReadable(mutation, annotate), s -> {
599-
var result = s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(), finalMutation);
600-
s.model.updateAndValidate(result, finalMutation);
612+
if (shouldRaiseThrowable != null)
613+
{
614+
shouldRaiseThrowable.accept(() -> s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(), mutation));
615+
s.mutation();
616+
return;
617+
}
618+
var result = s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(), mutation);
619+
s.model.updateAndValidate(result, mutation);
601620
s.mutation();
602621
});
603622
}
@@ -611,15 +630,26 @@ protected ConsistencyLevel mutationCl()
611630
{
612631
var inst = selectInstance(rs);
613632
String postfix = "on " + inst;
614-
if (model.isConditional(txn))
633+
@Nullable
634+
Consumer<ThrowableAssert.ThrowingCallable> shouldRaiseThrowable = model.shouldReject(txn);
635+
if (shouldRaiseThrowable != null)
636+
postfix += ", should reject";
637+
else if (model.isConditional(txn))
615638
postfix += ", would apply " + model.shouldApply(txn);
616639
if (annotate == null) annotate = postfix;
617640
else annotate += ", " + postfix;
618641

619642
return new Property.SimpleCommand<>(humanReadable(txn, annotate), s -> {
620643
boolean hasMutation = txn.ifBlock.isPresent() || !txn.mutations.isEmpty();
621644
ConsistencyLevel cl = hasMutation ? s.mutationCl() : s.selectCl();
622-
s.model.updateAndValidate(s.executeQuery(inst, Integer.MAX_VALUE, cl, txn), txn);
645+
if (shouldRaiseThrowable != null)
646+
{
647+
shouldRaiseThrowable.accept(() -> s.executeQuery(inst, Integer.MAX_VALUE, cl, txn));
648+
}
649+
else
650+
{
651+
s.model.updateAndValidate(s.executeQuery(inst, Integer.MAX_VALUE, cl, txn), txn);
652+
}
623653
if (hasMutation)
624654
s.mutation();
625655
});
@@ -706,7 +736,7 @@ protected String humanReadable(Statement stmt, @Nullable String annotate)
706736
{
707737
// With UTF-8 some chars can cause printing issues leading to error messages that don't reproduce the original issue.
708738
// To avoid this problem, always escape the CQL so nothing gets lost
709-
String cql = StringUtils.escapeControlChars(stmt.visit(debug).toCQL(CQLFormatter.None.instance));
739+
String cql = StringUtils.escapeControlChars(stmt.visit(debug).toCQL(CQL_FORMATTER.get()));
710740
if (annotate != null)
711741
cql += " -- " + annotate;
712742
return cql;

test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,11 @@ private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
148148
private static CommandGen<Spec> cqlOperations(Spec spec)
149149
{
150150
Gen<Statement> select = (Gen<Statement>) (Gen<?>) fromQT(new ASTGenerators.SelectGenBuilder(spec.metadata).withLimit1().build());
151-
Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new ASTGenerators.MutationGenBuilder(spec.metadata).withTxnSafe().disallowUpdateMultiplePartitionKeys().build());
151+
Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new ASTGenerators.MutationGenBuilder(spec.metadata)
152+
.withTxnSafe()
153+
.disallowUpdateMultiplePartitionKeys() //TODO (coverage): this is something Accord should support, so should remove and make sure accord is updated
154+
.disallowListElementAccessForUpdateSet() //TODO (coverage): CASSANDRA-20828 found an issue with multi cell list type timestamp handling, so make sure accord doesn't hit this
155+
.build());
152156
Gen<Statement> txn = (Gen<Statement>) (Gen<?>) fromQT(new ASTGenerators.TxnGenBuilder(spec.metadata).build());
153157
Map<Gen<Statement>, Integer> operations = new LinkedHashMap<>();
154158
operations.put(select, 1);

0 commit comments

Comments
 (0)