Skip to content
This repository was archived by the owner on Dec 6, 2025. It is now read-only.

Commit f8c572e

Browse files
authored
StartProjections (#822)
1 parent e15acbd commit f8c572e

File tree

14 files changed

+129
-110
lines changed

14 files changed

+129
-110
lines changed

dev/docker-compose.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: '3'
2-
31
services:
42

53
postgres:

libs/common/grpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func StartNewGRPCServer(ctx context.Context, addr string, registerServerHook fun
101101
}
102102

103103
// Shut down Setup()'s resources
104-
Shutdown()
104+
Shutdown(nil)
105105
}
106106

107107
// DefaultInterceptors returns the slice of default Interceptors for the GRPC service

libs/common/setup.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,18 @@ import (
99
"github.com/rs/zerolog/log"
1010
"hwutil"
1111
"net/http"
12+
"os"
1213
"strings"
1314
"telemetry"
15+
"time"
1416
)
1517

1618
var (
1719
Mode string // Mode is set in Setup()
1820
InsecureFakeTokenEnable = false
1921
InstanceOrganizationID *uuid.UUID
2022
shutdownOpenTelemetryFn func() // cleanup function
23+
contextCancel func() // Setup() yields the "root" context, which can be canceled using this function
2124
)
2225

2326
const DevelopmentMode = "development"
@@ -57,8 +60,9 @@ var skipAuthForMethods []string
5760

