Skip to content

Commit 4d85413

Browse files
committed
Skip flattening for mutations across imports
1 parent ffa6d64 commit 4d85413

4 files changed

Lines changed: 49 additions & 67 deletions

File tree

pipeline/ingestion/src/main/java/org/datacommons/ingestion/pipeline/GraphIngestionPipeline.java

Lines changed: 37 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import com.google.gson.JsonArray;
55
import com.google.gson.JsonElement;
66
import com.google.gson.JsonParser;
7-
import java.util.ArrayList;
7+
import java.util.Arrays;
88
import java.util.List;
99
import org.apache.beam.sdk.Pipeline;
1010
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
@@ -70,12 +70,6 @@ public static void buildPipeline(
7070
Pipeline pipeline, IngestionPipelineOptions options, SpannerClient spannerClient) {
7171
LOGGER.info("Running import pipeline for imports: {}", options.getImportList());
7272

73-
// Initialize lists to hold mutations from all imports.
74-
List<PCollection<Void>> deleteOpsList = new ArrayList<>();
75-
List<PCollection<Mutation>> obsMutationList = new ArrayList<>();
76-
List<PCollection<Mutation>> edgeMutationList = new ArrayList<>();
77-
List<PCollection<Mutation>> nodeMutationList = new ArrayList<>();
78-
7973
// Parse the input import list JSON.
8074
JsonElement jsonElement = JsonParser.parseString(options.getImportList());
8175
JsonArray jsonArray = jsonElement.getAsJsonArray();
@@ -97,37 +91,8 @@ public static void buildPipeline(
9791
String graphPath = pathElement.getAsString();
9892

9993
// Process the individual import.
100-
processImport(
101-
pipeline,
102-
spannerClient,
103-
importName,
104-
graphPath,
105-
options.getSkipDelete(),
106-
deleteOpsList,
107-
nodeMutationList,
108-
edgeMutationList,
109-
obsMutationList);
110-
}
111-
// Finally, aggregate all collected mutations and write them to Spanner.
112-
// 1. Process Deletes:
113-
// First, execute all delete mutations to clear old data for the imports.
114-
PCollection<Void> deleted =
115-
PCollectionList.of(deleteOpsList).apply("DeleteOps", Flatten.pCollections());
116-
117-
// 2. Process Observations:
118-
// Write observation mutations after deletes are complete.
119-
if (options.getWriteObsGraph()) {
120-
spannerClient.writeMutations(pipeline, "Observations", obsMutationList, deleted);
94+
processImport(pipeline, spannerClient, importName, graphPath, options.getSkipDelete());
12195
}
122-
123-
// 3. Process Nodes:
124-
// Write node mutations after deletes are complete.
125-
SpannerWriteResult writtenNodes =
126-
spannerClient.writeMutations(pipeline, "Nodes", nodeMutationList, deleted);
127-
128-
// 4. Process Edges:
129-
// Write edge mutations only after node mutations are complete to ensure referential integrity.
130-
spannerClient.writeMutations(pipeline, "Edges", edgeMutationList, writtenNodes.getOutput());
13196
}
13297

