Skip to content

Commit 8155b42

Browse files
pabloqcpquesada
authored and committed
feat(Datastream): Add SQL Server (MSSQL) source support to Datastream-to-BigQuery pipeline
Add full SQL Server CDC support including Avro format processing, sort key definitions, schema discovery via Datastream API, BigQuery metadata schema, and type conversion mappings.
1 parent 9a83585 commit 8155b42

File tree

7 files changed

+227
-2
lines changed

7 files changed

+227
-2
lines changed

v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/mappers/BigQueryDefaultSchemas.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public final class BigQueryDefaultSchemas {
5858
// MySQL Specific Metadata
5959
put("_metadata_log_file", StandardSQLTypeName.STRING);
6060
put("_metadata_log_position", StandardSQLTypeName.INT64);
61+
62+
// PostgreSQL / SQL Server Specific Metadata
63+
put("_metadata_lsn", StandardSQLTypeName.STRING);
6164
}
6265
};
6366
}

v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ public FailsafeElement<String, String> apply(GenericRecord record) {
154154
outputObject.put("_metadata_schema", getMetadataSchema(record));
155155
outputObject.put("_metadata_lsn", getPostgresLsn(record));
156156
outputObject.put("_metadata_tx_id", getPostgresTxId(record));
157+
} else if (sourceType.equals("sqlserver")) {
158+
// SQL Server Specific Metadata
159+
outputObject.put("_metadata_schema", getMetadataSchema(record));
160+
outputObject.put("_metadata_lsn", getSourceMetadata(record, "lsn"));
161+
outputObject.put("_metadata_tx_id", getSourceMetadata(record, "tx_id"));
157162
} else if (sourceType.equals("backfill") || sourceType.equals("cdc")) {
158163
// MongoDB Specific Metadata, MongoDB has different structure for sourceType.
159164
outputObject.put("_metadata_timestamp_seconds", getSecondsFromMongoSortKeys(record));

v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/utils/DataStreamClient.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
import com.google.api.services.datastream.v1.model.PostgresqlSchema;
3737
import com.google.api.services.datastream.v1.model.PostgresqlTable;
3838
import com.google.api.services.datastream.v1.model.SourceConfig;
39+
import com.google.api.services.datastream.v1.model.SqlServerColumn;
40+
import com.google.api.services.datastream.v1.model.SqlServerRdbms;
41+
import com.google.api.services.datastream.v1.model.SqlServerSchema;
42+
import com.google.api.services.datastream.v1.model.SqlServerTable;
3943
import com.google.api.services.datastream.v1.model.Stream;
4044
import com.google.auth.Credentials;
4145
import com.google.auth.http.HttpCredentialsAdapter;
@@ -126,6 +130,8 @@ public Map<String, StandardSQLTypeName> getObjectSchema(
126130
return getOracleObjectSchema(streamName, schemaName, tableName, sourceConnProfile);
127131
} else if (sourceConnProfile.getPostgresqlSourceConfig() != null) {
128132
return getPostgresqlObjectSchema(streamName, schemaName, tableName, sourceConnProfile);
133+
} else if (sourceConnProfile.getSqlServerSourceConfig() != null) {
134+
return getSqlServerObjectSchema(streamName, schemaName, tableName, sourceConnProfile);
129135
} else {
130136
LOG.error("Source Connection Profile Type Not Supported");
131137
throw new IOException("Source Connection Profile Type Not Supported");
@@ -142,6 +148,8 @@ public List<String> getPrimaryKeys(String streamName, String schemaName, String
142148
return getOraclePrimaryKeys(streamName, schemaName, tableName, sourceConnProfile);
143149
} else if (sourceConnProfile.getPostgresqlSourceConfig() != null) {
144150
return getPostgresqlPrimaryKeys(streamName, schemaName, tableName, sourceConnProfile);
151+
} else if (sourceConnProfile.getSqlServerSourceConfig() != null) {
152+
return getSqlServerPrimaryKeys(streamName, schemaName, tableName, sourceConnProfile);
145153
} else {
146154
throw new IOException("Source Connection Profile Type Not Supported");
147155
}
@@ -177,6 +185,9 @@ private Datastream.Projects.Locations.ConnectionProfiles.Discover getDiscoverTab
177185
} else if (sourceConnProfile.getPostgresqlSourceConfig() != null) {
178186
PostgresqlRdbms postgresqlRdbms = buildPostgresqlRdbmsForTable(schemaName, tableName);
179187
discoverRequest = discoverRequest.setPostgresqlRdbms(postgresqlRdbms);
188+
} else if (sourceConnProfile.getSqlServerSourceConfig() != null) {
189+
SqlServerRdbms sqlServerRdbms = buildSqlServerRdbmsForTable(schemaName, tableName);
190+
discoverRequest = discoverRequest.setSqlServerRdbms(sqlServerRdbms);
180191
} else {
181192
throw new IOException("Source Connection Profile Type Not Supported");
182193
}
@@ -423,6 +434,122 @@ private PostgresqlRdbms buildPostgresqlRdbmsForTable(String schemaName, String t
423434
return rdbms;
424435
}
425436

437+
public List<String> getSqlServerPrimaryKeys(
438+
String streamName, String schemaName, String tableName, SourceConfig sourceConnProfile)
439+
throws IOException {
440+
List<String> primaryKeys = new ArrayList<String>();
441+
SqlServerTable table =
442+
discoverSqlServerTableSchema(streamName, schemaName, tableName, sourceConnProfile);
443+
for (SqlServerColumn column : table.getColumns()) {
444+
Boolean isPrimaryKey = column.getPrimaryKey();
445+
if (BooleanUtils.isTrue(isPrimaryKey)) {
446+
primaryKeys.add(column.getColumn());
447+
}
448+
}
449+
450+
return primaryKeys;
451+
}
452+
453+
private Map<String, StandardSQLTypeName> getSqlServerObjectSchema(
454+
String streamName, String schemaName, String tableName, SourceConfig sourceConnProfile)
455+
throws IOException {
456+
Map<String, StandardSQLTypeName> objectSchema = new HashMap<String, StandardSQLTypeName>();
457+
458+
SqlServerTable table =
459+
discoverSqlServerTableSchema(streamName, schemaName, tableName, sourceConnProfile);
460+
for (SqlServerColumn column : table.getColumns()) {
461+
StandardSQLTypeName bqType = convertSqlServerToBigQueryColumnType(column);
462+
objectSchema.put(column.getColumn(), bqType);
463+
}
464+
return objectSchema;
465+
}
466+
467+
/**
468+
* Return a {@link SqlServerTable} object with schema and PK information.
469+
*
470+
* @param streamName A fully qualified Stream name (ie. projects/my-project/stream/my-stream)
471+
* @param schemaName The name of the schema for the table being discovered.
472+
* @param tableName The name of the table to discover.
473+
* @param sourceConnProfile The SourceConfig connection profile to be discovered.
474+
*/
475+
public SqlServerTable discoverSqlServerTableSchema(
476+
String streamName, String schemaName, String tableName, SourceConfig sourceConnProfile)
477+
throws IOException {
478+
Datastream.Projects.Locations.ConnectionProfiles.Discover discoverConnProfile =
479+
getDiscoverTableRequest(streamName, schemaName, tableName, sourceConnProfile);
480+
481+
SqlServerRdbms tableResponse = discoverConnProfile.execute().getSqlServerRdbms();
482+
SqlServerSchema schema = tableResponse.getSchemas().get(0);
483+
SqlServerTable table = schema.getTables().get(0);
484+
485+
return table;
486+
}
487+
488+
private SqlServerRdbms buildSqlServerRdbmsForTable(String schemaName, String tableName) {
489+
List<SqlServerTable> sqlServerTables = new ArrayList<SqlServerTable>();
490+
sqlServerTables.add(new SqlServerTable().setTable(tableName));
491+
492+
List<SqlServerSchema> sqlServerSchemas = new ArrayList<SqlServerSchema>();
493+
sqlServerSchemas.add(
494+
new SqlServerSchema().setSchema(schemaName).setTables(sqlServerTables));
495+
496+
SqlServerRdbms rdbms = new SqlServerRdbms().setSchemas(sqlServerSchemas);
497+
498+
return rdbms;
499+
}
500+
501+
public StandardSQLTypeName convertSqlServerToBigQueryColumnType(SqlServerColumn column) {
502+
String dataType = column.getDataType().toUpperCase();
503+
504+
switch (dataType) {
505+
case "CHAR":
506+
case "NCHAR":
507+
case "VARCHAR":
508+
case "NVARCHAR":
509+
case "TEXT":
510+
case "NTEXT":
511+
case "UNIQUEIDENTIFIER":
512+
case "XML":
513+
return StandardSQLTypeName.STRING;
514+
case "BIT":
515+
return StandardSQLTypeName.BOOL;
516+
case "TINYINT":
517+
case "SMALLINT":
518+
case "INT":
519+
case "BIGINT":
520+
return StandardSQLTypeName.INT64;
521+
case "FLOAT":
522+
case "REAL":
523+
return StandardSQLTypeName.FLOAT64;
524+
case "DECIMAL":
525+
case "NUMERIC":
526+
case "MONEY":
527+
case "SMALLMONEY":
528+
return StandardSQLTypeName.BIGNUMERIC;
529+
case "BINARY":
530+
case "VARBINARY":
531+
case "IMAGE":
532+
return StandardSQLTypeName.BYTES;
533+
case "DATE":
534+
return StandardSQLTypeName.DATE;
535+
case "TIME":
536+
return StandardSQLTypeName.TIME;
537+
case "DATETIME":
538+
case "DATETIME2":
539+
case "SMALLDATETIME":
540+
case "DATETIMEOFFSET":
541+
return StandardSQLTypeName.TIMESTAMP;
542+
default:
543+
}
544+
545+
if (TIMESTAMP_PATTERN.matcher(dataType).matches()) {
546+
return StandardSQLTypeName.TIMESTAMP;
547+
} else {
548+
LOG.warn("Datastream SQL Server Type Unknown, Default to String: \"{}\"", dataType);
549+
return StandardSQLTypeName.STRING;
550+
}
551+
}
552+
426553
public StandardSQLTypeName convertOracleToBigQueryColumnType(OracleColumn column) {
427554
String dataType = column.getDataType();
428555

v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/values/DatastreamRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ public List<String> getSortFields() {
197197
return Arrays.asList("_metadata_timestamp", "_metadata_log_file", "_metadata_log_position");
198198
} else if (this.getSourceType().equals("postgresql")) {
199199
return Arrays.asList("_metadata_timestamp", "_metadata_lsn");
200+
} else if (this.getSourceType().equals("sqlserver")) {
201+
return Arrays.asList("_metadata_timestamp", "_metadata_lsn");
200202
} else {
201203
// Current default is oracle.
202204
return Arrays.asList(

v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,54 @@ public class FormatDatastreamJsonToJsonTest {
5252
+ " Relations"
5353
+ " Representative\",\"MIN_SALARY\":4500,\"MAX_SALARY\":10500,\"rowid\":\"AAAEARAAEAAAAC9AAS\",\"_metadata_source\":{\"schema\":\"HR\",\"table\":\"JOBS\",\"database\":\"XE\",\"row_id\":\"AAAEARAAEAAAAC9AAS\",\"scn\":1706664,\"is_deleted\":false,\"change_type\":\"INSERT\",\"ssn\":0,\"rs_id\":\"\",\"tx_id\":null,\"log_file\":\"\",\"primary_keys\":[\"JOB_ID\"]}}";
5454

55+
private static final String EXAMPLE_SQLSERVER_JSON =
56+
"{\"uuid\":\"00c32134-f50e-4460-a6c0-399900010011\","
57+
+ "\"read_timestamp\":\"2021-12-25 05:42:04.408\","
58+
+ "\"source_timestamp\":\"2021-12-25T05:42:04.408\","
59+
+ "\"object\":\"dbo_EMPLOYEES\","
60+
+ "\"read_method\":\"sqlserver-cdc\","
61+
+ "\"stream_name\":\"projects/123456/locations/us-central1/streams/sqlserver-stream\","
62+
+ "\"schema_key\":\"abc123\","
63+
+ "\"sort_keys\":[1640410924408],"
64+
+ "\"source_metadata\":{"
65+
+ "\"schema\":\"dbo\","
66+
+ "\"table\":\"EMPLOYEES\","
67+
+ "\"database\":\"mydb\","
68+
+ "\"lsn\":\"00000025:00000728:0003\","
69+
+ "\"tx_id\":\"1234\","
70+
+ "\"is_deleted\":false,"
71+
+ "\"change_type\":\"INSERT\","
72+
+ "\"primary_keys\":[\"EMPLOYEE_ID\"]},"
73+
+ "\"payload\":{\"EMPLOYEE_ID\":101,\"FIRST_NAME\":\"John\",\"LAST_NAME\":\"Doe\"}}";
74+
75+
private static final String EXPECTED_SQLSERVER_RECORD =
76+
"{\"_metadata_stream\":\"my-stream\","
77+
+ "\"_metadata_timestamp\":1640410924,"
78+
+ "\"_metadata_read_timestamp\":1640410924,"
79+
+ "\"_metadata_read_method\":\"sqlserver-cdc\","
80+
+ "\"_metadata_source_type\":\"sqlserver\","
81+
+ "\"_metadata_deleted\":false,"
82+
+ "\"_metadata_database\":\"mydb\","
83+
+ "\"_metadata_schema\":\"dbo\","
84+
+ "\"_metadata_table\":\"EMPLOYEES\","
85+
+ "\"_metadata_change_type\":\"INSERT\","
86+
+ "\"_metadata_primary_keys\":[\"EMPLOYEE_ID\"],"
87+
+ "\"_metadata_uuid\":\"00c32134-f50e-4460-a6c0-399900010011\","
88+
+ "\"_metadata_lsn\":\"mydb\","
89+
+ "\"_metadata_tx_id\":\"1234\","
90+
+ "\"EMPLOYEE_ID\":101,"
91+
+ "\"FIRST_NAME\":\"John\","
92+
+ "\"LAST_NAME\":\"Doe\","
93+
+ "\"_metadata_source\":{"
94+
+ "\"schema\":\"dbo\","
95+
+ "\"table\":\"EMPLOYEES\","
96+
+ "\"database\":\"mydb\","
97+
+ "\"lsn\":\"00000025:00000728:0003\","
98+
+ "\"tx_id\":\"1234\","
99+
+ "\"is_deleted\":false,"
100+
+ "\"change_type\":\"INSERT\","
101+
+ "\"primary_keys\":[\"EMPLOYEE_ID\"]}}";
102+
55103
private static final String EXAMPLE_DATASTREAM_RECORD_WITH_HASH_ROWID =
56104
"{\"_metadata_stream\":\"my-stream\",\"_metadata_timestamp\":1640410924,\"_metadata_read_timestamp\":1640410924,\"_metadata_read_method\":\"oracle-backfill\",\"_metadata_source_type\":\"oracle\",\"_metadata_deleted\":false,\"_metadata_database\":\"XE\",\"_metadata_schema\":\"HR\",\"_metadata_table\":\"JOBS\",\"_metadata_change_type\":\"INSERT\",\"_metadata_primary_keys\":[\"JOB_ID\"],\"_metadata_uuid\":\"00c32134-f50e-4460-a6c0-399900010010\",\"_metadata_row_id\":1019670290924988842,\"_metadata_scn\":1706664,\"_metadata_ssn\":0,\"_metadata_rs_id\":\"\",\"_metadata_tx_id\":null,\"JOB_ID\":\"PR_REP\",\"JOB_TITLE\":\"Public"
57105
+ " Relations"
@@ -119,6 +167,33 @@ public void testProcessElement_hashRowId() {
119167
pipeline.run();
120168
}
121169

170+
@Test
171+
public void testProcessElement_sqlServer() {
172+
FailsafeElement<String, String> expectedElement =
173+
FailsafeElement.of(EXPECTED_SQLSERVER_RECORD, EXPECTED_SQLSERVER_RECORD);
174+
175+
FailsafeElementCoder<String, String> failsafeElementCoder =
176+
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
177+
178+
PCollection<FailsafeElement<String, String>> pCollection =
179+
pipeline
180+
.apply("CreateInput", Create.of(EXAMPLE_SQLSERVER_JSON))
181+
.apply(
182+
"FormatDatastreamJsonToJson",
183+
ParDo.of(
184+
(FormatDatastreamJsonToJson)
185+
FormatDatastreamJsonToJson.create()
186+
.withStreamName("my-stream")
187+
.withLowercaseSourceColumns(false)))
188+
.setCoder(failsafeElementCoder)
189+
.apply("RemoveTimestampProperty", ParDo.of(new RemoveTimestampPropertyFn()))
190+
.setCoder(failsafeElementCoder);
191+
192+
PAssert.that(pCollection).containsInAnyOrder(expectedElement);
193+
194+
pipeline.run();
195+
}
196+
122197
// Static nested DoFn class to remove timestamp property
123198
static class RemoveTimestampPropertyFn
124199
extends DoFn<FailsafeElement<String, String>, FailsafeElement<String, String>> {

v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/values/DatastreamRowTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,17 @@ public void testGetPrimaryKeysAsList() throws IOException, GeneralSecurityExcept
6464
assertEquals(pks.get(0), "id");
6565
assertEquals(pks.get(1), "name");
6666
}
67+
68+
@Test
69+
public void testSqlServerSortFields() {
70+
TableRow r1 = new TableRow();
71+
r1.set("_metadata_source_type", "sqlserver");
72+
r1.set("_metadata_primary_keys", Arrays.asList("id"));
73+
DatastreamRow row = DatastreamRow.of(r1);
74+
List<String> sortFields = row.getSortFields();
75+
76+
assertEquals(2, sortFields.size());
77+
assertEquals("_metadata_timestamp", sortFields.get(0));
78+
assertEquals("_metadata_lsn", sortFields.get(1));
79+
}
6780
}

v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToBigQuery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
"<a href=\"https://cloud.google.com/storage/docs/reporting-changes\">Cloud Storage Pub/Sub notifications</a> are enabled for the Datastream data.",
116116
"BigQuery destination datasets are created and the Compute Engine Service Account has been granted admin access to them.",
117117
"A primary key is necessary in the source table for the destination replica table to be created.",
118-
"A MySQL or Oracle source database. PostgreSQL databases are not supported."
118+
"A MySQL, Oracle, or SQL Server source database. PostgreSQL databases are not supported."
119119
},
120120
streaming = true,
121121
supportsAtLeastOnce = true,
@@ -379,7 +379,7 @@ public interface Options
379379
optional = true,
380380
description = "Datastream source type override",
381381
helpText =
382-
"Override the source type detection for Datastream CDC data. When specified, this value will be used instead of deriving the source type from the read_method field. Valid values include 'mysql', 'postgresql', 'oracle', etc. This parameter is useful when the read_method field contains 'cdc' and the actual source type cannot be determined automatically.")
382+
"Override the source type detection for Datastream CDC data. When specified, this value will be used instead of deriving the source type from the read_method field. Valid values include 'mysql', 'postgresql', 'oracle', 'sqlserver', etc. This parameter is useful when the read_method field contains 'cdc' and the actual source type cannot be determined automatically.")
383383
String getDatastreamSourceType();
384384

385385
void setDatastreamSourceType(String value);

0 commit comments

Comments
 (0)