diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index cbed394dfd..12bd2b75d6 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -459,6 +459,8 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType } else { qkind = types.QValueKindNumeric } + } else if strings.HasPrefix(column.DatabaseTypeName(), "JSON(") { + qkind = types.QValueKindJSON } else { return nil, fmt.Errorf("failed to resolve QValueKind for %s", column.DatabaseTypeName()) } diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 59df573aa9..9b2a5bfd86 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -212,6 +212,10 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( clickHouseType = fmt.Sprintf("Nullable(%s)", clickHouseType) } + if clickHouseType == "JSON" && tableMapping != nil && len(tableMapping.Exclude) > 0 { + clickHouseType = buildJSONExclude(colName, tableMapping.Exclude) + } + fmt.Fprintf(builder, "%s %s, ", peerdb_clickhouse.QuoteIdentifier(dstColName), clickHouseType) } @@ -292,6 +296,29 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( return result, nil } +// buildJSONExclude builds a JSON type with SKIP clauses for fields matching the +// column prefix. e.g., for colName="doc" and excludeFields=["doc.password", +// "doc.user.ssn", "other_col"], it returns JSON(SKIP password, SKIP `user.ssn`) +func buildJSONExclude(colName string, excludeFields []string) string { + prefix := colName + "." + var parts []string + + for _, field := range excludeFields { + field = strings.TrimSpace(field) + if strings.HasPrefix(field, prefix) { + jsonField := strings.TrimPrefix(field, prefix) + if jsonField != "" { + parts = append(parts, "SKIP "+peerdb_clickhouse.QuoteIdentifier(jsonField)) + } + } + } + + if len(parts) == 0 { + return "JSON" + } + return fmt.Sprintf("JSON(%s)", strings.Join(parts, ",")) +} + // getOrderedOrderByColumns returns columns to be used for ordering in destination table operations. // If no custom ordering is specified, return the source table's primary key columns as ordering keys. // If custom ordering columns are specified, return those columns sorted by their ordering value. diff --git a/flow/e2e/mongo_test.go b/flow/e2e/mongo_test.go index 2cdac171a9..2346c5e2f7 100644 --- a/flow/e2e/mongo_test.go +++ b/flow/e2e/mongo_test.go @@ -899,3 +899,95 @@ func (s MongoClickhouseSuite) Test_Json_Types() { env.Cancel(t.Context()) RequireEnvCanceled(t, env) } + +func (s MongoClickhouseSuite) Test_JSON_Skip_Fields() { + t := s.T() + srcDatabase := GetTestDatabase(s.Suffix()) + srcTable := "test_json_skip" + dstTable := "test_json_skip_dst" + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: AddSuffix(s, srcTable), + TableMappings: TableMappings(s, srcTable, dstTable), + Destination: s.Peer().Name, + } + flowConnConfig := s.generateFlowConnectionConfigsDefaultEnv(connectionGen) + flowConnConfig.DoInitialSnapshot = true + // Skip: top-level field, nested field, and nested object + flowConnConfig.TableMappings[0].Exclude = []string{ + "doc.ssn", // exclude top-level field + "doc.address.street", // exclude nested field + "doc.payment_info", // exclude nested object + } + + adminClient := s.Source().(*MongoSource).AdminClient() + collection := adminClient.Database(srcDatabase).Collection(srcTable) + + doc := bson.D{ + {Key: "name", Value: "user1"}, + {Key: "ssn", Value: "123-45-6789"}, + {Key: "address", Value: bson.D{ + {Key: "street", Value: "123 Lombard St"}, + {Key: "city", Value: "San Francisco"}, + {Key: "zip", Value: "94133"}, + }}, + {Key: "payment_info", Value: bson.D{ + {Key: "card_number", Value: "4111111111111111"}, + {Key: "cvv", Value: "123"}, + {Key: "expiry", Value: "12/25"}, + }}, + } + res, err := collection.InsertOne(t.Context(), doc, options.InsertOne()) + require.NoError(t, err) + require.True(t, res.Acknowledged) + + tc := NewTemporalClient(t) + env := ExecutePeerflow(t, tc, flowConnConfig) + EnvWaitForCount(env, s, "initial load", dstTable, "_id,doc", 1) + + peer := s.Peer() + ch, err := connclickhouse.Connect(t.Context(), nil, peer.GetClickhouseConfig()) + require.NoError(t, err) + defer ch.Close() + + // verify schema + var columnType string + row := ch.QueryRow(t.Context(), + fmt.Sprintf("SELECT type FROM system.columns WHERE database = '%s' AND table = '%s' AND name = 'doc'", + peer.GetClickhouseConfig().Database, dstTable)) + require.NoError(t, row.Err()) + require.NoError(t, row.Scan(&columnType)) + require.Equal(t, "JSON(SKIP `address.street`, SKIP payment_info, SKIP ssn)", columnType) + + // verify initial load data + var docStr string + err = ch.QueryRow(t.Context(), + fmt.Sprintf("SELECT toString(doc) FROM %s LIMIT 1", dstTable)).Scan(&docStr) + require.NoError(t, err) + require.Contains(t, docStr, "name", "name field should exist in JSON") + require.Contains(t, docStr, "zip", "zip field should exist in JSON") + require.NotContains(t, docStr, "ssn", "ssn field should not exist in JSON") + require.NotContains(t, docStr, "street", "street field should not exist in JSON") + require.NotContains(t, docStr, "payment_info", "payment_info field should not exist in JSON") + require.NotContains(t, docStr, "card_number", "card_number field should not exist in JSON") + + SetupCDCFlowStatusQuery(t, env, flowConnConfig) + res, err = collection.InsertOne(t.Context(), doc, options.InsertOne()) + require.NoError(t, err) + require.True(t, res.Acknowledged) + EnvWaitForCount(env, s, "cdc", dstTable, "_id,doc", 2) + + // verify cdc data + err = ch.QueryRow(t.Context(), + fmt.Sprintf("SELECT toString(doc) FROM %s ORDER BY _peerdb_synced_at DESC LIMIT 1", dstTable)).Scan(&docStr) + require.NoError(t, err) + require.Contains(t, docStr, "name", "name field should exist after CDC") + require.Contains(t, docStr, "zip", "zip field should exist after CDC") + require.NotContains(t, docStr, "ssn", "ssn field should not exist after CDC") + require.NotContains(t, docStr, "street", "street field should not exist after CDC") + require.NotContains(t, docStr, "payment_info", "payment_info field should not exist after CDC") + require.NotContains(t, docStr, "card_number", "card_number field should not exist after CDC") + + env.Cancel(t.Context()) + RequireEnvCanceled(t, env) +} diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index 63390559cb..293d587da4 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -306,6 +306,7 @@ export default function CDCConfigForm({ rows={rows} setRows={setRows} peerType={destinationType} + sourcePeerType={sourceType} alreadySelectedTablesMapping={new Map()} initialLoadOnly={mirrorConfig.initialSnapshotOnly} /> diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx index 881c9f65ea..4d452417b7 100644 --- a/ui/app/mirrors/create/cdc/schemabox.tsx +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -31,6 +31,7 @@ import { fetchColumns, fetchTables } from '../handlers'; import ColumnBox from './columnbox'; import CustomColumnType from './customColumnType'; import SchemaSettings from './schemasettings'; +import SkipJsonFields from './skipJsonFields'; import SelectSortingKeys from './sortingkey'; import { columnBoxDividerStyle, @@ -50,6 +51,7 @@ interface SchemaBoxProps { SetStateAction<{ tableName: string; columns: ColumnsItem[] }[]> >; peerType?: DBType; + sourcePeerType?: DBType; alreadySelectedTables: TableMapping[] | undefined; initialLoadOnly?: boolean; } @@ -57,6 +59,7 @@ interface SchemaBoxProps { export default function SchemaBox({ sourcePeer, peerType, + sourcePeerType, schema, rows, setRows, @@ -565,6 +568,13 @@ export default function SchemaBox({ setRows={setRows} peerType={peerType} /> + {sourcePeerType?.toString() === + DBType[DBType.MONGO].toString() && ( + + )} )} diff --git a/ui/app/mirrors/create/cdc/skipJsonFields.tsx b/ui/app/mirrors/create/cdc/skipJsonFields.tsx new file mode 100644 index 0000000000..6db517c5e7 --- /dev/null +++ b/ui/app/mirrors/create/cdc/skipJsonFields.tsx @@ -0,0 +1,117 @@ +'use client'; +import { + Dispatch, + SetStateAction, + useCallback, + useMemo, + useState, +} from 'react'; + +import { TableMapRow } from '@/app/dto/MirrorsDTO'; +import { Checkbox } from '@/lib/Checkbox'; +import { Label } from '@/lib/Label'; +import { RowWithCheckbox } from '@/lib/Layout'; +import { TextField } from '@/lib/TextField'; + +interface SkipJsonFieldsProps { + tableRow: TableMapRow; + setRows: Dispatch>; +} + +const MONGODB_DOC_COLUMN_PREFIX = 'doc.'; + +export default function SkipJsonFields({ + tableRow, + setRows, +}: SkipJsonFieldsProps) { + const existingExclude = useMemo(() => { + return Array.from(tableRow.exclude) + .filter((field) => field.startsWith(MONGODB_DOC_COLUMN_PREFIX)) + .map((field) => field.slice(MONGODB_DOC_COLUMN_PREFIX.length)) + .join(','); + }, [tableRow.exclude]); + + const [showExclude, setShowExclude] = useState( + Array.from(tableRow.exclude).some((f) => + f.startsWith(MONGODB_DOC_COLUMN_PREFIX) + ) + ); + const [excludeValue, setExcludeValue] = useState(existingExclude); + + const updateExclude = useCallback( + (value: string) => { + const fields = value + .split(',') + .map((f) => f.trim()) + .filter((f) => f !== '') + .map((f) => MONGODB_DOC_COLUMN_PREFIX + f); + + setRows((prev) => + prev.map((row) => { + if (row.source !== tableRow.source) return row; + return { + ...row, + exclude: new Set(fields), + }; + }) + ); + }, + [tableRow.source, setRows] + ); + + const handleToggle = useCallback( + (state: boolean) => { + setShowExclude(state); + if (!state) { + setExcludeValue(''); + updateExclude(''); + } + }, + [updateExclude] + ); + + const handleChange = useCallback( + (value: string) => { + setExcludeValue(value); + updateExclude(value); + }, + [updateExclude] + ); + + return ( +
+
+ + Hide sensitive fields + + } + action={ + + } + /> +
+ {showExclude && ( + ) => + handleChange(e.target.value) + } + /> + )} +
+ ); +} diff --git a/ui/app/mirrors/create/cdc/tablemapping.tsx b/ui/app/mirrors/create/cdc/tablemapping.tsx index e0c21b6fd6..bff32a2520 100644 --- a/ui/app/mirrors/create/cdc/tablemapping.tsx +++ b/ui/app/mirrors/create/cdc/tablemapping.tsx @@ -18,6 +18,7 @@ interface TableMappingProps { rows: TableMapRow[]; setRows: Dispatch>; peerType?: DBType; + sourcePeerType?: DBType; // schema -> omitted source table mapping alreadySelectedTablesMapping: Map; initialLoadOnly: boolean; @@ -28,6 +29,7 @@ export default function TablePicker({ rows, setRows, peerType, + sourcePeerType, alreadySelectedTablesMapping, initialLoadOnly, }: TableMappingProps) { @@ -99,6 +101,7 @@ export default function TablePicker({ tableColumns={tableColumns} setTableColumns={setTableColumns} peerType={peerType} + sourcePeerType={sourcePeerType} alreadySelectedTables={alreadySelectedTablesMapping.get(schema)} initialLoadOnly={initialLoadOnly} />