Skip to content

Commit cb92e4f

Browse files
committed
begin should use eventFactory
Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
1 parent 90f0c79 commit cb92e4f

5 files changed

Lines changed: 184 additions & 122 deletions

File tree

pkg/networkservice/common/begin/client.go

Lines changed: 17 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
1+
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -51,40 +51,28 @@ func (b *beginClient) Request(ctx context.Context, request *networkservice.Netwo
5151
eventFactoryClient, _ := b.LoadOrStore(request.GetConnection().GetId(),
5252
newEventFactoryClient(
5353
ctx,
54+
func() *eventFactoryClient {
55+
currentEventFactoryClient, _ := b.Load(request.GetConnection().GetId())
56+
return currentEventFactoryClient
57+
},
5458
func() {
5559
b.Delete(request.GetRequestConnection().GetId())
5660
},
57-
opts...,
5861
),
5962
)
60-
<-eventFactoryClient.executor.AsyncExec(func() {
61-
// If the eventFactory has changed, usually because the connection has been Closed and re-established
62-
// go back to the beginning and try again.
63-
currentEventFactoryClient, _ := b.Load(request.GetConnection().GetId())
64-
if currentEventFactoryClient != eventFactoryClient {
63+
err = <-eventFactoryClient.Request(
64+
withContext(ctx),
65+
withUserRequest(request),
66+
withGRPCOpts(opts),
67+
withConnectionToReturn(&conn),
68+
)
69+
if err != nil {
70+
if errors.Is(err, &errorEventFactoryInconsistency{}) {
6571
log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryClient != eventFactoryClient")
6672
conn, err = b.Request(ctx, request, opts...)
67-
return
68-
}
69-
70-
withEventFactoryCtx := withEventFactory(ctx, eventFactoryClient)
71-
request.Connection = mergeConnection(eventFactoryClient.returnedConnection, request.GetConnection(), eventFactoryClient.request.GetConnection())
72-
conn, err = next.Client(withEventFactoryCtx).Request(withEventFactoryCtx, request, opts...)
73-
if err != nil {
74-
if eventFactoryClient.state != established {
75-
eventFactoryClient.state = closed
76-
b.Delete(request.GetConnection().GetId())
77-
}
78-
return
7973
}
80-
eventFactoryClient.request = request.Clone()
81-
eventFactoryClient.request.Connection = conn.Clone()
82-
eventFactoryClient.opts = opts
83-
eventFactoryClient.state = established
74+
}
8475

85-
eventFactoryClient.returnedConnection = conn.Clone()
86-
eventFactoryClient.updateContext(ctx)
87-
})
8876
return conn, err
8977
}
9078

@@ -98,23 +86,9 @@ func (b *beginClient) Close(ctx context.Context, conn *networkservice.Connection
9886
// If we don't have a connection to Close, just let it be
9987
return
10088
}
101-
<-eventFactoryClient.executor.AsyncExec(func() {
102-
// If the connection is not established, don't do anything
103-
if eventFactoryClient.state != established || eventFactoryClient.client == nil || eventFactoryClient.request == nil {
104-
return
105-
}
89+
err = <-eventFactoryClient.Close(
90+
withGRPCOpts(opts),
91+
)
10692

107-
// If this isn't the connection we started with, do nothing
108-
currentEventFactoryClient, _ := b.Load(conn.GetId())
109-
if currentEventFactoryClient != eventFactoryClient {
110-
return
111-
}
112-
// Always close with the last valid Connection we got
113-
conn = eventFactoryClient.request.Connection
114-
withEventFactoryCtx := withEventFactory(ctx, eventFactoryClient)
115-
emp, err = next.Client(withEventFactoryCtx).Close(withEventFactoryCtx, conn, opts...)
116-
// afterCloseFunc() is used to cleanup things like the entry in the Map for EventFactories
117-
eventFactoryClient.afterCloseFunc()
118-
})
11993
return emp, err
12094
}

pkg/networkservice/common/begin/event_factory.go

Lines changed: 113 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
1+
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -29,6 +29,12 @@ import (
2929
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
3030
)
3131

32+
type errorEventFactoryInconsistency struct{}
33+
34+
func (e *errorEventFactoryInconsistency) Error() string {
35+
return "errorEventFactoryInconsistency error"
36+
}
37+
3238
type connectionState int
3339