5861
// Setup loads the .env file and sets up logging,
5962
// also sets up tokens when the service requires auth.
60-
func Setup(serviceName, version string, opts ...SetupOption) {
61-
ctx := context.Background()
63+
func Setup(serviceName, version string, opts ...SetupOption) context.Context {
64+
ctx, cancel := context.WithCancel(context.Background())
65+
contextCancel = cancel
6266

6367
// Collect options
6468
options := SetupOptions{}
@@ -130,6 +134,8 @@ func Setup(serviceName, version string, opts ...SetupOption) {
130134

131135
setupAuth(ctx, options.fakeAuthOnly)
132136
}
137+
138+
return ctx
133139
}
134140

135141
// ResolveAddrFromEnv uses the "APP_PORT", "PORT" and "ADDR" env variables to
@@ -150,7 +156,22 @@ func ResolveAddrFromEnv() string {
150156
// It should only ever be called after Setup() or SetupWithUnauthenticatedMethods()!
151157
// StartNewGRPCServer() already calls this function, so there is usually not need to call Shutdown() at all!
152158
// Keep in mind, that this shuts down the otel exporter, new traces won't be processed!
153-
func Shutdown() {
154-
log.Info().Msg("shutting down otel")
155-
shutdownOpenTelemetryFn()
159+
func Shutdown(err error) {
160+
if shutdownOpenTelemetryFn != nil {
161+
log.Info().Msg("shutting down otel")
162+
shutdownOpenTelemetryFn()
163+
}
164+
165+
if contextCancel != nil {
166+
log.Info().Msg("canceling main context")
167+
contextCancel()
168+
}
169+
170+
time.Sleep(time.Second * 2) // give other resources some time to react to closed context (not sure if needed)
171+
172+
exitCode := 0
173+
if err != nil {
174+
exitCode = 1
175+
}
176+
os.Exit(exitCode)
156177
}

libs/hwes/eventstoredb/projections/custom/custom.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ type EventStoreClient interface {
2828

2929
type eventHandler func(ctx context.Context, evt hwes.Event) (error, *esdb.NackAction)
3030

31+
// ICustomProjection is an interface, implemented by CustomProjection,
32+
// and thus also implemented by all types that struct-embed CustomProjection
33+
type ICustomProjection interface {
34+
// Subscribe creates and subscribes to a persistent subscription in EventStoreDB
35+
// A persistent subscription is a type of subscription where the state is saved on the server-side
36+
// This function blocks the thread until the passed context gets canceled
37+
// https://developers.eventstore.com/server/v23.10/persistent-subscriptions.html
38+
Subscribe(ctx context.Context) error
39+
GetSubscriptionGroupName() string
40+
}
41+
3142
// CustomProjection can be used to develop own projections
3243
// A projection is an event sourcing pattern to build up
3344
// a read model based on the underlying event data.
@@ -77,6 +88,14 @@ func NewCustomProjection(esdbClient EventStoreClient, subscriptionGroupName stri
7788
}
7889
}
7990

91+
func (p *CustomProjection) GetSubscriptionGroupName() string {
92+
return p.subscriptionGroupName
93+
}
94+
95+
func (p *CustomProjection) CustomProjection() *CustomProjection {
96+
return p
97+
}
98+
8099
func (p *CustomProjection) RegisterEventListener(eventType string, eventHandler eventHandler) *CustomProjection {
81100
if _, found := p.eventHandlers[eventType]; found {
82101
zlog.Error().
@@ -100,10 +119,6 @@ func (p *CustomProjection) HandleEvent(ctx context.Context, event hwes.Event) (e
100119
return eventHandler(ctx, event)
101120
}
102121

103-
// Subscribe creates and subscribes to a persistent subscription in EventStoreDB
104-
// A persistent subscription is a type of subscription where the state is saved on the server-side
105-
// This function blocks the thread until the passed context gets canceled
106-
// https://developers.eventstore.com/server/v23.10/persistent-subscriptions.html
107122
func (p *CustomProjection) Subscribe(ctx context.Context) error {
108123
log := zlog.Ctx(ctx)
109124

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package projections
2+
3+
import (
4+
"context"
5+
"github.com/rs/zerolog/log"
6+
"hwes/eventstoredb/projections/custom"
7+
)
8+
9+
// StartProjections starts the passed projections, onErr may be nil and is called on the first subscription error
10+
func StartProjections(ctx context.Context, onErr func(err error), projections ...custom.ICustomProjection) {
11+
errCh := make(chan error)
12+
13+
for _, projection := range projections {
14+
name := projection.GetSubscriptionGroupName()
15+
16+
// spawn new green thread, as Subscribe is blocking
17+
go func() {
18+
if err := projection.Subscribe(ctx); err != nil {
19+
log.Err(err).Msgf("could not subscribe to %s", name)
20+
errCh <- err // send error back to function thread
21+
}
22+
}()
23+
}
24+
25+
err := <-errCh // wait for any errors
26+
27+
if onErr != nil {
28+
onErr(err)
29+
}
30+
}

services/ory-svc/main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ func newErrAndLog(ctx context.Context, msg string) error {
4343
}
4444

4545
func main() {
46-
common.Setup(ServiceName, Version)
46+
ctx := common.Setup(ServiceName, Version)
4747

48-
ctx := context.Background()
4948
log := zlog.Ctx(ctx)
5049

5150
DaprPubsub = hwutil.GetEnvOr("DAPR_PUBSUB", "pubsub")
@@ -83,7 +82,7 @@ func main() {
8382

8483
// we don't use common.StartNewGRPCServer,
8584
// so we have to call Shutdown manually
86-
common.Shutdown()
85+
common.Shutdown(nil)
8786
}
8887

8988
func prepCtxForSvcToSvcCall(parentCtx context.Context, targetDaprAppId string) (context.Context, context.CancelFunc, error) {

services/property-svc/cmd/service/main.go

Lines changed: 20 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,32 @@ package service
22

33
import (
44
"common"
5-
"context"
65
"flag"
6+
77
pb "gen/services/property_svc/v1"
8+
daprd "github.com/dapr/go-sdk/service/grpc"
89
"github.com/rs/zerolog/log"
910
"hwdb"
1011
"hwes/eventstoredb"
12+
"hwes/eventstoredb/projections"
13+
1114
propertySet "property-svc/internal/property-set/api"
15+
psh "property-svc/internal/property-set/handlers"
1216
propertyValue "property-svc/internal/property-value/api"
17+
pvh "property-svc/internal/property-value/handlers"
1318
"property-svc/internal/property-value/projections/property_value_postgres_projection"
1419
propertyViews "property-svc/internal/property-view/api"
20+
pvih "property-svc/internal/property-view/handlers"
1521
"property-svc/internal/property-view/projections/property_rules_postgres"
1622
property "property-svc/internal/property/api"
17-
"property-svc/internal/property/projections/property_postgres_projection"
18-
19-
daprd "github.com/dapr/go-sdk/service/grpc"
20-
psh "property-svc/internal/property-set/handlers"
21-
pvh "property-svc/internal/property-value/handlers"
22-
pvih "property-svc/internal/property-view/handlers"
2323
ph "property-svc/internal/property/handlers"
24+
"property-svc/internal/property/projections/property_postgres_projection"
2425
)
2526

2627
const ServiceName = "property-svc"
2728

2829
func Main(version string, ready func()) {
29-
ctx, cancel := context.WithCancel(context.Background())
30-
common.Setup(ServiceName, version, common.WithAuth())
30+
ctx := common.Setup(ServiceName, version, common.WithAuth())
3131

3232
replayMode := flag.Bool("replay", false, "")
3333
flag.Parse()
@@ -39,44 +39,22 @@ func Main(version string, ready func()) {
3939
eventStore := eventstoredb.SetupEventStoreByEnv()
4040
aggregateStore := eventstoredb.NewAggregateStore(eventStore)
4141

42-
propertyPostgresProjection := property_postgres_projection.
43-
NewProjection(eventStore, ServiceName, hwdb.GetDB())
44-
45-
propertyValuePostgresProjection := property_value_postgres_projection.
46-
NewProjection(eventStore, ServiceName, hwdb.GetDB())
47-
4842
if *replayMode {
49-
if err := replay(ctx, eventStore); err != nil {
43+
err := replay(ctx, eventStore)
44+
if err != nil {
5045
log.Err(err).Msg("error during replay")
51-
cancel()
5246
}
53-
// TODO: Find a more generic approach to run common.Shutdown()
54-
common.Shutdown()
55-
cancel()
47+
common.Shutdown(err)
5648
return
5749
}
5850

59-
go func() {
60-
if err := propertyPostgresProjection.Subscribe(ctx); err != nil {
61-
log.Err(err).Msg("error during property-postgres projection subscription")
62-
cancel()
63-
}
64-
}()
65-
66-
go func() {
67-
if err := propertyValuePostgresProjection.Subscribe(ctx); err != nil {
68-
log.Err(err).Msg("error during propertyValue-postgres projection subscription")
69-
cancel()
70-
}
71-
}()
72-
73-
go func() {
74-
taskViewsPostgresProjection := property_rules_postgres.NewProjection(eventStore, ServiceName)
75-
if err := taskViewsPostgresProjection.Subscribe(ctx); err != nil {
76-
log.Err(err).Msg("error during taskViewsPostgresProjection subscription")
77-
cancel()
78-
}
79-
}()
51+
go projections.StartProjections(
52+
ctx,
53+
common.Shutdown,
54+
property_postgres_projection.NewProjection(eventStore, ServiceName, hwdb.GetDB()),
55+
property_value_postgres_projection.NewProjection(eventStore, ServiceName, hwdb.GetDB()),
56+
property_rules_postgres.NewProjection(eventStore, ServiceName),
57+
)
8058

8159
propertyHandlers := ph.NewPropertyHandlers(aggregateStore)
8260
propertySetHandlers := psh.NewPropertySetHandlers(aggregateStore)
@@ -96,5 +74,5 @@ func Main(version string, ready func()) {
9674
}
9775
})
9876

99-
cancel()
77+
common.Shutdown(nil)
10078
}

services/property-svc/internal/property-view/api/grpc_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ import (
2525
"google.golang.org/grpc"
2626
)
2727

28-
func server(ctx context.Context) (pb.PropertyViewsServiceClient, *hwes_test.AggregateStore, func()) {
28+
func server() (context.Context, pb.PropertyViewsServiceClient, *hwes_test.AggregateStore, func()) {
2929
// Build gRPC service
3030
aggregateStore := hwes_test.NewAggregateStore()
3131
propertyViewHandlers := handlers.NewPropertyViewHandlers(aggregateStore)
3232
grpcService := api.NewPropertyViewService(aggregateStore, propertyViewHandlers)
3333

34-
common.Setup("property-svc", "test", common.WithFakeAuthOnly())
34+
ctx := common.Setup("property-svc", "test", common.WithFakeAuthOnly())
3535

3636
// Start Server
3737
grpcServer := grpc.NewServer(common.DefaultInterceptorChain())
@@ -40,12 +40,11 @@ func server(ctx context.Context) (pb.PropertyViewsServiceClient, *hwes_test.Aggr
4040

4141
client := pb.NewPropertyViewsServiceClient(conn)
4242

43-
return client, aggregateStore, closer
43+
return ctx, client, aggregateStore, closer
4444
}
4545

4646
func setup(t *testing.T) (ctx context.Context, client pb.PropertyViewsServiceClient, as *hwes_test.AggregateStore, dbMock pgxmock.PgxPoolIface, teardown func()) {
47-
ctx = context.Background()
48-
client, as, closer := server(ctx)
47+
ctx, client, as, closer := server()
4948
ctx = common_test.AuthenticatedUserContext(ctx, uuid.NewString())
5049

5150
dbMock, err := pgxmock.NewPool()

services/property-svc/internal/property/projections/property_postgres_projection/property_postgres_projection.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ func NewProjection(es *esdb.Client, serviceName string, db hwdb.DBTX) *Projectio
3030
p := &Projection{
3131
CustomProjection: custom.NewCustomProjection(es, subscriptionGroupName, &[]string{fmt.Sprintf("%s-", aggregate.PropertyAggregateType)}),
3232
db: db,
33-
propertyRepo: property_repo.New(db)}
33+
propertyRepo: property_repo.New(db),
34+
}
3435
p.initEventListeners()
3536
return p
3637
}

services/task-svc/main.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package main
22

33
import (
44
"common"
5-
"context"
65
pb "gen/services/task_svc/v1"
76
daprd "github.com/dapr/go-sdk/service/grpc"
87
"hwdb"
@@ -22,14 +21,14 @@ const ServiceName = "task-svc"
2221
var Version string
2322

2423
func main() {
25-
common.Setup(ServiceName, Version, common.WithAuth())
24+
ctx := common.Setup(ServiceName, Version, common.WithAuth())
2625

27-
closeDBPool := hwdb.SetupDatabaseFromEnv(context.Background())
26+
closeDBPool := hwdb.SetupDatabaseFromEnv(ctx)
2827
defer closeDBPool()
2928

3029
tracking.SetupTracking(ServiceName, 10, 24*time.Hour, 20)
3130

32-
common.StartNewGRPCServer(context.Background(), common.ResolveAddrFromEnv(), func(server *daprd.Server) {
31+
common.StartNewGRPCServer(ctx, common.ResolveAddrFromEnv(), func(server *daprd.Server) {
3332
grpcServer := server.GrpcServer()
3433
pb.RegisterTaskServiceServer(grpcServer, task.NewServiceServer())
3534
pb.RegisterPatientServiceServer(grpcServer, patient.NewServiceServer())

0 commit comments

Comments
 (0)