2 changes: 2 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
@@ -459,6 +459,8 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType
} else {
qkind = types.QValueKindNumeric
}
} else if strings.HasPrefix(column.DatabaseTypeName(), "JSON(") {
qkind = types.QValueKindJSON
} else {
return nil, fmt.Errorf("failed to resolve QValueKind for %s", column.DatabaseTypeName())
}
27 changes: 27 additions & 0 deletions flow/connectors/clickhouse/normalize.go
@@ -212,6 +212,10 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
clickHouseType = fmt.Sprintf("Nullable(%s)", clickHouseType)
}

if clickHouseType == "JSON" && tableMapping != nil && len(tableMapping.Exclude) > 0 {
clickHouseType = buildJSONExclude(colName, tableMapping.Exclude)
}

fmt.Fprintf(builder, "%s %s, ", peerdb_clickhouse.QuoteIdentifier(dstColName), clickHouseType)
}

@@ -292,6 +296,29 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
return result, nil
}

// buildJSONExclude builds a JSON type with SKIP clauses for fields matching the
// column prefix. e.g., for colName="doc" and excludeFields=["doc.password",
// "doc.user.ssn", "other_col"], it returns JSON(SKIP password, SKIP `user.ssn`)
func buildJSONExclude(colName string, excludeFields []string) string {
prefix := colName + "."
var parts []string

for _, field := range excludeFields {
field = strings.TrimSpace(field)
if strings.HasPrefix(field, prefix) {
jsonField := strings.TrimPrefix(field, prefix)
if jsonField != "" {
parts = append(parts, "SKIP "+peerdb_clickhouse.QuoteIdentifier(jsonField))
}
}
}

if len(parts) == 0 {
return "JSON"
}
return fmt.Sprintf("JSON(%s)", strings.Join(parts, ","))
}

// getOrderedOrderByColumns returns columns to be used for ordering in destination table operations.
// If no custom ordering is specified, return the source table's primary key columns as ordering keys.
// If custom ordering columns are specified, return those columns sorted by their ordering value.
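As an illustration, here is a minimal standalone sketch of the exclusion logic added above. It is not part of the PR: quoteIdentifier is a simplified stand-in for peerdb_clickhouse.QuoteIdentifier, and the normalized column type shown in the comments is taken from the e2e test below.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdentifier is a simplified stand-in for peerdb_clickhouse.QuoteIdentifier,
// which backtick-quotes ClickHouse identifiers.
func quoteIdentifier(ident string) string {
	return "`" + ident + "`"
}

// buildJSONExclude mirrors the logic added above: exclude entries prefixed with
// "<colName>." become SKIP clauses on the ClickHouse JSON column type; entries
// for other columns are ignored.
func buildJSONExclude(colName string, excludeFields []string) string {
	prefix := colName + "."
	var parts []string
	for _, field := range excludeFields {
		field = strings.TrimSpace(field)
		if strings.HasPrefix(field, prefix) {
			if jsonField := strings.TrimPrefix(field, prefix); jsonField != "" {
				parts = append(parts, "SKIP "+quoteIdentifier(jsonField))
			}
		}
	}
	if len(parts) == 0 {
		return "JSON"
	}
	return fmt.Sprintf("JSON(%s)", strings.Join(parts, ","))
}

func main() {
	// Same exclude list as the e2e test below; entries not under "doc." are ignored.
	exclude := []string{"doc.ssn", "doc.address.street", "doc.payment_info"}
	fmt.Println(buildJSONExclude("doc", exclude))
	// Prints: JSON(SKIP `ssn`,SKIP `address.street`,SKIP `payment_info`)
	// ClickHouse reports the stored column type in a normalized form, e.g.
	// JSON(SKIP `address.street`, SKIP payment_info, SKIP ssn), which is what
	// the e2e test asserts against system.columns.
}
```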
92 changes: 92 additions & 0 deletions flow/e2e/mongo_test.go
@@ -899,3 +899,95 @@ func (s MongoClickhouseSuite) Test_Json_Types() {
env.Cancel(t.Context())
RequireEnvCanceled(t, env)
}

func (s MongoClickhouseSuite) Test_JSON_Skip_Fields() {
t := s.T()
srcDatabase := GetTestDatabase(s.Suffix())
srcTable := "test_json_skip"
dstTable := "test_json_skip_dst"

connectionGen := FlowConnectionGenerationConfig{
FlowJobName: AddSuffix(s, srcTable),
TableMappings: TableMappings(s, srcTable, dstTable),
Destination: s.Peer().Name,
}
flowConnConfig := s.generateFlowConnectionConfigsDefaultEnv(connectionGen)
flowConnConfig.DoInitialSnapshot = true
// Skip: top-level field, nested field, and nested object
flowConnConfig.TableMappings[0].Exclude = []string{
"doc.ssn", // exclude top-level field
"doc.address.street", // exclude nested field
"doc.payment_info", // exclude nested object
}

adminClient := s.Source().(*MongoSource).AdminClient()
collection := adminClient.Database(srcDatabase).Collection(srcTable)

doc := bson.D{
{Key: "name", Value: "user1"},
{Key: "ssn", Value: "123-45-6789"},
{Key: "address", Value: bson.D{
{Key: "street", Value: "123 Lombard St"},
{Key: "city", Value: "San Francisco"},
{Key: "zip", Value: "94133"},
}},
{Key: "payment_info", Value: bson.D{
{Key: "card_number", Value: "4111111111111111"},
{Key: "cvv", Value: "123"},
{Key: "expiry", Value: "12/25"},
}},
}
res, err := collection.InsertOne(t.Context(), doc, options.InsertOne())
require.NoError(t, err)
require.True(t, res.Acknowledged)

tc := NewTemporalClient(t)
env := ExecutePeerflow(t, tc, flowConnConfig)
EnvWaitForCount(env, s, "initial load", dstTable, "_id,doc", 1)

peer := s.Peer()
ch, err := connclickhouse.Connect(t.Context(), nil, peer.GetClickhouseConfig())
require.NoError(t, err)
defer ch.Close()

// verify schema
var columnType string
row := ch.QueryRow(t.Context(),
fmt.Sprintf("SELECT type FROM system.columns WHERE database = '%s' AND table = '%s' AND name = 'doc'",
peer.GetClickhouseConfig().Database, dstTable))
require.NoError(t, row.Err())
require.NoError(t, row.Scan(&columnType))
require.Equal(t, "JSON(SKIP `address.street`, SKIP payment_info, SKIP ssn)", columnType)

// verify initial load data
var docStr string
err = ch.QueryRow(t.Context(),
fmt.Sprintf("SELECT toString(doc) FROM %s LIMIT 1", dstTable)).Scan(&docStr)
require.NoError(t, err)
require.Contains(t, docStr, "name", "name field should exist in JSON")
require.Contains(t, docStr, "zip", "zip field should exist in JSON")
require.NotContains(t, docStr, "ssn", "ssn field should not exist in JSON")
require.NotContains(t, docStr, "street", "street field should not exist in JSON")
require.NotContains(t, docStr, "payment_info", "payment_info field should not exist in JSON")
require.NotContains(t, docStr, "card_number", "card_number field should not exist in JSON")

SetupCDCFlowStatusQuery(t, env, flowConnConfig)
res, err = collection.InsertOne(t.Context(), doc, options.InsertOne())
require.NoError(t, err)
require.True(t, res.Acknowledged)
EnvWaitForCount(env, s, "cdc", dstTable, "_id,doc", 2)

// verify cdc data
err = ch.QueryRow(t.Context(),
fmt.Sprintf("SELECT toString(doc) FROM %s ORDER BY _peerdb_synced_at DESC LIMIT 1", dstTable)).Scan(&docStr)
require.NoError(t, err)
require.Contains(t, docStr, "name", "name field should exist after CDC")
require.Contains(t, docStr, "zip", "zip field should exist after CDC")
require.NotContains(t, docStr, "ssn", "ssn field should not exist after CDC")
require.NotContains(t, docStr, "street", "street field should not exist after CDC")
require.NotContains(t, docStr, "payment_info", "payment_info field should not exist after CDC")
require.NotContains(t, docStr, "card_number", "card_number field should not exist after CDC")

env.Cancel(t.Context())
RequireEnvCanceled(t, env)
}
1 change: 1 addition & 0 deletions ui/app/mirrors/create/cdc/cdc.tsx
@@ -306,6 +306,7 @@ export default function CDCConfigForm({
rows={rows}
setRows={setRows}
peerType={destinationType}
sourcePeerType={sourceType}
alreadySelectedTablesMapping={new Map<string, TableMapping[]>()}
initialLoadOnly={mirrorConfig.initialSnapshotOnly}
/>
10 changes: 10 additions & 0 deletions ui/app/mirrors/create/cdc/schemabox.tsx
@@ -31,6 +31,7 @@ import { fetchColumns, fetchTables } from '../handlers';
import ColumnBox from './columnbox';
import CustomColumnType from './customColumnType';
import SchemaSettings from './schemasettings';
import SkipJsonFields from './skipJsonFields';
import SelectSortingKeys from './sortingkey';
import {
columnBoxDividerStyle,
@@ -50,13 +51,15 @@ interface SchemaBoxProps {
SetStateAction<{ tableName: string; columns: ColumnsItem[] }[]>
>;
peerType?: DBType;
sourcePeerType?: DBType;
alreadySelectedTables: TableMapping[] | undefined;
initialLoadOnly?: boolean;
}

export default function SchemaBox({
sourcePeer,
peerType,
sourcePeerType,
schema,
rows,
setRows,
@@ -565,6 +568,13 @@ export default function SchemaBox({
setRows={setRows}
peerType={peerType}
/>
{sourcePeerType?.toString() ===
DBType[DBType.MONGO].toString() && (
<SkipJsonFields
tableRow={row}
setRows={setRows}
/>
)}
</div>
)}
</>
117 changes: 117 additions & 0 deletions ui/app/mirrors/create/cdc/skipJsonFields.tsx
@@ -0,0 +1,117 @@
'use client';
import {
Dispatch,
SetStateAction,
useCallback,
useMemo,
useState,
} from 'react';

import { TableMapRow } from '@/app/dto/MirrorsDTO';
import { Checkbox } from '@/lib/Checkbox';
import { Label } from '@/lib/Label';
import { RowWithCheckbox } from '@/lib/Layout';
import { TextField } from '@/lib/TextField';

interface SkipJsonFieldsProps {
tableRow: TableMapRow;
setRows: Dispatch<SetStateAction<TableMapRow[]>>;
}

const MONGODB_DOC_COLUMN_PREFIX = 'doc.';

Contributor commented:

On the other source types, excluded columns are defined on the source side. Would it be difficult to do the same here and add doc. (or another flattened column) in the Go code?
Also, it seems like the ssn and such would still end up in the raw table; if we expose this in e2e ClickPipes, wouldn't the expectation be that it never lands in CH in the open?

jgao54 (Contributor, Author) replied on Dec 3, 2025:

Makes sense. I was thinking about how we'd want to extend this to other DBs, in which case we'd need to pass info from the client about which column the field exclusion applies to; but it feels a bit premature to worry about that given JSON is not currently supported for PG/MySQL, and even if we do support JSON for those sources, JSON field exclusion seems like a niche enough use case that we could defer it for some time.

Re: sensitive data in the raw table. Two workarounds I originally had in mind, which we would simply document since they require no changes on our end: (1) the user explicitly adds a TTL to the raw table, or (2) permissions on the raw table are locked down so it is only accessible by an admin. That said, I think it might not be too tricky to just do exclusion properly, and in a way that is reusable by flattened mode. Will explore this a bit more.

Contributor replied:

Mongo column exclusion seems like a mirror of the existing "excluded columns" feature - so from the product point of view if we make it work just for Mongo it's plenty good as others already have it.


export default function SkipJsonFields({
tableRow,
setRows,
}: SkipJsonFieldsProps) {
const existingExclude = useMemo(() => {
return Array.from(tableRow.exclude)
.filter((field) => field.startsWith(MONGODB_DOC_COLUMN_PREFIX))
.map((field) => field.slice(MONGODB_DOC_COLUMN_PREFIX.length))
.join(',');
}, [tableRow.exclude]);

const [showExclude, setShowExclude] = useState(
Array.from(tableRow.exclude).some((f) =>
f.startsWith(MONGODB_DOC_COLUMN_PREFIX)
)
);
const [excludeValue, setExcludeValue] = useState(existingExclude);

const updateExclude = useCallback(
(value: string) => {
const fields = value
.split(',')
.map((f) => f.trim())
.filter((f) => f !== '')
.map((f) => MONGODB_DOC_COLUMN_PREFIX + f);

setRows((prev) =>
prev.map((row) => {
if (row.source !== tableRow.source) return row;
return {
...row,
exclude: new Set(fields),
};
})
);
},
[tableRow.source, setRows]
);

const handleToggle = useCallback(
(state: boolean) => {
setShowExclude(state);
if (!state) {
setExcludeValue('');
updateExclude('');
}
},
[updateExclude]
);

const handleChange = useCallback(
(value: string) => {
setExcludeValue(value);
updateExclude(value);
},
[updateExclude]
);

return (
<div
style={{
display: 'flex',
flexDirection: 'column',
rowGap: '0.5rem',
}}
>
<div style={{ display: 'flex', alignItems: 'center', gap: '0.5rem' }}>
<RowWithCheckbox
label={
<Label as='label' style={{ fontSize: 13 }}>
Hide sensitive fields
</Label>
}
action={
<Checkbox
style={{ marginLeft: 0 }}
checked={showExclude}
onCheckedChange={handleToggle}
/>
}
/>
</div>
{showExclude && (
<TextField
variant='simple'
placeholder='password,user.ssn,metadata.secret'
value={excludeValue}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
handleChange(e.target.value)
}
/>
)}
</div>
);
}
3 changes: 3 additions & 0 deletions ui/app/mirrors/create/cdc/tablemapping.tsx
@@ -18,6 +18,7 @@ interface TableMappingProps {
rows: TableMapRow[];
setRows: Dispatch<SetStateAction<TableMapRow[]>>;
peerType?: DBType;
sourcePeerType?: DBType;
// schema -> omitted source table mapping
alreadySelectedTablesMapping: Map<string, TableMapping[]>;
initialLoadOnly: boolean;
@@ -28,6 +29,7 @@ export default function TablePicker({
rows,
setRows,
peerType,
sourcePeerType,
alreadySelectedTablesMapping,
initialLoadOnly,
}: TableMappingProps) {
@@ -99,6 +101,7 @@ export default function TablePicker({
tableColumns={tableColumns}
setTableColumns={setTableColumns}
peerType={peerType}
sourcePeerType={sourcePeerType}
alreadySelectedTables={alreadySelectedTablesMapping.get(schema)}
initialLoadOnly={initialLoadOnly}
/>