Skip to content

Commit fb53f0d

Browse files
authored
Merge pull request #234 from justinsb/dag_with_order
Tweak topological sort to preserve client-specified order
2 parents 966d4be + facc829 commit fb53f0d

File tree

9 files changed

+235
-180
lines changed

9 files changed

+235
-180
lines changed

pkg/graph/builder.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,17 @@ func (b *Builder) NewResourceGraphDefinition(originalCR *v1alpha1.ResourceGraphD
142142

143143
// we'll also store the resources in a map for easy access later.
144144
resources := make(map[string]*Resource)
145-
for _, rgResource := range rgd.Spec.Resources {
146-
r, err := b.buildRGResource(rgResource, namespacedResources)
145+
for i, rgResource := range rgd.Spec.Resources {
146+
id := rgResource.ID
147+
order := i
148+
r, err := b.buildRGResource(rgResource, namespacedResources, order)
147149
if err != nil {
148-
return nil, fmt.Errorf("failed to build resource '%v': %v", rgResource.ID, err)
150+
return nil, fmt.Errorf("failed to build resource %q: %w", id, err)
149151
}
150-
resources[rgResource.ID] = r
152+
if resources[id] != nil {
153+
return nil, fmt.Errorf("found resources with duplicate id %q", id)
154+
}
155+
resources[id] = r
151156
}
152157

153158
// At this stage we have a superficial understanding of the resources that are
@@ -246,7 +251,7 @@ func (b *Builder) NewResourceGraphDefinition(originalCR *v1alpha1.ResourceGraphD
246251
// It provides a high-level understanding of the resource, by extracting the
247252
// OpenAPI schema, emulating the resource and extracting the cel expressions
248253
// from the schema.
249-
func (b *Builder) buildRGResource(rgResource *v1alpha1.Resource, namespacedResources map[k8sschema.GroupKind]bool) (*Resource, error) {
254+
func (b *Builder) buildRGResource(rgResource *v1alpha1.Resource, namespacedResources map[k8sschema.GroupKind]bool, order int) (*Resource, error) {
250255
// 1. We need to unmarshal the resource into a map[string]interface{} to
251256
// make it easier to work with.
252257
resourceObject := map[string]interface{}{}
@@ -334,6 +339,7 @@ func (b *Builder) buildRGResource(rgResource *v1alpha1.Resource, namespacedResou
334339
readyWhenExpressions: readyWhen,
335340
includeWhenExpressions: includeWhen,
336341
namespaced: isNamespaced,
342+
order: order,
337343
}, nil
338344
}
339345

@@ -364,13 +370,13 @@ func (b *Builder) buildDependencyGraph(
364370

365371
directedAcyclicGraph := dag.NewDirectedAcyclicGraph()
366372
// Set the vertices of the graph to be the resources defined in the resource graph definition.
367-
for resourceName := range resources {
368-
if err := directedAcyclicGraph.AddVertex(resourceName); err != nil {
373+
for _, resource := range resources {
374+
if err := directedAcyclicGraph.AddVertex(resource.id, resource.order); err != nil {
369375
return nil, fmt.Errorf("failed to add vertex to graph: %w", err)
370376
}
371377
}
372378

373-
for resourceName, resource := range resources {
379+
for _, resource := range resources {
374380
for _, resourceVariable := range resource.variables {
375381
for _, expression := range resourceVariable.Expressions {
376382
// We need to inspect the expression to understand how it relates to the
@@ -397,10 +403,8 @@ func (b *Builder) buildDependencyGraph(
397403
resource.addDependencies(resourceDependencies...)
398404
resourceVariable.AddDependencies(resourceDependencies...)
399405
// We need to add the dependencies to the graph.
400-
for _, dependency := range resourceDependencies {
401-
if err := directedAcyclicGraph.AddEdge(resourceName, dependency); err != nil {
402-
return nil, err
403-
}
406+
if err := directedAcyclicGraph.AddDependencies(resource.id, resourceDependencies); err != nil {
407+
return nil, err
404408
}
405409
}
406410
}

pkg/graph/builder_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ func TestGraphBuilder_DependencyValidation(t *testing.T) {
509509
assert.Contains(t, clusterDeps, "subnet2")
510510

511511
// Validate topological order
512-
assert.Equal(t, []string{"clusterpolicy", "clusterrole", "vpc", "subnet1", "subnet2", "cluster"}, g.TopologicalOrder)
512+
assert.Equal(t, []string{"vpc", "clusterpolicy", "clusterrole", "subnet1", "subnet2", "cluster"}, g.TopologicalOrder)
513513
},
514514
},
515515
{
@@ -571,7 +571,7 @@ func TestGraphBuilder_DependencyValidation(t *testing.T) {
571571
}, nil, nil),
572572
},
573573
wantErr: true,
574-
errMsg: "This would create a cycle",
574+
errMsg: "graph contains a cycle",
575575
},
576576
{
577577
name: "independent pods",
@@ -726,7 +726,7 @@ func TestGraphBuilder_DependencyValidation(t *testing.T) {
726726
}, nil, nil),
727727
},
728728
wantErr: true,
729-
errMsg: "This would create a cycle",
729+
errMsg: "graph contains a cycle",
730730
},
731731
{
732732
name: "shared infrastructure dependencies",
@@ -919,17 +919,17 @@ func TestGraphBuilder_DependencyValidation(t *testing.T) {
919919

920920
// Validate topological order
921921
assert.Equal(t, []string{
922-
"policy",
923-
"role",
924922
"vpc",
925923
"subnet1",
926924
"subnet2",
927925
"subnet3",
926+
"secgroup",
927+
"policy",
928+
"role",
928929
"cluster1",
929930
"cluster2",
930931
"cluster3",
931932
"monitor",
932-
"secgroup",
933933
}, g.TopologicalOrder)
934934
},
935935
},

pkg/graph/dag/dag.go

Lines changed: 89 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,28 @@
1414
package dag
1515

1616
import (
17+
"errors"
1718
"fmt"
1819
"sort"
1920
"strings"
21+
22+
"golang.org/x/exp/maps"
2023
)
2124

2225
// Vertex represents a node/vertex in a directed acyclic graph.
2326
type Vertex struct {
2427
// ID is a unique identifier for the node
2528
ID string
26-
// Edges stores the IDs of the nodes that this node has an outgoing edge to.
27-
// In kro, this would be the children of a resource.
28-
Edges map[string]struct{}
29+
// Order records the original order, and is used to preserve the original user-provided ordering as far as possible.
30+
Order int
31+
// DependsOn stores the IDs of the nodes that this node depends on.
32+
// If we depend on another vertex, we must appear after that vertex in the topological sort.
33+
DependsOn map[string]struct{}
34+
}
35+
36+
func (v Vertex) String() string {
37+
dependsOn := strings.Join(maps.Keys(v.DependsOn), ",")
38+
return fmt.Sprintf("Vertex[ID: %s, Order: %d, DependsOn: %s]", v.ID, v.Order, dependsOn)
2939
}
3040

3141
// DirectedAcyclicGraph represents a directed acyclic graph
@@ -42,134 +52,128 @@ func NewDirectedAcyclicGraph() *DirectedAcyclicGraph {
4252
}
4353

4454
// AddVertex adds a new node to the graph.
45-
func (d *DirectedAcyclicGraph) AddVertex(id string) error {
55+
func (d *DirectedAcyclicGraph) AddVertex(id string, order int) error {
4656
if _, exists := d.Vertices[id]; exists {
4757
return fmt.Errorf("node %s already exists", id)
4858
}
4959
d.Vertices[id] = &Vertex{
50-
ID: id,
51-
Edges: make(map[string]struct{}),
60+
ID: id,
61+
Order: order,
62+
DependsOn: make(map[string]struct{}),
5263
}
5364
return nil
5465
}
5566

5667
type CycleError struct {
57-
From, to string
58-
Cycle []string
68+
Cycle []string
5969
}
6070

6171
func (e *CycleError) Error() string {
62-
return fmt.Sprintf("Cannot add edge from %s to %s. This would create a cycle: %s", e.From, e.to, formatCycle(e.Cycle))
72+
return fmt.Sprintf("graph contains a cycle: %s", formatCycle(e.Cycle))
6373
}
6474

6575
func formatCycle(cycle []string) string {
6676
return strings.Join(cycle, " -> ")
6777
}
6878

69-
// AddEdge adds a directed edge from one node to another.
70-
func (d *DirectedAcyclicGraph) AddEdge(from, to string) error {
79+
// AsCycleError returns the (potentially wrapped) CycleError, or nil if it is not a CycleError.
80+
func AsCycleError(err error) *CycleError {
81+
cycleError := &CycleError{}
82+
if errors.As(err, &cycleError) {
83+
return cycleError
84+
}
85+
return nil
86+
}
87+
88+
// AddDependencies adds a set of dependencies to the "from" vertex.
89+
// This indicates that all the vertexes in "dependencies" must occur before "from".
90+
func (d *DirectedAcyclicGraph) AddDependencies(from string, dependencies []string) error {
7191
fromNode, fromExists := d.Vertices[from]
72-
_, toExists := d.Vertices[to]
7392
if !fromExists {
7493
return fmt.Errorf("node %s does not exist", from)
7594
}
76-
if !toExists {
77-
return fmt.Errorf("node %s does not exist", to)
78-
}
79-
if from == to {
80-
return fmt.Errorf("self references are not allowed")
81-
}
8295

83-
fromNode.Edges[to] = struct{}{}
96+
for _, dependency := range dependencies {
97+
_, toExists := d.Vertices[dependency]
98+
if !toExists {
99+
return fmt.Errorf("node %s does not exist", dependency)
100+
}
101+
if from == dependency {
102+
return fmt.Errorf("self references are not allowed")
103+
}
104+
fromNode.DependsOn[dependency] = struct{}{}
105+
}
84106

85107
// Check if the graph is still a DAG
86-
hasCycle, cycle := d.HasCycle()
108+
hasCycle, cycle := d.hasCycle()
87109
if hasCycle {
88110
// The graph now contains a cycle, so roll back the dependencies we just added.
89-
delete(fromNode.Edges, to)
111+
for _, dependency := range dependencies {
112+
delete(fromNode.DependsOn, dependency)
113+
}
90114
return &CycleError{
91-
From: from,
92-
to: to,
93115
Cycle: cycle,
94116
}
95117
}
96118

97119
return nil
98120
}
99121

122+
// TopologicalSort returns the vertexes of the graph, respecting topological ordering first,
123+
// and preserving order of nodes within each "depth" of the topological ordering.
100124
func (d *DirectedAcyclicGraph) TopologicalSort() ([]string, error) {
101-
if cyclic, _ := d.HasCycle(); cyclic {
102-
return nil, fmt.Errorf("graph has a cycle")
103-
}
104-
105125
visited := make(map[string]bool)
106126
var order []string
107127

108-
// Get a sorted list of all vertices
109-
vertices := d.GetVertices()
128+
// Make a list of vertices, sorted by Order
129+
vertices := make([]*Vertex, 0, len(d.Vertices))
130+
for _, vertex := range d.Vertices {
131+
vertices = append(vertices, vertex)
132+
}
133+
sort.Slice(vertices, func(i, j int) bool {
134+
return vertices[i].Order < vertices[j].Order
135+
})
110136

111-
var dfs func(string)
112-
dfs = func(node string) {
113-
visited[node] = true
137+
for len(visited) < len(vertices) {
138+
progress := false
114139

115-
// Sort the neighbors to ensure deterministic order
116-
neighbors := make([]string, 0, len(d.Vertices[node].Edges))
117-
for neighbor := range d.Vertices[node].Edges {
118-
neighbors = append(neighbors, neighbor)
119-
}
120-
sort.Strings(neighbors)
140+
for _, vertex := range vertices {
141+
if visited[vertex.ID] {
142+
continue
143+
}
121144

122-
for _, neighbor := range neighbors {
123-
if !visited[neighbor] {
124-
dfs(neighbor)
145+
allDependenciesReady := true
146+
for dep := range vertex.DependsOn {
147+
if !visited[dep] {
148+
allDependenciesReady = false
149+
break
150+
}
151+
}
152+
if !allDependenciesReady {
153+
continue
125154
}
155+
156+
order = append(order, vertex.ID)
157+
visited[vertex.ID] = true
158+
progress = true
126159
}
127-
order = append(order, node)
128-
}
129160

130-
// Visit nodes in a deterministic order
131-
for _, node := range vertices {
132-
if !visited[node] {
133-
dfs(node)
161+
if !progress {
162+
hasCycle, cycle := d.hasCycle()
163+
if !hasCycle {
164+
// Unexpected!
165+
return nil, &CycleError{}
166+
}
167+
return nil, &CycleError{
168+
Cycle: cycle,
169+
}
134170
}
135171
}
136172

137173
return order, nil
138174
}
139175

140-
// GetVertices returns the nodes in the graph in sorted alphabetical
141-
// order.
142-
func (d *DirectedAcyclicGraph) GetVertices() []string {
143-
nodes := make([]string, 0, len(d.Vertices))
144-
for node := range d.Vertices {
145-
nodes = append(nodes, node)
146-
}
147-
148-
// Ensure deterministic order. This is important for TopologicalSort
149-
// to return a deterministic result.
150-
sort.Strings(nodes)
151-
return nodes
152-
}
153-
154-
// GetEdges returns the edges in the graph in sorted order...
155-
func (d *DirectedAcyclicGraph) GetEdges() [][2]string {
156-
var edges [][2]string
157-
for from, node := range d.Vertices {
158-
for to := range node.Edges {
159-
edges = append(edges, [2]string{from, to})
160-
}
161-
}
162-
sort.Slice(edges, func(i, j int) bool {
163-
// Sort by from node first
164-
if edges[i][0] == edges[j][0] {
165-
return edges[i][1] < edges[j][1]
166-
}
167-
return edges[i][0] < edges[j][0]
168-
})
169-
return edges
170-
}
171-
172-
func (d *DirectedAcyclicGraph) HasCycle() (bool, []string) {
176+
func (d *DirectedAcyclicGraph) hasCycle() (bool, []string) {
173177
visited := make(map[string]bool)
174178
recStack := make(map[string]bool)
175179
var cyclePath []string
@@ -180,14 +184,14 @@ func (d *DirectedAcyclicGraph) HasCycle() (bool, []string) {
180184
recStack[node] = true
181185
cyclePath = append(cyclePath, node)
182186

183-
for neighbor := range d.Vertices[node].Edges {
184-
if !visited[neighbor] {
185-
if dfs(neighbor) {
187+
for dependency := range d.Vertices[node].DependsOn {
188+
if !visited[dependency] {
189+
if dfs(dependency) {
186190
return true
187191
}
188-
} else if recStack[neighbor] {
192+
} else if recStack[dependency] {
189193
// Found a cycle, add the closing node to complete the cycle
190-
cyclePath = append(cyclePath, neighbor)
194+
cyclePath = append(cyclePath, dependency)
191195
return true
192196
}
193197
}

0 commit comments

Comments
 (0)