-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplatform.go
More file actions
254 lines (213 loc) · 6.8 KB
/
platform.go
File metadata and controls
254 lines (213 loc) · 6.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
// The platform is an extensible modular system for writing HTTP servers.
//
// 1. Provides a global registry for middleware and module registration
// 2. Provides a lifecycle to the modules for graceful shutdown
// 3. Provides a router the modules can attach to
//
// It's advised to use `platform.Register` from `init` functions.
// Similarly, `platform.Use` should be used from `main` or any
// descendant setup functions. Don't use these functions from tests
// as they create a shared state.
//
// It's possible to use the platform in an imperative way.
//
// ```go
// svc := platform.New(platform.NewOptions())
// svc.Use(middleware.Logger)
// svc.Register(user.NewModule())
// ```
//
// The platform lifecycle is extensively tested to ensure no races, no
// goroutine leaks. Each platform object creates a copy of the global
// state and holds scoped allocations only, enabling test parallelism.
package platform
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	chi "github.com/go-chi/chi/v5"

	"github.com/titpetric/platform/internal"
	"github.com/titpetric/platform/pkg/httpcontext"
	"github.com/titpetric/platform/pkg/telemetry"
)
// Platform is our world struct. It bundles the options, the HTTP
// serving state, the shutdown plumbing and the registry that belong
// to a single platform instance.
type Platform struct {
// options is the configuration given to New; New substitutes
// NewOptions() when it receives nil.
options *Options
// server setup
// router is the chi mux allocated in New. It is handed to the registry
// on start and wrapped as the handler of server.
router *chi.Mux
// server is allocated in setup and is nil before that point.
server *http.Server
// listener is the TCP listener opened in setupListener; nil before that.
listener net.Listener
// final shutdown context
// context is created in New and cancelled by Stop; Wait blocks on it.
context context.Context
// cancel cancels context.
cancel context.CancelFunc
// stop releases the signal notification installed by Start.
// New initialises it to a no-op so Stop can always call it.
stop func()
// once guards the body of Stop so it runs at most one time.
once sync.Once
// registry holds settings for plugins and middleware.
// It's auto-filled from global scope.
registry *Registry
}
// New allocates a fresh *Platform. Passing nil options selects the
// defaults returned by NewOptions().
//
// Every instance receives its own router, its own clone of the global
// registry and its own cancellable lifetime context. The stop hook is
// a no-op until Start installs the real one.
func New(options *Options) *Platform {
	if options == nil {
		options = NewOptions()
	}

	// Lifetime context; it is cancelled as the final shutdown signal.
	lifetime, cancel := context.WithCancel(context.Background())

	return &Platform{
		options:  options,
		router:   chi.NewRouter(),
		stop:     func() {},
		registry: global.registry.Clone(),
		context:  lifetime,
		cancel:   cancel,
	}
}
// Register hands the module m to this platform's own registry.
// Call it before Serve is called.
func (p *Platform) Register(m Module) {
	reg := p.registry
	reg.Register(m)
}
// Use hands the middleware m to this platform's own registry.
// Call it before Serve is called.
func (p *Platform) Use(m Middleware) {
	reg := p.registry
	reg.Use(m)
}
// Stats forwards the two counters reported by the registry, which
// describe how many middlewares and plugins have been added to it.
func (p *Platform) Stats() (int, int) {
	first, second := p.registry.Stats()
	return first, second
}
// Find asks the registry to fill target with the module matching its
// type and returns the registry's verdict.
func (p *Platform) Find(target any) bool {
	found := p.registry.Find(target)
	return found
}
// Start will start the server and print the registered routes.
// It respects cancellation from the passed context, as well as
// sets up signal notification to respond to SIGINT and SIGTERM.
//
// Start returns once the background goroutines have been launched; it
// does not block for the lifetime of the server. Use Wait for that.
// An error is returned only when the platform setup fails.
func (p *Platform) Start(ctx context.Context) error {
	if err := p.setup(ctx); err != nil {
		return fmt.Errorf("error in platform setup: %w", err)
	}

	// Trigger shutdown on SIGINT or SIGTERM. SIGKILL (os.Kill) can not be
	// caught by a program, so registering it has no effect; SIGTERM is the
	// signal process managers send to request a graceful stop.
	sigctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	p.stop = stop

	// sigctx also finishes when ctx is cancelled or when Stop releases the
	// notification through p.stop, so this goroutine always terminates.
	go func() {
		<-sigctx.Done()
		if !p.options.Quiet {
			log.Println("caught sigterm, stopping server")
		}
		p.Stop()
	}()

	// Start the server. http.ErrServerClosed is the expected result of a
	// shutdown and is not reported.
	go func() {
		if err := p.server.Serve(p.listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
			telemetry.CaptureError(p.context, err)
		}
	}()

	// Print registered routes.
	if !p.options.Quiet {
		internal.PrintRoutes(p.router)
	}

	return nil
}
// setup prepares everything Start needs before serving: it starts the
// registry against the router, opens the listener and allocates the
// HTTP server. It returns the first error encountered, wrapped with
// the step that failed.
func (p *Platform) setup(startCtx context.Context) error {
	// Upper bound for reading request headers. Without it a client that
	// never finishes its headers keeps the connection open indefinitely.
	// Request bodies and responses are not limited by this setting.
	const readHeaderTimeout = 10 * time.Second

	// set up context for module start
	ctx := platformContext.SetContext(startCtx, p)
	ctx = optionsContext.SetContext(ctx, p.options)

	ctx, span := telemetry.Start(ctx, "platform.setup")
	defer span.End()

	if err := p.registry.Start(ctx, p.router, p.options); err != nil {
		return fmt.Errorf("registry: %w", err)
	}

	if err := p.setupListener(); err != nil {
		return fmt.Errorf("setting up listener: %w", err)
	}

	p.server = &http.Server{
		Handler:           p.setupRequestContext(p.router),
		ReadHeaderTimeout: readHeaderTimeout,
	}

	return nil
}
// setupRequestContext wraps next in a handler that registers the
// *Platform and its options on every incoming request before passing
// the request on.
func (p *Platform) setupRequestContext(next http.Handler) http.Handler {
	bind := func(w http.ResponseWriter, r *http.Request) {
		platformContext.Set(r, p)
		optionsContext.Set(r, p.options)

		next.ServeHTTP(w, r)
	}
	return http.HandlerFunc(bind)
}
// setupListener opens a TCP listener on the configured server address
// and stores it on the platform. Unless the Quiet option is set, the
// bound address and the platform URL are logged. A listen failure is
// returned as is and leaves the platform untouched.
func (p *Platform) setupListener() error {
	ln, listenErr := net.Listen("tcp", p.options.ServerAddr)
	if listenErr != nil {
		return listenErr
	}
	p.listener = ln

	if p.options.Quiet {
		return nil
	}

	log.Println("Server listening on", ln.Addr().String(), p.URL())
	return nil
}
// Context exposes the platform's lifetime context. Stop cancels it
// once its server shutdown step is over, so a finished context means
// the server has shut down.
func (p *Platform) Context() context.Context {
	lifetime := p.context
	return lifetime
}
// Wait blocks the caller until the platform's lifetime context is
// done, which happens when Stop() has been invoked.
func (p *Platform) Wait() {
	done := p.context.Done()
	<-done
}
// URL gives the e2e endpoint URL for requests. The URL always points
// at 127.0.0.1 and uses the port the listener is actually bound to.
//
// It returns an empty string when no listener has been set up yet
// (for example before Start), instead of dereferencing a nil listener.
func (p *Platform) URL() string {
	if p.listener == nil {
		return ""
	}

	listenAddr := p.listener.Addr().String()

	// The error is deliberately ignored: the address comes from a bound
	// listener and is expected to be a well-formed host:port pair.
	_, port, _ := net.SplitHostPort(listenAddr)

	return "http://127.0.0.1:" + port
}
// Stop will gracefully shutdown the server and then cancel the server context when done.
//
// Stop is an important part of the lifecycle tests. When closing the registry,
// each plugins Stop function gets invoked in parallel. This enables the plugin
// to clear background goroutine event loops, or flush a dirty buffer to storage.
//
// Only after the server has fully shut down does the internal context get cancelled.
//
// Stop is safe to call more than once; only the first call does any work.
// It is also safe to call when Start was never called or failed during
// setup: the HTTP shutdown step is skipped and the remaining cleanup runs.
func (p *Platform) Stop() {
	p.once.Do(func() {
		// Give a 5 second timeout for a graceful shutdown.
		ctx, cancel := context.WithTimeout(p.Context(), 5*time.Second)
		defer cancel()

		// When done, exit main. It's waiting for the cancelled context there.
		//
		// NOTE(review): the registry is closed after p.cancel(), so it
		// receives an already cancelled context and Wait may return before
		// Close has finished. Confirm that this ordering is intended.
		defer func() {
			p.stop()
			p.cancel()
			p.registry.Close(p.context)
		}()

		// The server only exists after a successful setup. Without one
		// there is nothing to shut down; the deferred cleanup still runs.
		if p.server == nil {
			return
		}

		// Capture error to telemetry sink.
		telemetry.CaptureError(p.context, p.server.Shutdown(ctx))
	})
}
// platformKey is the unexported key type handed to httpcontext.NewValue
// to identify the *Platform value.
type platformKey struct{}
// platformContext is the typed accessor this file uses to attach a
// *Platform to requests and contexts (Set, SetContext) and to read it
// back (Get, GetContext).
var platformContext = httpcontext.NewValue[*Platform](platformKey{})
// FromRequest looks up the *Platform instance that was attached to
// the request r and returns it.
func FromRequest(r *http.Request) *Platform {
	instance := platformContext.Get(r)
	return instance
}
// FromContext looks up the *Platform instance that was attached to
// the context ctx and returns it.
func FromContext(ctx context.Context) *Platform {
	instance := platformContext.GetContext(ctx)
	return instance
}
// Start is the one-call convenience entry point: it allocates a new
// *Platform from options and starts it right away, so the listener is
// up and requests are being handled when it returns.
//
// When starting fails the error is returned and no platform is handed
// back.
func Start(ctx context.Context, options *Options) (*Platform, error) {
	instance := New(options)

	startErr := instance.Start(ctx)
	if startErr != nil {
		return nil, startErr
	}

	return instance, nil
}