Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions src/data/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
type MutationEvent,
type OpenEvent,
type ReconnectEvent,
type ResetEvent,
type WelcomeBackEvent,
type WelcomeEvent,
} from '../types'
import defaults from '../util/defaults'
Expand Down Expand Up @@ -59,9 +61,13 @@ export type MapListenEventNamesToListenEvents<
? MutationEvent<R>
: E extends 'reconnect'
? ReconnectEvent
: E extends 'open'
? OpenEvent
: never
: E extends 'welcomeback'
? WelcomeBackEvent
: E extends 'reset'
? ResetEvent
: E extends 'open'
? OpenEvent
: never
: never

/**
Expand All @@ -81,7 +87,7 @@ export type ListenEventFromOptions<
? MapListenEventNamesToListenEvents<R, Opts['events']>
: // fall back to ListenEvent if opts events is present, but we can't infer the literal event names
ListenEvent<R>
: MutationEvent<R>
: MutationEvent<R> | ResetEvent

/**
* Set up a listener that will be notified when mutations occur on documents matching the provided query/filter.
Expand All @@ -95,7 +101,7 @@ export function _listen<R extends Record<string, Any> = Record<string, Any>>(
this: SanityClient | ObservableSanityClient,
query: string,
params?: ListenParams,
): Observable<MutationEvent<R>>
): Observable<MutationEvent<R> | ResetEvent>
/**
* Set up a listener that will be notified when mutations occur on documents matching the provided query/filter.
*
Expand Down Expand Up @@ -127,14 +133,16 @@ export function _listen<
const tag = opts.tag && requestTagPrefix ? [requestTagPrefix, opts.tag].join('.') : opts.tag
const options = {...defaults(opts, defaultOptions), tag}
const listenOpts = pick(options, possibleOptions)
const qs = encodeQueryString({query, params, options: {tag, ...listenOpts}})
const qs = encodeQueryString({query, params, options: {tag, ...listenOpts, enableResume: true}})

const uri = `${url}${_getDataUrl(this, 'listen', qs)}`
if (uri.length > MAX_URL_LENGTH) {
return throwError(() => new Error('Query too large for listener'))
}

const listenFor = (options.events ? options.events : ['mutation']) satisfies Opts['events']
const listenFor = (
options.events ? options.events : ['mutation', 'reset']
) satisfies Opts['events']

const esOptions: EventSourceInit & {headers?: Record<string, string>} = {}
if (withCredentials) {
Expand Down
35 changes: 30 additions & 5 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,8 @@ export type OpenEvent = {

/**
* The listener has been established, and will start receiving events.
* Note that this is also emitted upon _reconnection_.
* Before apiVersion vTBD this is also emitted when reconnected
* As of apiVersion vTBD this is no longer emitted on reconnect, instead the `welcomeback` event is emitted
*
* @public
*/
Expand All @@ -1161,10 +1162,31 @@ export type WelcomeEvent = {
listenerName: string
}

/**
* The listener has reconnected and successfully resumed from where it left off
*
* @public
*/
export type WelcomeBackEvent = {
type: 'welcomeback'
listenerName: string
}

/**
* The listener can't be resumed or otherwise need to reset its local state
*
* @public
*/
export type ResetEvent = {
type: 'reset'
}

/** @public */
export type ListenEvent<R extends Record<string, Any>> =
export type ListenEvent<R extends Record<string, Any> = Record<string, Any>> =
| MutationEvent<R>
| ReconnectEvent
| WelcomeBackEvent
| ResetEvent
| WelcomeEvent
| OpenEvent

Expand All @@ -1176,6 +1198,10 @@ export type ListenEventName =
| 'welcome'
/** The listener has been disconnected, and a reconnect attempt is scheduled */
| 'reconnect'
/** The listener has reconnected and successfully resumed from where it left off */
| 'welcomeback'
/** The listener can't be resumed or otherwise need to reset its local state */
| 'reset'
/**
* The listener connection has been established
* note: it's usually a better option to use the 'welcome' event
Expand Down Expand Up @@ -1232,9 +1258,8 @@ export interface ListenOptions {
visibility?: 'transaction' | 'query'

/**
* Array of event names to include in the observable. By default, only mutation events are included.
*
* @defaultValue `['mutation']`
* Array of event names to include in the observable. By default, only mutation and reset events are included.
* @defaultValue `['mutation', 'reset']`
*/
events?: ListenEventName[]