3440
const (
@@ -52,19 +58,26 @@ type eventFactoryClient struct {
5258
ctxFunc func() (context.Context, context.CancelFunc)
5359
request *networkservice.NetworkServiceRequest
5460
returnedConnection *networkservice.Connection
55-
opts []grpc.CallOption
61+
grpcOpts []grpc.CallOption
5662
client networkservice.NetworkServiceClient
63+
beforeRequestFunc func() error
5764
afterCloseFunc func()
5865
}
5966

60-
func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.CallOption) *eventFactoryClient {
67+
func newEventFactoryClient(ctx context.Context, actualEventFactoryFunc func() *eventFactoryClient, afterClose func()) *eventFactoryClient {
6168
f := &eventFactoryClient{
6269
client: next.Client(ctx),
6370
initialCtxFunc: postpone.Context(ctx),
64-
opts: opts,
6571
}
6672
f.updateContext(ctx)
6773

74+
f.beforeRequestFunc = func() error {
75+
if actualEventFactoryFunc != nil && actualEventFactoryFunc() != f {
76+
return &errorEventFactoryInconsistency{}
77+
}
78+
return nil
79+
}
80+
6881
f.afterCloseFunc = func() {
6982
f.state = closed
7083
if afterClose != nil {
@@ -90,18 +103,29 @@ func (f *eventFactoryClient) Request(opts ...Option) <-chan error {
90103
opt(o)
91104
}
92105
ch := make(chan error, 1)
106+
93107
f.executor.AsyncExec(func() {
94108
defer close(ch)
95-
if f.state != established {
109+
if err := f.beforeRequestFunc(); err != nil {
110+
ch <- err
96111
return
97112
}
98113
select {
99114
case <-o.cancelCtx.Done():
100115
default:
101116
request := f.request.Clone()
117+
grpcOpts := f.grpcOpts
118+
if o.userRequest != nil {
119+
request = o.userRequest
120+
request.Connection = mergeConnection(f.returnedConnection, o.userRequest.Connection, f.request.GetConnection())
121+
}
122+
if o.grpcOpts != nil {
123+
grpcOpts = o.grpcOpts
124+
}
125+
102126
if o.reselect {
103127
ctx, cancel := f.ctxFunc()
104-
_, _ = f.client.Close(ctx, request.GetConnection(), f.opts...)
128+
_, _ = f.client.Close(ctx, request.GetConnection(), grpcOpts...)
105129
if request.GetConnection() != nil {
106130
request.GetConnection().Mechanism = nil
107131
request.GetConnection().NetworkServiceEndpointName = ""
@@ -110,12 +134,32 @@ func (f *eventFactoryClient) Request(opts ...Option) <-chan error {
110134
}
111135
cancel()
112136
}
113-
ctx, cancel := f.ctxFunc()
114-
defer cancel()
115-
conn, err := f.client.Request(ctx, request, f.opts...)
116-
if err == nil && f.request != nil {
117-
f.request.Connection = conn
137+
138+
var ctx context.Context
139+
if o.ctx != nil {
140+
ctx = withEventFactory(o.ctx, f)
141+
} else {
142+
var cancel context.CancelFunc
143+
ctx, cancel = f.ctxFunc()
144+
defer cancel()
145+
}
146+
147+
conn, err := f.client.Request(ctx, request, grpcOpts...)
148+
if err == nil {
149+
f.request = request.Clone()
150+
f.request.Connection = conn.Clone()
151+
f.grpcOpts = grpcOpts
152+
f.state = established
118153
f.request.Connection.State = networkservice.State_UP
154+
if o.connectionToReturn != nil {
155+
f.returnedConnection = conn.Clone()
156+
*o.connectionToReturn = conn.Clone()
157+
}
158+
f.updateContext(ctx)
159+
} else {
160+
if f.state != established {
161+
f.afterCloseFunc()
162+
}
119163
}
120164
ch <- err
121165
}
@@ -130,18 +174,26 @@ func (f *eventFactoryClient) Close(opts ...Option) <-chan error {
130174
for _, opt := range opts {
131175
opt(o)
132176
}
177+
133178
ch := make(chan error, 1)
134179
f.executor.AsyncExec(func() {
135180
defer close(ch)
136-
if f.request == nil {
181+
if f.request == nil || f.state != established {
137182
return
138183
}
184+
139185
select {
140186
case <-o.cancelCtx.Done():
141187
default:
188+
grpcOpts := f.grpcOpts
189+
if o.grpcOpts != nil {
190+
grpcOpts = o.grpcOpts
191+
}
192+
142193
ctx, cancel := f.ctxFunc()
143194
defer cancel()
144-
_, err := f.client.Close(ctx, f.request.GetConnection(), f.opts...)
195+
196+
_, err := f.client.Close(ctx, f.request.GetConnection(), grpcOpts...)
145197
f.afterCloseFunc()
146198
ch <- err
147199
}
@@ -158,17 +210,25 @@ type eventFactoryServer struct {
158210
ctxFunc func() (context.Context, context.CancelFunc)
159211
request *networkservice.NetworkServiceRequest
160212
returnedConnection *networkservice.Connection
161-
afterCloseFunc func()
162213
server networkservice.NetworkServiceServer
214+
beforeRequestFunc func() error
215+
afterCloseFunc func()
163216
}
164217

165-
func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactoryServer {
218+
func newEventFactoryServer(ctx context.Context, actualEventFactoryFunc func() *eventFactoryServer, afterClose func()) *eventFactoryServer {
166219
f := &eventFactoryServer{
167220
server: next.Server(ctx),
168221
initialCtxFunc: postpone.Context(ctx),
169222
}
170223
f.updateContext(ctx)
171224

225+
f.beforeRequestFunc = func() error {
226+
if actualEventFactoryFunc != nil && actualEventFactoryFunc() != f {
227+
return &errorEventFactoryInconsistency{}
228+
}
229+
return nil
230+
}
231+
172232
f.afterCloseFunc = func() {
173233
f.state = closed
174234
afterClose()
@@ -194,17 +254,48 @@ func (f *eventFactoryServer) Request(opts ...Option) <-chan error {
194254
ch := make(chan error, 1)
195255
f.executor.AsyncExec(func() {
196256
defer close(ch)
197-
if f.state != established {
257+
if err := f.beforeRequestFunc(); err != nil {
258+
ch <- err
198259
return
199260
}
200261
select {
201262
case <-o.cancelCtx.Done():
202263
default:
203-
ctx, cancel := f.ctxFunc()
204-
defer cancel()
205-
conn, err := f.server.Request(ctx, f.request)
206-
if err == nil && f.request != nil {
207-
f.request.Connection = conn
264+
request := f.request.Clone()
265+
if o.userRequest != nil {
266+
request = o.userRequest
267+
}
268+
if f.state == established && request.GetConnection().GetState() == networkservice.State_RESELECT_REQUESTED {
269+
ctx, cancel := f.ctxFunc()
270+
_, _ = f.server.Close(ctx, f.request.GetConnection())
271+
f.state = closed
272+
cancel()
273+
}
274+
275+
var ctx context.Context
276+
if o.ctx != nil {
277+
ctx = withEventFactory(o.ctx, f)
278+
} else {
279+
var cancel context.CancelFunc
280+
ctx, cancel = f.ctxFunc()
281+
defer cancel()
282+
}
283+
284+
conn, err := f.server.Request(ctx, request)
285+
if err == nil {
286+
f.request = request.Clone()
287+
f.request.Connection = conn.Clone()
288+
f.state = established
289+
f.request.Connection.State = networkservice.State_UP
290+
if o.connectionToReturn != nil {
291+
f.returnedConnection = conn.Clone()
292+
*o.connectionToReturn = conn.Clone()
293+
}
294+
f.updateContext(ctx)
295+
} else {
296+
if f.state != established {
297+
f.afterCloseFunc()
298+
}
208299
}
209300
ch <- err
210301
}
@@ -222,7 +313,7 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error {
222313
ch := make(chan error, 1)
223314
f.executor.AsyncExec(func() {
224315
defer close(ch)
225-
if f.request == nil {
316+
if f.request == nil || f.state != established {
226317
return
227318
}
228319
select {

pkg/networkservice/common/begin/options.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 Cisco and/or its affiliates.
1+
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -18,11 +18,19 @@ package begin
1818

1919
import (
2020
"context"
21+
22+
"github.com/networkservicemesh/api/pkg/api/networkservice"
23+
"google.golang.org/grpc"
2124
)
2225

2326
type option struct {
2427
cancelCtx context.Context
2528
reselect bool
29+
30+
ctx context.Context
31+
userRequest *networkservice.NetworkServiceRequest
32+
grpcOpts []grpc.CallOption
33+
connectionToReturn **networkservice.Connection
2634
}
2735

2836
// Option - event option
@@ -41,3 +49,31 @@ func WithReselect() Option {
4149
o.reselect = true
4250
}
4351
}
52+
53+
// withUserRequest - optionally clear Mechanism and NetworkServiceName to force reselect
54+
func withContext(ctx context.Context) Option {
55+
return func(o *option) {
56+
o.ctx = ctx
57+
}
58+
}
59+
60+
// withUserRequest - optionally clear Mechanism and NetworkServiceName to force reselect
61+
func withUserRequest(r *networkservice.NetworkServiceRequest) Option {
62+
return func(o *option) {
63+
o.userRequest = r
64+
}
65+
}
66+
67+
// withOpts - optionally clear Mechanism and NetworkServiceName to force reselect
68+
func withGRPCOpts(opts []grpc.CallOption) Option {
69+
return func(o *option) {
70+
o.grpcOpts = opts
71+
}
72+
}
73+
74+
// withConnectionToReturn - optionally clear Mechanism and NetworkServiceName to force reselect
75+
func withConnectionToReturn(conn **networkservice.Connection) Option {
76+
return func(o *option) {
77+
o.connectionToReturn = conn
78+
}
79+
}

0 commit comments

Comments
 (0)