Skip to content

Commit e5e8f11

Browse files
authored
variants: redirect inner server sending methods to fix dropping notifications in stateless mode (#7)
Registers a sending middleware on each inner server that intercepts all outgoing messages and routes them through the front session using a session-swapped request wrapper. Replaces the previous proxy client handler approach which dropped log messages in stateless mode. ## Motivation and Context Tool handlers in variant servers use req.Session (the inner session) to send notifications. Previously, log messages were silently dropped in stateless mode because the inner session is not aware of the front session. This change injects the sendingMethodHandler from the frontSession into the context and adds a sending middleware to redirect all sending requests to the frontSession handler directly. A few considerations in this implementation: * Having frontSession in the context allows notifications to be sent to the correct frontSession, when the same backend session could be used by multiple front sessions. * This solution will work for synchronous notifications that happen within the same context (progress, log, sampling, elicitation) but not working for asynchronous notifications (tool list changed, resource updated etc), because asynchronous notifications are sent with a new context. * All the operations before the sending method will still happen in the backend. Most notable impact is in stateless mode, requests from one client could change the log level for other clients. For this reason, I default the logging level to debug for now. * This implementation relies on the MCP Go SDK sendingMethodHandler taking a generic `Request` not one that tied to specific function. Based on the current SDK implementation this is unlikely to change. * The same generic method forwarding mechanism could work for receiving methods too, but with some caveat. That can be addressed in separate PR. ## How Has This Been Tested? TestNotificationDelivery — verifies progress and log messages arrive at the front clients with multiple clients making concurrent request. New tests added for context preservation that was added in #6. Context preservation is required to make the notification forwarding work anyway. Since we use the context to carry frontSession handler to the backend session. All existing tests pass ## Breaking Changes None. Tool handlers continue to use req.Session normally. ## Types of changes Bug fix (non-breaking change which fixes an issue)
1 parent 150c110 commit e5e8f11

File tree

9 files changed

+790
-58
lines changed

9 files changed

+790
-58
lines changed

go/sdk/examples/server/research/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
A variant-aware MCP server that manages context budget by providing the same research tools (`search_papers`, `get_paper`, `summarize`) at different verbosity levels.
44

5-
**Pattern demonstrated:** Context budget management with description verbosity control.
5+
Each tool sends progress notifications and log messages as it works, simulating a deep-research workflow where the client can observe each step (e.g. "Searching arXiv", "Resolving references", "Generating summary").
6+
7+
**Patterns demonstrated:** Context budget management with description verbosity control, notification streaming.
68

79
## Variants
810

go/sdk/examples/server/research/main.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22
// at different verbosity levels. Demonstrates how variants can optimize tool
33
// descriptions for different context window sizes and use cases.
44
//
5-
// Capability demonstrated: Context budget management, description verbosity.
5+
// Each tool sends progress notifications and log messages as it works,
6+
// simulating a deep-research workflow where the client can observe each
7+
// step (e.g. "Searching arXiv", "Resolving references").
8+
//
9+
// Capability demonstrated: Context budget management, description verbosity,
10+
// notification streaming.
611
//
712
// Variants:
813
// - deep-research: Verbose multi-paragraph descriptions with usage examples
@@ -31,7 +36,9 @@ import (
3136
func main() {
3237
// Deep-research variant: verbose descriptions with usage examples
3338
// for thorough research workflows with large context windows.
34-
deepServer := mcp.NewServer(&mcp.Implementation{Name: "research-assistant", Version: "v1.0.0"}, nil)
39+
deepServer := mcp.NewServer(&mcp.Implementation{Name: "research-assistant", Version: "v1.0.0"}, &mcp.ServerOptions{
40+
Capabilities: &mcp.ServerCapabilities{Logging: &mcp.LoggingCapabilities{}},
41+
})
3542
mcp.AddTool(deepServer, &mcp.Tool{
3643
Name: "search_papers",
3744
Description: "Search academic papers across multiple databases including arXiv, Semantic Scholar, and PubMed. " +
@@ -63,7 +70,9 @@ func main() {
6370

6471
// Quick-lookup variant: concise descriptions for fast Q&A sessions
6572
// where context budget is limited.
66-
quickServer := mcp.NewServer(&mcp.Implementation{Name: "research-assistant", Version: "v1.0.0"}, nil)
73+
quickServer := mcp.NewServer(&mcp.Implementation{Name: "research-assistant", Version: "v1.0.0"}, &mcp.ServerOptions{
74+
Capabilities: &mcp.ServerCapabilities{Logging: &mcp.LoggingCapabilities{}},
75+
})
6776
mcp.AddTool(quickServer, &mcp.Tool{
6877
Name: "search_papers",
6978
Description: "Search academic papers by query, with optional year and field filters.",
@@ -79,7 +88,9 @@ func main() {
7988

8089
// Synthesis variant: balanced descriptions for report generation
8190
// workflows that need moderate detail.
82-
synthesisServer := mcp.NewServer(&mcp.Implementation{Name: "research-assistant", Version: "v1.0.0"}, nil)
91+
synthesisServer := mcp.NewServer(&mcp.Implementation{Name: "research-assistant", Version: "v1.0.0"}, &mcp.ServerOptions{
92+
Capabilities: &mcp.ServerCapabilities{Logging: &mcp.LoggingCapabilities{}},
93+
})
8394
mcp.AddTool(synthesisServer, &mcp.Tool{
8495
Name: "search_papers",
8596
Description: "Search academic papers across arXiv, Semantic Scholar, and PubMed. " +

go/sdk/examples/server/research/tools.go

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package main
22

33
import (
44
"context"
5+
"fmt"
6+
"time"
57

68
"github.com/modelcontextprotocol/go-sdk/mcp"
79
)
@@ -27,7 +29,33 @@ type SearchPapersOutput struct {
2729
TotalFound int `json:"totalFound"`
2830
}
2931

30-
func searchPapers(_ context.Context, _ *mcp.CallToolRequest, in SearchPapersInput) (*mcp.CallToolResult, SearchPapersOutput, error) {
32+
var databases = []string{"arXiv", "Semantic Scholar", "PubMed"}
33+
34+
func searchPapers(ctx context.Context, req *mcp.CallToolRequest, in SearchPapersInput) (*mcp.CallToolResult, SearchPapersOutput, error) {
35+
token := req.Params.GetProgressToken()
36+
total := len(databases)
37+
38+
for i, db := range databases {
39+
req.Session.Log(ctx, &mcp.LoggingMessageParams{
40+
Level: "info",
41+
Logger: "search_papers",
42+
Data: fmt.Sprintf("Searching %s for %q", db, in.Query),
43+
})
44+
req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
45+
ProgressToken: token,
46+
Progress: float64(i + 1),
47+
Total: float64(total),
48+
Message: fmt.Sprintf("Searching %s (%d/%d)", db, i+1, total),
49+
})
50+
time.Sleep(100 * time.Millisecond)
51+
}
52+
53+
req.Session.Log(ctx, &mcp.LoggingMessageParams{
54+
Level: "info",
55+
Logger: "search_papers",
56+
Data: fmt.Sprintf("Found 1247 results for %q, returning top 2", in.Query),
57+
})
58+
3159
return nil, SearchPapersOutput{
3260
Papers: []Paper{
3361
{
@@ -71,7 +99,25 @@ type GetPaperOutput struct {
7199
Paper PaperDetail `json:"paper"`
72100
}
73101

74-
func getPaper(_ context.Context, _ *mcp.CallToolRequest, in GetPaperInput) (*mcp.CallToolResult, GetPaperOutput, error) {
102+
func getPaper(ctx context.Context, req *mcp.CallToolRequest, in GetPaperInput) (*mcp.CallToolResult, GetPaperOutput, error) {
103+
token := req.Params.GetProgressToken()
104+
steps := []string{"Fetching metadata", "Resolving references", "Loading citations"}
105+
106+
for i, step := range steps {
107+
req.Session.Log(ctx, &mcp.LoggingMessageParams{
108+
Level: "info",
109+
Logger: "get_paper",
110+
Data: fmt.Sprintf("%s for %s", step, in.ID),
111+
})
112+
req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
113+
ProgressToken: token,
114+
Progress: float64(i + 1),
115+
Total: float64(len(steps)),
116+
Message: fmt.Sprintf("%s (%d/%d)", step, i+1, len(steps)),
117+
})
118+
time.Sleep(80 * time.Millisecond)
119+
}
120+
75121
return nil, GetPaperOutput{
76122
Paper: PaperDetail{
77123
ID: in.ID,
@@ -98,11 +144,30 @@ type SummarizeOutput struct {
98144
Style string `json:"style"`
99145
}
100146

101-
func summarize(_ context.Context, _ *mcp.CallToolRequest, in SummarizeInput) (*mcp.CallToolResult, SummarizeOutput, error) {
147+
func summarize(ctx context.Context, req *mcp.CallToolRequest, in SummarizeInput) (*mcp.CallToolResult, SummarizeOutput, error) {
102148
style := in.Style
103149
if style == "" {
104150
style = "abstract"
105151
}
152+
153+
token := req.Params.GetProgressToken()
154+
steps := []string{"Analyzing text", "Generating summary"}
155+
156+
for i, step := range steps {
157+
req.Session.Log(ctx, &mcp.LoggingMessageParams{
158+
Level: "info",
159+
Logger: "summarize",
160+
Data: fmt.Sprintf("%s (%s style)", step, style),
161+
})
162+
req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
163+
ProgressToken: token,
164+
Progress: float64(i + 1),
165+
Total: float64(len(steps)),
166+
Message: fmt.Sprintf("%s (%d/%d)", step, i+1, len(steps)),
167+
})
168+
time.Sleep(80 * time.Millisecond)
169+
}
170+
106171
return nil, SummarizeOutput{
107172
Summary: "This paper presents key findings in the field, demonstrating significant improvements over prior work through novel methodology and comprehensive evaluation across multiple benchmarks.",
108173
Style: style,

go/sdk/variants/backend.go

Lines changed: 90 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package variants
66

77
import (
88
"context"
9-
"fmt"
109

1110
"github.com/modelcontextprotocol/go-sdk/mcp"
1211
)
@@ -31,80 +30,118 @@ type backend interface {
3130
// inMemoryBackend connects to a co-located *mcp.Server via in-memory
3231
// transports.
3332
//
34-
// Limitation: list-changed and resource-updated notifications from
35-
// inner servers are silently dropped. The mcp.ServerSession API only
36-
// exposes NotifyProgress and Log; there is no public method to send
37-
// tool/resource/prompt list-changed or resource-updated notifications
38-
// to the front client. This means that if an inner server dynamically
39-
// adds or removes tools/resources/prompts at runtime, the front
40-
// client will NOT be notified.
33+
// The sending redirect middleware intercepts all outgoing messages from
34+
// the inner server and forwards them through the front session. This
35+
// covers notifications (progress, log) and server-to-client requests
36+
// (Elicit, CreateMessage, ListRoots) during request handling.
4137
//
38+
// Limitation: async notifications (e.g. tool/resource/prompt
39+
// list-changed) triggered outside request handling use
40+
// context.Background() and lack the front session in the context, so
41+
// the middleware falls through and these notifications are dropped.
4242
// In practice this is acceptable because inner servers are typically
43-
// statically configured (tools registered at startup). If dynamic
44-
// capability changes are needed, this will require the Go MCP SDK to
45-
// expose generic notification sending on ServerSession.
43+
// statically configured (tools registered at startup).
4644
type inMemoryBackend struct {
47-
server *mcp.Server
45+
variantID string
46+
server *mcp.Server
47+
mcpMethodHandler mcp.MethodHandler
4848
}
4949

50-
// captureMCPMethodHandler captures and returns a reference to the inner
50+
// sessionSwappedRequest wraps an existing mcp.Request but returns a
51+
// different session from GetSession(). The embedded mcp.Request satisfies
52+
// the unexported isRequest() method required by the interface.
53+
type sessionSwappedRequest struct {
54+
mcp.Request
55+
session mcp.Session
56+
}
57+
58+
func (r *sessionSwappedRequest) GetSession() mcp.Session { return r.session }
59+
60+
// sendingRedirectMiddleware returns a sending middleware that intercepts all
61+
// outgoing messages from the inner server (notifications and server-to-client
62+
// requests) and redirects them through the front server's sending handler with
63+
// the front session swapped in. By swapping the session we route messages
64+
// through the front connection to the real client.
65+
//
66+
// The front session (per-request) is read from context; the sending handler
67+
// (constant) is read from the Server struct.
68+
func sendingRedirectMiddleware(variantID string, vs *Server) mcp.Middleware {
69+
return func(next mcp.MethodHandler) mcp.MethodHandler {
70+
return func(ctx context.Context, method string, req mcp.Request) (mcp.Result, error) {
71+
frontSession, _ := ctx.Value(frontSessionKeyType{}).(*mcp.ServerSession)
72+
if frontSession == nil || vs.frontSendingHandler == nil {
73+
return next(ctx, method, req)
74+
}
75+
return vs.frontSendingHandler(ctx, method, &sessionSwappedRequest{
76+
Request: req,
77+
session: frontSession,
78+
})
79+
}
80+
}
81+
}
82+
83+
// newInMemoryBackend creates an inMemoryBackend, registers the sending
84+
// redirect middleware on the inner server, and captures the inner
85+
// server's handler chain for direct dispatch.
86+
func newInMemoryBackend(server *mcp.Server, variantID string, vs *Server) *inMemoryBackend {
87+
server.AddSendingMiddleware(sendingRedirectMiddleware(variantID, vs))
88+
return &inMemoryBackend{
89+
variantID: variantID,
90+
server: server,
91+
mcpMethodHandler: captureReceivingMethodHandler(server),
92+
}
93+
}
94+
95+
// captureReceivingMethodHandler captures and returns a reference to the inner
5196
// server's handler chain. This is a workaround using AddReceivingMiddleware
5297
// to gain a reference to mcp.Server.receivingMethodHandler_, since the SDK
5398
// does not expose a public accessor for it. This can be replaced once the
5499
// SDK exposes a public accessor for the receiving handler chain.
55-
func captureMCPMethodHandler(server *mcp.Server) (mcp.MethodHandler, error) {
100+
func captureReceivingMethodHandler(server *mcp.Server) mcp.MethodHandler {
56101
var handler mcp.MethodHandler
57-
58102
// The middleware is identity (returns next unmodified), so the handler
59103
// chain is unchanged, no extra hop introduced even if called multiple times.
60104
server.AddReceivingMiddleware(func(next mcp.MethodHandler) mcp.MethodHandler {
61105
handler = next
62106
return next
63107
})
64108

65-
if handler == nil {
66-
return nil, fmt.Errorf("failed to capture backend MCP method handler")
67-
}
68-
return handler, nil
109+
return handler
69110
}
70111

71112
// connect creates an in-memory transport pair and connects the inner server.
72-
// Requests bypass the transport via serverHandler to preserve context values.
73-
// The transport is kept alive only for notification forwarding (progress,
74-
// logging) from the inner server to the front client.
113+
// Requests bypass the transport via mcpMethodHandler to preserve context
114+
// values. Notifications are redirected by the sending middleware registered
115+
// in newInMemoryBackend; the proxy client is kept only for the initialize
116+
// handshake and to set the inner session's log level.
75117
func (b *inMemoryBackend) connect(ctx context.Context, variant ServerVariant, frontSession *mcp.ServerSession) (*innerConnection, error) {
76-
mcpMethodHandler, err := captureMCPMethodHandler(b.server)
77-
if err != nil {
78-
return nil, err
79-
}
80-
81118
serverTransport, clientSideTransport := mcp.NewInMemoryTransports()
82119

83120
serverSession, err := b.server.Connect(ctx, serverTransport, nil)
84121
if err != nil {
85122
return nil, err
86123
}
87124

125+
// The proxy client completes the initialize handshake and sets the
126+
// inner session's log level. It must remain open for the lifetime of
127+
// the serverSession — closing it tears down the in-memory transport,
128+
// causing the serverSession to shut down. The nop handlers advertise
129+
// capabilities so the inner ServerSession doesn't short-circuit
130+
// methods like Elicit with "client does not support X". They are
131+
// never called because the sending redirect middleware intercepts
132+
// all outgoing messages before they reach the transport.
133+
nopElicit := func(context.Context, *mcp.ElicitRequest) (*mcp.ElicitResult, error) {
134+
return nil, nil
135+
}
136+
nopSampling := func(context.Context, *mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
137+
return nil, nil
138+
}
88139
client := mcp.NewClient(&mcp.Implementation{
89140
Name: "variant-proxy-client",
90141
Version: "1.0.0",
91142
}, &mcp.ClientOptions{
92-
ProgressNotificationHandler: func(ctx context.Context, req *mcp.ProgressNotificationClientRequest) {
93-
if frontSession != nil {
94-
injectVariantMeta(req.Params, variant.ID)
95-
_ = frontSession.NotifyProgress(ctx, req.Params)
96-
}
97-
},
98-
LoggingMessageHandler: func(ctx context.Context, req *mcp.LoggingMessageRequest) {
99-
if frontSession != nil {
100-
injectVariantMeta(req.Params, variant.ID)
101-
_ = frontSession.Log(ctx, req.Params)
102-
}
103-
},
104-
ToolListChangedHandler: func(context.Context, *mcp.ToolListChangedRequest) {},
105-
ResourceListChangedHandler: func(context.Context, *mcp.ResourceListChangedRequest) {},
106-
PromptListChangedHandler: func(context.Context, *mcp.PromptListChangedRequest) {},
107-
ResourceUpdatedHandler: func(context.Context, *mcp.ResourceUpdatedNotificationRequest) {},
143+
ElicitationHandler: nopElicit,
144+
CreateMessageHandler: nopSampling,
108145
})
109146

110147
clientSession, err := client.Connect(ctx, clientSideTransport, nil)
@@ -113,11 +150,19 @@ func (b *inMemoryBackend) connect(ctx context.Context, variant ServerVariant, fr
113150
return nil, err
114151
}
115152

153+
// Set the inner session's log level to the lowest so that
154+
// ServerSession.Log() does not short-circuit before reaching the
155+
// sending middleware. The actual log-level filtering is performed by
156+
// the front-facing session when the middleware redirects the
157+
// notification. Errors are ignored: if the inner server does not
158+
// advertise the Logging capability the call simply fails harmlessly.
159+
_ = clientSession.SetLoggingLevel(ctx, &mcp.SetLoggingLevelParams{Level: "debug"})
160+
116161
return &innerConnection{
117162
backendSession: &backendSession{
118-
variantID: variant.ID,
163+
variantID: b.variantID,
119164
serverSession: serverSession,
120-
mcpMethodHandler: mcpMethodHandler,
165+
mcpMethodHandler: b.mcpMethodHandler,
121166
},
122167
cleanupFn: func() {
123168
clientSession.Close()

0 commit comments

Comments
 (0)