13398
/**
@@ -138,31 +103,26 @@ public static void buildPipeline(
138103
* @param importName The name of the import.
139104
* @param graphPath The full path to the graph data.
140105
* @param skipDelete Whether to skip delete operations.
141-
* @param deleteOpsList List to collect delete Ops.
142-
* @param nodeMutationList List to collect node mutations.
143-
* @param edgeMutationList List to collect edge mutations.
144-
* @param obsMutationList List to collect observation mutations.
145106
*/
146107
private static void processImport(
147108
Pipeline pipeline,
148109
SpannerClient spannerClient,
149110
String importName,
150111
String graphPath,
151-
boolean skipDelete,
152-
List<PCollection<Void>> deleteOpsList,
153-
List<PCollection<Mutation>> nodeMutationList,
154-
List<PCollection<Mutation>> edgeMutationList,
155-
List<PCollection<Mutation>> obsMutationList) {
112+
boolean skipDelete) {
156113
LOGGER.info("Import: {} Graph path: {}", importName, graphPath);
157114

158115
String provenance = "dc/base/" + importName;
159116

160117
// 1. Prepare Deletes:
161118
// Generate mutations to delete existing data for this import/provenance.
119+
// Wait signals for the deletes; both remain null when deletes are skipped (no dummy signal is
120+
// created here).
121+
PCollection<Void> deleteObsWait = null;
122+
PCollection<Void> deleteEdgesWait = null;
162123
if (!skipDelete) {
163-
List<PCollection<Void>> deleteOps =
164-
GraphReader.deleteExistingDataForImport(importName, provenance, pipeline, spannerClient);
165-
deleteOpsList.addAll(deleteOps);
124+
deleteObsWait = spannerClient.deleteObservationsForImport(importName, pipeline);
125+
deleteEdgesWait = spannerClient.deleteEdgesForImport(provenance, pipeline);
166126
}
167127

168128
// 2. Read and Split Graph:
@@ -176,29 +136,50 @@ private static void processImport(
176136
PCollection<McfGraph> schemaNodes = graphNodes.get(PipelineUtils.SCHEMA_NODES_TAG);
177137

178138
// 3. Process Schema Nodes:
179-
// Combine schema nodes if required, then convert to Node and Edge mutations.
139+
// Combine/Deduplicate nodes if required.
180140
PCollection<McfGraph> combinedGraph = schemaNodes;
181141
if (IMPORTS_TO_COMBINE.contains(importName)) {
182142
combinedGraph = PipelineUtils.combineGraphNodes(schemaNodes);
183143
}
144+
145+
// Convert all nodes to mutations
184146
PCollection<Mutation> nodeMutations =
185147
GraphReader.graphToNodes(
186-
importName, combinedGraph, spannerClient, nodeCounter, nodeInvalidTypeCounter)
148+
"NodeMutations-" + importName,
149+
combinedGraph,
150+
spannerClient,
151+
nodeCounter,
152+
nodeInvalidTypeCounter)
187153
.apply("ExtractNodeMutations-" + importName, Values.create());
188154
PCollection<Mutation> edgeMutations =
189-
GraphReader.graphToEdges(importName, combinedGraph, provenance, spannerClient, edgeCounter)
155+
GraphReader.graphToEdges(
156+
"EdgeMutations-" + importName,
157+
combinedGraph,
158+
provenance,
159+
spannerClient,
160+
edgeCounter)
190161
.apply("ExtractEdgeMutations-" + importName, Values.create());
191162

192-
nodeMutationList.add(nodeMutations);
193-
edgeMutationList.add(edgeMutations);
163+
// Write Nodes (no wait signal is passed, so this write does not wait for the deletes)
164+
SpannerWriteResult writtenNodes =
165+
spannerClient.writeMutations(pipeline, "Nodes-" + importName, List.of(nodeMutations), null);
166+
167+
PCollection<Void> writeEdgesWait =
168+
PCollectionList.of(Arrays.asList(writtenNodes.getOutput(), deleteEdgesWait))
169+
.apply("FlattenDeleteOps-" + importName, Flatten.pCollections());
170+
// Write Edges (wait for Nodes)
171+
spannerClient.writeMutations(
172+
pipeline, "Edges-" + importName, List.of(edgeMutations), writeEdgesWait);
194173

195174
// 4. Process Observation Nodes:
196175
// Build an optimized graph from observation nodes and convert to Observation mutations.
197176
PCollection<McfOptimizedGraph> optimizedGraph =
198177
PipelineUtils.buildOptimizedMcfGraph(observationNodes);
199178
PCollection<Mutation> observationMutations =
200179
GraphReader.graphToObservations(optimizedGraph, importName, spannerClient, obsCounter)
201-
.apply("ExtractObsMutations", Values.create());
202-
obsMutationList.add(observationMutations);
180+
.apply("ExtractObsMutations-" + importName, Values.create());
181+
// Write Observations (wait for delete)
182+
spannerClient.writeMutations(
183+
pipeline, "Observations-" + importName, List.of(observationMutations), deleteObsWait);
203184
}
204185
}

