Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.metadata.factory.init.type;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.database.connector.core.metadata.database.system.SystemDatabase;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
import org.apache.shardingsphere.infra.config.database.impl.DataSourceGeneratedDatabaseConfiguration;
Expand All @@ -28,13 +27,11 @@
import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.database.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.pool.config.DataSourceConfiguration;
import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabaseFactory;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabasesFactory;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilderParameter;
Expand All @@ -44,16 +41,12 @@
import org.apache.shardingsphere.mode.metadata.persist.config.global.PropertiesPersistService;
import org.apache.shardingsphere.mode.metadata.persist.version.VersionPersistService;
import org.apache.shardingsphere.mode.spi.repository.PersistRepository;
import org.apache.shardingsphere.single.config.SingleRuleConfiguration;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -85,7 +78,7 @@ public MetaDataContexts create(final ContextManagerBuilderParameter param) throw
Map<String, DataSource> globalDataSources = param.getGlobalDataSources();
ConfigurationProperties props = new ConfigurationProperties(persistFacade.getPropsService().load());
DatabaseType protocolType = DatabaseTypeEngine.getProtocolType(effectiveDatabaseConfigs, props);
Map<String, Collection<ShardingSphereSchema>> schemas = loadSchemas(effectiveDatabaseConfigs, protocolType, props, isInstanceConnectionEnabled);
Map<String, Collection<ShardingSphereSchema>> schemas = loadSchemas(effectiveDatabaseConfigs.keySet(), protocolType);
Collection<ShardingSphereDatabase> databases;
if (persistSchemasEnabled) {
// TODO merge schemas with local
Expand Down Expand Up @@ -128,58 +121,14 @@ private void closeGeneratedDataSources(final String databaseName, final Map<Stri
}
}

private Map<String, Collection<ShardingSphereSchema>> loadSchemas(final Map<String, DatabaseConfiguration> effectiveDatabaseConfigs, final DatabaseType protocolType,
final ConfigurationProperties props, final boolean isInstanceConnectionEnabled) {
Collection<String> sysDatabaseNames = new SystemDatabase(protocolType).getSystemDatabases();
Collection<String> databaseNames = effectiveDatabaseConfigs.keySet();
private Map<String, Collection<ShardingSphereSchema>> loadSchemas(final Collection<String> databaseNames, final DatabaseType protocolType) {
Map<String, Collection<ShardingSphereSchema>> result = new HashMap<>(databaseNames.size());
for (Map.Entry<String, DatabaseConfiguration> entry : effectiveDatabaseConfigs.entrySet()) {
String dbName = entry.getKey();
for (String dbName : databaseNames) {
Collection<ShardingSphereSchema> schemas = persistFacade.getDatabaseMetaDataFacade().getSchema().load(dbName, protocolType);
if (sysDatabaseNames.contains(dbName)) {
if (null != schemas) {
result.put(dbName, schemas);
}
} else {
Collection<String> missedSingleTables = getMissedSingleTables(entry.getValue(), schemas, dbName);
if (missedSingleTables.isEmpty()) {
result.put(dbName, schemas);
} else {
log.info("Repository missed single tables: {} of database: {}, start to reload", missedSingleTables, dbName);
DataSourceGeneratedDatabaseConfiguration databaseConfig = new DataSourceGeneratedDatabaseConfiguration(persistFacade.loadDataSourceConfigurations(dbName),
Collections.singleton(new SingleRuleConfiguration(missedSingleTables, null)), isInstanceConnectionEnabled);
try {
ShardingSphereDatabase database = ShardingSphereDatabaseFactory.createWithoutSystemSchema(dbName, protocolType, databaseConfig, props, instanceContext);
database.getAllSchemas().forEach(schema -> persistFacade.getDatabaseMetaDataFacade().getTable().persist(dbName, schema.getName(), schema.getAllTables()));
result.put(dbName, persistFacade.getDatabaseMetaDataFacade().getSchema().load(dbName, protocolType));
} catch (final SQLException ex) {
result.put(dbName, schemas);
log.info("Reload reposotiry missed single tables: {} of database : {} failed", missedSingleTables, dbName, ex);
}
}
if (schemas != null) {
result.put(dbName, schemas);
}
}
return result;
}

private Collection<String> getMissedSingleTables(final DatabaseConfiguration databaseConfiguration, final Collection<ShardingSphereSchema> schemas,
final String dbName) {
Collection<String> result = new LinkedList<>();
Optional<SingleRuleConfiguration> singleRuleConfig = databaseConfiguration.getRuleConfigurations().stream().filter(each -> each instanceof SingleRuleConfiguration)
.map(each -> (SingleRuleConfiguration) each).findAny();
singleRuleConfig.ifPresent(singleRuleConfiguration -> singleRuleConfiguration.getTables().forEach(table -> {
DataNode dataNode = new DataNode(table);
String logicTableName = new DataNode(table).getTableName();
String schemaName = null != dataNode.getSchemaName() ? dataNode.getSchemaName() : dbName;
Optional<ShardingSphereSchema> schema = findSchema(schemas, schemaName);
if (!schema.isPresent() || !schema.get().containsTable(logicTableName)) {
result.add(table);
}
}));
return result;
}

private Optional<ShardingSphereSchema> findSchema(final Collection<ShardingSphereSchema> schemas, final String schemaName) {
return schemas.stream().filter(each -> each.getName().equals(schemaName)).findFirst();
}
}