|
18 | 18 | package org.apache.cassandra.tcm; |
19 | 19 |
|
20 | 20 | import java.io.IOException; |
| 21 | +import java.util.ArrayList; |
21 | 22 | import java.util.Collections; |
22 | 23 | import java.util.HashSet; |
| 24 | +import java.util.List; |
23 | 25 | import java.util.Map; |
24 | 26 | import java.util.Objects; |
25 | 27 | import java.util.Optional; |
|
37 | 39 | import org.apache.cassandra.config.CassandraRelevantProperties; |
38 | 40 | import org.apache.cassandra.config.DatabaseDescriptor; |
39 | 41 | import org.apache.cassandra.db.ColumnFamilyStore; |
| 42 | +import org.apache.cassandra.db.Mutation; |
40 | 43 | import org.apache.cassandra.db.SystemKeyspace; |
41 | 44 | import org.apache.cassandra.db.commitlog.CommitLog; |
42 | 45 | import org.apache.cassandra.dht.BootStrapper; |
|
49 | 52 | import org.apache.cassandra.gms.VersionedValue; |
50 | 53 | import org.apache.cassandra.locator.InetAddressAndPort; |
51 | 54 | import org.apache.cassandra.net.MessagingService; |
| 55 | +import org.apache.cassandra.schema.DistributedSchema; |
52 | 56 | import org.apache.cassandra.schema.KeyspaceMetadata; |
| 57 | +import org.apache.cassandra.schema.Keyspaces; |
53 | 58 | import org.apache.cassandra.schema.SchemaConstants; |
| 59 | +import org.apache.cassandra.schema.SchemaKeyspace; |
54 | 60 | import org.apache.cassandra.schema.TableMetadata; |
55 | 61 | import org.apache.cassandra.service.StorageService; |
56 | 62 | import org.apache.cassandra.tcm.log.LocalLog; |
|
69 | 75 | import org.apache.cassandra.tcm.transformations.UnsafeJoin; |
70 | 76 | import org.apache.cassandra.tcm.transformations.cms.Initialize; |
71 | 77 | import org.apache.cassandra.utils.FBUtilities; |
| 78 | +import org.apache.cassandra.utils.Pair; |
72 | 79 |
|
73 | 80 | import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; |
74 | 81 | import static org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables; |
@@ -254,12 +261,26 @@ public static void initializeForDiscovery(Runnable initMessaging) |
254 | 261 | Election.instance.migrated(); |
255 | 262 | } |
256 | 263 |
|
| 264 | + private static void updateSystemSchemaTables(Set<String> knownDatacenters) |
| 265 | + { |
| 266 | + List<Pair<KeyspaceMetadata, Long>> kss = DistributedSchema.distributedKeyspacesWithGeneration(knownDatacenters); |
| 267 | + List<Mutation> mutations = new ArrayList<>(); |
| 268 | + for (Pair<KeyspaceMetadata, Long> ksm : kss) |
| 269 | + { |
| 270 | + Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(Keyspaces.none(), Keyspaces.of(ksm.left)); |
| 271 | + mutations.addAll(SchemaKeyspace.convertSchemaDiffToMutations(ksDiff, ksm.right)); |
| 272 | + } |
| 273 | + SchemaKeyspace.applyChanges(mutations); |
| 274 | + } |
| 275 | + |
257 | 276 | /** |
258 | 277 | * This should only be called during startup. |
259 | 278 | */ |
260 | 279 | public static void initializeFromGossip(Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws StartupException |
261 | 280 | { |
262 | | - ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(SystemKeyspace.allKnownDatacenters()); |
| 281 | + Set<String> knownDcs = SystemKeyspace.allKnownDatacenters(); |
| 282 | + updateSystemSchemaTables(knownDcs); |
| 283 | + ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(knownDcs); |
263 | 284 | LocalLog.LogSpec logSpec = LocalLog.logSpec() |
264 | 285 | .withInitialState(emptyFromSystemTables) |
265 | 286 | .afterReplay(Startup::scrubDataDirectories, |
|
0 commit comments