Skip to content

Commit 6f2eae0

Browse files
authored
Merge branch 'main' into test-coverage
2 parents 1ab79f2 + 3554b6f commit 6f2eae0

File tree

7 files changed

+360
-4
lines changed

7 files changed

+360
-4
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"dependencies": {
4444
"@libsql/client": "^0.14.0",
4545
"@outerbase/sdk": "2.0.0-rc.3",
46-
"form-data": "^4.0.1",
46+
"cron-parser": "^4.9.0",
4747
"hono": "^4.6.14",
4848
"jose": "^5.9.6",
4949
"mongodb": "^6.11.0",

plugins/cron/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
## Example Usage
2+
3+
Each task should have an entry in the `tmp_cron_tasks` table of your StarbaseDB instance. An example row might look like:
4+
5+
```json
6+
{
7+
"name": "Even minutes",
8+
"cron_tab": "*/2 * * * *",
9+
"payload": "",
10+
"callback_host": "https://starbasedb-{MY-IDENTIFIER}.workers.dev"
11+
}
12+
```
13+
14+
Then your code in your `/src/index.ts` would implement the plugin setup like below:
15+
16+
```ts
17+
import { CronPlugin } from '../plugins/cron'
18+
19+
// ....
20+
// ....
21+
22+
const cronPlugin = new CronPlugin()
23+
cronPlugin.onEvent(({ name, cron_tab, payload }) => {
24+
console.log('CRON EVENT: ', name, cron_tab, payload)
25+
26+
if (name === 'Even minutes') {
27+
console.log('Payload: ', JSON.stringify(payload))
28+
}
29+
}, ctx)
30+
31+
const plugins = [
32+
// ...
33+
cronPlugin,
34+
] satisfies StarbasePlugin[]
35+
```

plugins/cron/index.ts

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
import { StarbaseApp } from '../../src/handler'
2+
import { StarbasePlugin } from '../../src/plugin'
3+
import { DataSource, QueryResult } from '../../src/types'
4+
import { createResponse } from '../../src/utils'
5+
import { getNextExecutionTime } from './utils'
6+
7+
const SQL_QUERIES = {
8+
CREATE_TABLE: `
9+
CREATE TABLE IF NOT EXISTS tmp_cron_tasks (
10+
name TEXT NOT NULL UNIQUE PRIMARY KEY,
11+
cron_tab TEXT NOT NULL,
12+
payload TEXT,
13+
callback_host TEXT,
14+
is_active INTEGER
15+
)
16+
`,
17+
INSERT_TASK: `
18+
INSERT OR REPLACE INTO tmp_cron_tasks (name, cron_tab, payload, callback_host)
19+
VALUES (?, ?, ?, ?)
20+
`,
21+
GET_TASKS: `
22+
SELECT name, cron_tab, payload
23+
FROM tmp_cron_tasks
24+
`,
25+
DELETE_TASK: `
26+
DELETE FROM tmp_cron_tasks WHERE name = ?
27+
`,
28+
// The below query allows up to 10 cron events set to `is_active`. At the moment
29+
// it is hard coded constrained but adding more WHEN clause rows will up that limit.
30+
UPDATE_ACTIVE_STATUS: `
31+
UPDATE tmp_cron_tasks
32+
SET is_active = CASE
33+
WHEN name = ? THEN 1
34+
WHEN name = ? THEN 1
35+
WHEN name = ? THEN 1
36+
WHEN name = ? THEN 1
37+
WHEN name = ? THEN 1
38+
WHEN name = ? THEN 1
39+
WHEN name = ? THEN 1
40+
WHEN name = ? THEN 1
41+
WHEN name = ? THEN 1
42+
WHEN name = ? THEN 1
43+
ELSE 0
44+
END
45+
`,
46+
}
47+
48+
export interface CronEventPayload {
49+
name: string
50+
cron_tab: string
51+
payload: Record<string, any>
52+
}
53+
54+
export class CronPlugin extends StarbasePlugin {
55+
public pathPrefix: string = '/cron'
56+
private dataSource?: DataSource
57+
private eventCallbacks: ((payload: CronEventPayload) => void)[] = []
58+
59+
constructor() {
60+
super('starbasedb:cron', {
61+
requiresAuth: true,
62+
})
63+
}
64+
65+
override async register(app: StarbaseApp) {
66+
app.use(async (c, next) => {
67+
this.dataSource = c?.get('dataSource')
68+
await this.init()
69+
await this.scheduleNextAlarm()
70+
await next()
71+
})
72+
73+
app.post(`${this.pathPrefix}/callback`, async (c) => {
74+
const payload = (await c.req.json()) as CronEventPayload[]
75+
76+
this.eventCallbacks.forEach((callback) => {
77+
try {
78+
payload.forEach((element) => {
79+
callback(element)
80+
})
81+
} catch (error) {
82+
console.error('Error in Cron event callback:', error)
83+
}
84+
})
85+
86+
return createResponse({ success: true }, undefined, 200)
87+
})
88+
}
89+
90+
private async init() {
91+
if (!this.dataSource) return
92+
93+
// Create cron tasks table if it doesn't exist
94+
await this.dataSource.rpc.executeQuery({
95+
sql: SQL_QUERIES.CREATE_TABLE,
96+
params: [],
97+
})
98+
}
99+
100+
private async scheduleNextAlarm() {
101+
if (!this.dataSource) return
102+
103+
// Get all tasks from our database table
104+
const result = (await this.dataSource.rpc.executeQuery({
105+
sql: SQL_QUERIES.GET_TASKS,
106+
params: [],
107+
})) as QueryResult[]
108+
109+
const tasks = result as {
110+
name: string
111+
cron_tab: string
112+
payload: string
113+
}[]
114+
115+
/**
116+
* No tasks exist. There are two options here we can proceed with:
117+
* 1. Delete the existing alarm in case they removed the task and no longer needed
118+
* 2. Leave any alarms and just return early (in case other plugins utilize Alarms)
119+
*
120+
* For the purpose of this plugin in its current state we are going to simply
121+
* return early so if other plugins utilize the Alarm we're not disrupting the
122+
* service(s) they provide. A side effect to this decision is that if you delete
123+
* a cron task from this plugin you may get one last lingering message sent that
124+
* is currently set to be triggered.
125+
*/
126+
if (tasks.length === 0) {
127+
// await this.dataSource.rpc.deleteAlarm() <-- We are intentionally _NOT_ calling this.
128+
return
129+
}
130+
131+
// Find the next execution time for each task
132+
const now = Date.now()
133+
let nextExecutionMs = Infinity
134+
let nextTasks: typeof tasks = []
135+
136+
for (const task of tasks) {
137+
const nextTime = getNextExecutionTime(task.cron_tab, now)
138+
139+
if (nextTime < nextExecutionMs && nextTime > now) {
140+
nextExecutionMs = nextTime
141+
nextTasks = [task]
142+
} else if (nextTime === nextExecutionMs && nextTime > now) {
143+
nextTasks.push(task)
144+
}
145+
}
146+
147+
if (nextTasks.length > 0) {
148+
// Update active status for all tasks
149+
// Fill remaining parameter slots with null if fewer than 10 tasks
150+
const taskNames = [
151+
...nextTasks.map((t) => t.name),
152+
null,
153+
null,
154+
null,
155+
null,
156+
null,
157+
null,
158+
null,
159+
null,
160+
null,
161+
null,
162+
].slice(0, 10)
163+
await this.dataSource.rpc.executeQuery({
164+
sql: SQL_QUERIES.UPDATE_ACTIVE_STATUS,
165+
params: taskNames,
166+
})
167+
168+
await this.dataSource.rpc.setAlarm(nextExecutionMs)
169+
}
170+
}
171+
172+
public async addEvent(
173+
cronTab: string,
174+
name: string,
175+
payload: Record<string, any> = {},
176+
callbackHost: string
177+
) {
178+
if (!this.dataSource)
179+
throw new Error('CronPlugin not properly initialized')
180+
181+
await this.dataSource.rpc.executeQuery({
182+
sql: SQL_QUERIES.INSERT_TASK,
183+
params: [name, cronTab, JSON.stringify(payload), callbackHost],
184+
})
185+
186+
// Reschedule alarms after adding new task
187+
await this.scheduleNextAlarm()
188+
}
189+
190+
public onEvent(
191+
callback: (payload: CronEventPayload) => void | Promise<void>,
192+
ctx?: ExecutionContext
193+
) {
194+
const wrappedCallback = async (payload: CronEventPayload) => {
195+
const result = callback(payload)
196+
if (result instanceof Promise && ctx) {
197+
ctx.waitUntil(result)
198+
}
199+
}
200+
201+
this.eventCallbacks.push(wrappedCallback)
202+
}
203+
}

plugins/cron/utils.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import * as cronParser from 'cron-parser'
2+
3+
export function parseCronExpression(cronTab: string) {
4+
return cronParser.parseExpression(cronTab)
5+
}
6+
7+
export function getNextExecutionTime(cronTab: string, after: number): number {
8+
const interval = cronParser.parseExpression(cronTab, {
9+
currentDate: new Date(after),
10+
})
11+
const next = interval.next()
12+
return next.getTime()
13+
}

pnpm-lock.yaml

Lines changed: 17 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)