Expand Down
18 changes: 10 additions & 8 deletions test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,9 @@ describe('client', async () => {
].join('\n')

nock(`https://${apiHost}`)
.get('/v1/media-libraries/res-id/listen?query=foo.bar&includeResult=true')
.get(
'/v1/media-libraries/res-id/listen?query=foo.bar&includeResult=true&enableResume=true',
)
.reply(200, response, {
'cache-control': 'no-cache',
'content-type': 'text/event-stream; charset=utf-8',
Expand All @@ -665,7 +667,7 @@ describe('client', async () => {
'foo.bar',
),
)
expect(evt.result).toEqual(doc)
expect(evt.type === 'mutation' && evt.result).toEqual(doc)
},
)
})
Expand Down Expand Up @@ -3665,15 +3667,15 @@ describe('client', async () => {
].join('\n')

nock(projectHost())
.get('/v1/data/listen/foo?query=foo.bar&includeResult=true')
.get('/v1/data/listen/foo?query=foo.bar&includeResult=true&enableResume=true')
.reply(200, response, {
'cache-control': 'no-cache',
'content-type': 'text/event-stream; charset=utf-8',
'transfer-encoding': 'chunked',
})

const evt = await firstValueFrom(getClient().listen('foo.bar'))
expect(evt.result).toEqual(doc)
expect(evt.type === 'mutation' && evt.result).toEqual(doc)
})