pipeline/ingestion/src/test/java/org/datacommons/ingestion/pipeline/GraphIngestionPipelineIntegrationTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.io.IOException;
2424
import java.nio.charset.StandardCharsets;
2525
import java.nio.file.Files;
26-
import java.util.UUID;
2726
import org.apache.beam.runners.dataflow.DataflowRunner;
2827
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
2928
import org.apache.beam.sdk.Pipeline;
@@ -86,7 +85,7 @@ public class GraphIngestionPipelineIntegrationTest {
8685
private String region;
8786
private String emulatorHost;
8887
private boolean isLocal;
89-
private String importName = "TestImport-" + UUID.randomUUID().toString();
88+
private String importName = "TestImport";
9089
private String nodeNameValue = "Test Node Name";
9190
private SpannerClient spannerClient;
9291

pipeline/util/src/main/java/org/datacommons/ingestion/util/GraphReader.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@
55
import java.io.Serializable;
66
import java.nio.charset.StandardCharsets;
77
import java.util.ArrayList;
8-
import java.util.Arrays;
98
import java.util.Collections;
109
import java.util.List;
1110
import java.util.Map;
12-
import org.apache.beam.sdk.Pipeline;
1311
import org.apache.beam.sdk.metrics.Counter;
1412
import org.apache.beam.sdk.transforms.DoFn;
1513
import org.apache.beam.sdk.transforms.ParDo;
@@ -97,6 +95,9 @@ public static List<Edge> graphToEdges(McfGraph graph, String provenance) {
9795
String dcid = GraphUtils.getPropertyValue(pv, "dcid");
9896
String subjectId = !dcid.isEmpty() ? dcid : McfUtil.stripNamespace(nodeEntry.getKey());
9997
for (Map.Entry<String, McfGraph.Values> entry : pv.entrySet()) {
98+
if (entry.getKey().equals("dcid")) {
99+
continue;
100+
}
100101
for (TypedValue val : entry.getValue().getTypedValuesList()) {
101102
if (val.getType() != ValueType.RESOLVED_REF) {
102103
int valSize = val.getValue().getBytes(StandardCharsets.UTF_8).length;
@@ -155,13 +156,6 @@ public static Observation graphToObservations(McfOptimizedGraph graph, String im
155156
return obs.build();
156157
}
157158

158-
public static List<PCollection<Void>> deleteExistingDataForImport(
159-
String importName, String provenance, Pipeline pipeline, SpannerClient spannerClient) {
160-
return Arrays.asList(
161-
spannerClient.deleteObservationsForImport(importName, pipeline),
162-
spannerClient.deleteEdgesForImport(provenance, pipeline));
163-
}
164-
165159
public static PCollection<KV<String, Mutation>> graphToObservations(
166160
PCollection<McfOptimizedGraph> graph,
167161
String importName,

pipeline/util/src/test/java/org/datacommons/ingestion/util/GraphReaderTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,14 @@ public void testGraphToEdges() {
173173
.setType(ValueType.TEXT)
174174
.setValue("Subject Node"))
175175
.build())
176+
.putPvs(
177+
"dcid",
178+
McfGraph.Values.newBuilder()
179+
.addTypedValues(
180+
TypedValue.newBuilder()
181+
.setType(ValueType.TEXT)
182+
.setValue("dcid_subject"))
183+
.build())
176184
.putPvs(
177185
"typeOf",
178186
McfGraph.Values.newBuilder()

0 commit comments

Comments
 (0)