test('listeners connect to listen endpoint with request tag, emits events', async () => {
Expand All @@ -3695,7 +3697,7 @@ describe('client', async () => {

nock(projectHost())
.get(
'/v1/data/listen/foo?tag=sfcraft.checkins&query=*%5B_type%20%3D%3D%20%22checkin%22%5D&includeResult=true',
'/v1/data/listen/foo?tag=sfcraft.checkins&query=*%5B_type%20%3D%3D%20%22checkin%22%5D&includeResult=true&enableResume=true',
)
.reply(200, response, {
'cache-control': 'no-cache',
Expand Down Expand Up @@ -3728,7 +3730,7 @@ describe('client', async () => {

nock(projectHost())
.get(
'/v1/data/listen/foo?tag=sf.craft.checkins&query=*%5B_type%20%3D%3D%20%22checkin%22%5D&includeResult=true',
'/v1/data/listen/foo?tag=sf.craft.checkins&query=*%5B_type%20%3D%3D%20%22checkin%22%5D&includeResult=true&enableResume=true',
)
.reply(200, response, {
'cache-control': 'no-cache',
Expand Down Expand Up @@ -3762,7 +3764,7 @@ describe('client', async () => {

let didRequest = false
nock(projectHost())
.get('/v1/data/listen/foo?query=foo.bar&includeResult=true')
.get('/v1/data/listen/foo?query=foo.bar&includeResult=true&enableResume=true')
.reply(() => {
didRequest = true
return [200, response]
Expand All @@ -3789,7 +3791,7 @@ describe('client', async () => {

let requestCount = 0
nock(projectHost())
.get('/v1/data/listen/foo?query=foo.bar&includeResult=true')
.get('/v1/data/listen/foo?query=foo.bar&includeResult=true&enableResume=true')
.twice()
.reply(() => {
requestCount++
Expand Down
36 changes: 34 additions & 2 deletions test/listen.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {
type MutationEvent,
type OpenEvent,
type ReconnectEvent,
type ResetEvent,
type WelcomeBackEvent,
type WelcomeEvent,
} from '@sanity/client'
import type {Observable} from 'rxjs'
Expand All @@ -12,8 +14,8 @@ import {describe, expectTypeOf, test} from 'vitest'
describe('client.listen', () => {
const client = createClient({})
test('event types', async () => {
// mutation event is default
expectTypeOf(client.listen('*')).toEqualTypeOf<Observable<MutationEvent>>()
// mutation and reset is default
expectTypeOf(client.listen('*')).toEqualTypeOf<Observable<MutationEvent | ResetEvent>>()

// @ts-expect-error - WelcomeEvent should not be emitted
expectTypeOf(client.listen('*')).toEqualTypeOf<Observable<WelcomeEvent>>()
Expand All @@ -30,8 +32,38 @@ describe('client.listen', () => {
Observable<ListenEvent<MyDoc>>
>()

expectTypeOf(
client.listen<MyDoc>(
'*[_type=="match" && !completed]',
{},
{
events: ['mutation', 'welcome', 'reconnect', 'welcomeback'],
includeResult: true,
},
),
).toEqualTypeOf<Observable<ListenEvent<MyDoc>>>()

expectTypeOf(client.listen('*', {}, {events: []})).toEqualTypeOf<Observable<never>>()

expectTypeOf(client.listen('*', {}, {events: ['mutation', 'reset']})).toEqualTypeOf<
Observable<MutationEvent | ResetEvent>
>()

expectTypeOf(
client.listen(
'*',
{},
{enableResume: true, events: ['welcome', 'mutation', 'welcomeback', 'reset']},
),
).toEqualTypeOf<Observable<WelcomeEvent | MutationEvent | WelcomeBackEvent | ResetEvent>>()

const observable = client.listen(
'*',
{},
{enableResume: true, events: ['welcomeback', 'reset']},
)
expectTypeOf(observable).toEqualTypeOf<Observable<WelcomeBackEvent | ResetEvent>>()

expectTypeOf(client.listen('*', {}, {events: ['welcome']})).toEqualTypeOf<
// @ts-expect-error - Only WelcomeEvents should be emitted
Observable<MutationEvent>
Expand Down
90 changes: 90 additions & 0 deletions test/listen.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ describe.skipIf(typeof EdgeRuntime === 'string' || typeof document !== 'undefine
query: '*[_type == "beer" && title == $beerName]',
$beerName: '"Headroom Double IPA"',
includeResult: 'true',
enableResume: 'true',
})
expect(request.url, 'url should be correct').toEqual(`/v1/data/listen/prod?${searchParams}`)

Expand Down Expand Up @@ -159,6 +160,95 @@ describe.skipIf(typeof EdgeRuntime === 'string' || typeof document !== 'undefine
server.close()
})

test('forwards welcome and welcomeback events if opted for', async () => {
expect.assertions(2)

let attempt = 0
const {server, client} = await testSse(({channel, request}) => {
attempt++
if (attempt === 1) {
channel!.send({event: 'welcome', data: {listenerName: 'foo1'}})
channel!.send({event: 'mutation', id: '123', data: {foo: 'bar'}})
channel!.close()
}
if (attempt === 2) {
expect(request.headers['last-event-id'], 'should send last-event-id').toBe('123')
channel!.send({event: 'welcomeback', data: {listenerName: 'foo2'}})
channel!.send({event: 'mutation', id: '345', data: {bar: 'baz'}})
process.nextTick(() => channel!.close())
}
})

const events = await lastValueFrom(
client
.listen(
'*',
{},
{enableResume: true, events: ['reconnect', 'mutation', 'welcome', 'welcomeback']},
)
.pipe(
take(5),
catchError((err) => of(err)),
toArray(),
),
)
expect(events).toEqual([
{type: 'welcome', listenerName: 'foo1'},
{type: 'mutation', foo: 'bar'},
{type: 'reconnect'},
{type: 'welcomeback', listenerName: 'foo2'},
{type: 'mutation', bar: 'baz'},
])

server.close()
})

test('forwards reset events if opted for', async () => {
expect.assertions(2)

let attempt = 0
const {server, client} = await testSse(({channel, request}) => {
attempt++
if (attempt === 1) {
channel!.send({event: 'welcome', data: {listenerName: 'foo1'}})
channel!.send({event: 'mutation', id: '123', data: {foo: 'bar'}})
channel!.close()
}
if (attempt === 2) {
expect(request.headers['last-event-id'], 'should send last-event-id').toBe('123')
channel!.send({event: 'reset'})
channel!.send({event: 'mutation', id: '345', data: {bar: 'baz'}})
process.nextTick(() => channel!.close())
}
})

const events = await lastValueFrom(
client
.listen(
'*',
{},
{
enableResume: true,
events: ['reconnect', 'mutation', 'welcome', 'welcomeback', 'reset'],
},
)
.pipe(
take(5),
catchError((err) => of(err)),
toArray(),
),
)
expect(events).toEqual([
{type: 'welcome', listenerName: 'foo1'},
{type: 'mutation', foo: 'bar'},
{type: 'reconnect'},
{type: 'reset'},
{type: 'mutation', bar: 'baz'},
])

server.close()
})

test('emits channel errors', async () => {
expect.assertions(1)

Expand Down