Skip to content

Commit 429084e

Browse files
committed
feat: minimal async runtime on top of the NGINX event loop
This change introduces a general infrastructure for spawing async tasks on NGINX event loop. The only utility offered for now is a timer support via `ngx::async_::sleep`, with async IO and other scenarios being planned for future.
1 parent e017d3d commit 429084e

File tree

6 files changed

+267
-1
lines changed

6 files changed

+267
-1
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ rust-version.workspace = true
2626

2727
[dependencies]
2828
allocator-api2 = { version = "0.2.21", default-features = false }
29+
async-task = { version = "4.7.1", optional = true }
2930
lock_api = "0.4.13"
3031
nginx-sys = { path = "nginx-sys", default-features=false, version = "0.5.0"}
3132

3233
[features]
33-
default = ["vendored","std"]
34+
default = ["std", "vendored"]
35+
async = ["alloc", "dep:async-task"]
3436
# Enables the components using memory allocation.
3537
# If no `std` flag, `alloc` crate is internally used instead. This flag is mainly for `no_std` build.
3638
alloc = ["allocator-api2/alloc"]

src/async_/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
//! Async runtime and set of utilities on top of the NGINX event loop.
2+
pub use self::sleep::sleep;
3+
pub use self::spawn::{spawn, Task};
4+
5+
mod sleep;
6+
mod spawn;

src/async_/sleep.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use core::future;
2+
use core::mem;
3+
use core::pin::{pin, Pin};
4+
use core::ptr;
5+
use core::task::{self, Poll};
6+
use core::time::Duration;
7+
8+
use nginx_sys::{ngx_add_timer, ngx_del_timer, ngx_event_t, ngx_msec_int_t, ngx_msec_t};
9+
10+
use crate::{ngx_container_of, ngx_log_debug};
11+
12+
/// Maximum duration that can be achieved using [ngx_add_timer].
13+
const NGX_TIMER_DURATION_MAX: Duration = Duration::from_millis(ngx_msec_int_t::MAX as _);
14+
15+
/// Puts the current task to sleep for at least the specified amount of time.
16+
#[cfg(not(target_pointer_width = "32"))]
17+
pub async fn sleep(duration: Duration) {
18+
let mut timer = pin!(Timer::new());
19+
ngx_log_debug!(timer.event.log, "async: sleep for {duration:?}");
20+
21+
let msec = duration.min(NGX_TIMER_DURATION_MAX).as_millis() as ngx_msec_t;
22+
future::poll_fn(|cx| timer.as_mut().poll_sleep(msec, cx)).await
23+
}
24+
25+
/// Puts the current task to sleep for at least the specified amount of time.
26+
#[cfg(target_pointer_width = "32")]
27+
pub async fn sleep(mut duration: Duration) {
28+
let mut timer = pin!(Timer::new());
29+
ngx_log_debug!(timer.event.log, "async: sleep for {duration:?}");
30+
31+
// Handle ngx_msec_t overflow on 32-bit platforms.
32+
while !duration.is_zero() {
33+
let msec = duration.min(NGX_TIMER_DURATION_MAX);
34+
duration = duration.saturating_sub(msec);
35+
36+
let msec = msec.as_millis() as ngx_msec_t;
37+
timer.event.set_timedout(0); // rearm
38+
future::poll_fn(|cx| timer.as_mut().poll_sleep(msec, cx)).await
39+
}
40+
}
41+
42+
struct Timer {
43+
event: ngx_event_t,
44+
waker: Option<task::Waker>,
45+
}
46+
47+
// SAFETY: Timer will only be used in a single-threaded environment
48+
unsafe impl Send for Timer {}
49+
unsafe impl Sync for Timer {}
50+
51+
impl Timer {
52+
pub fn new() -> Self {
53+
static IDENT: [usize; 4] = [
54+
0, 0, 0, 0x4153594e, // ASYN
55+
];
56+
57+
let mut ev: ngx_event_t = unsafe { mem::zeroed() };
58+
// The data is only used for `ngx_event_ident` and will not be mutated.
59+
ev.data = ptr::addr_of!(IDENT).cast_mut().cast();
60+
ev.handler = Some(Self::timer_handler);
61+
ev.log = unsafe { *nginx_sys::ngx_cycle }.log;
62+
ev.set_cancelable(1);
63+
64+
Self {
65+
event: ev,
66+
waker: None,
67+
}
68+
}
69+
70+
pub fn poll_sleep(
71+
mut self: Pin<&mut Self>,
72+
duration: ngx_msec_t,
73+
context: &mut task::Context<'_>,
74+
) -> Poll<()> {
75+
if self.event.timedout() != 0 {
76+
Poll::Ready(())
77+
} else if self.event.timer_set() != 0 {
78+
self.waker = Some(context.waker().clone());
79+
Poll::Pending
80+
} else {
81+
unsafe { ngx_add_timer(ptr::addr_of_mut!(self.event), duration) };
82+
self.waker = Some(context.waker().clone());
83+
Poll::Pending
84+
}
85+
}
86+
87+
unsafe extern "C" fn timer_handler(ev: *mut ngx_event_t) {
88+
let timer = ngx_container_of!(ev, Self, event);
89+
90+
if let Some(waker) = (*timer).waker.take() {
91+
waker.wake();
92+
}
93+
}
94+
}
95+
96+
impl Drop for Timer {
97+
fn drop(&mut self) {
98+
if self.event.timer_set() != 0 {
99+
unsafe { ngx_del_timer(ptr::addr_of_mut!(self.event)) };
100+
}
101+
}
102+
}

src/async_/spawn.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
use core::cell::UnsafeCell;
2+
use core::future::Future;
3+
use core::mem;
4+
use core::ptr::{self, NonNull};
5+
6+
#[cfg(all(not(feature = "std"), feature = "alloc"))]
7+
use alloc::collections::vec_deque::VecDeque;
8+
#[cfg(feature = "std")]
9+
use std::collections::vec_deque::VecDeque;
10+
11+
pub use async_task::Task;
12+
use async_task::{Runnable, ScheduleInfo, WithInfo};
13+
use nginx_sys::{
14+
ngx_cycle, ngx_del_timer, ngx_delete_posted_event, ngx_event_t, ngx_post_event,
15+
ngx_posted_next_events,
16+
};
17+
18+
use crate::{ngx_container_of, ngx_log_debug};
19+
20+
static SCHEDULER: Scheduler = Scheduler::new();
21+
22+
struct Scheduler(UnsafeCell<SchedulerInner>);
23+
24+
// SAFETY: Scheduler must only be used from the main thread of a worker process.
25+
unsafe impl Send for Scheduler {}
26+
unsafe impl Sync for Scheduler {}
27+
28+
impl Scheduler {
29+
const fn new() -> Self {
30+
Self(UnsafeCell::new(SchedulerInner::new()))
31+
}
32+
33+
pub fn schedule(&self, runnable: Runnable) {
34+
// SAFETY: the cell is not empty, and we have exclusive access due to being a
35+
// single-threaded application.
36+
let inner = unsafe { &mut *UnsafeCell::raw_get(&self.0) };
37+
inner.send(runnable)
38+
}
39+
}
40+
41+
#[repr(C)]
42+
struct SchedulerInner {
43+
_ident: [usize; 4], // `ngx_event_ident` compatibility
44+
event: ngx_event_t,
45+
queue: VecDeque<Runnable>,
46+
}
47+
48+
impl SchedulerInner {
49+
const fn new() -> Self {
50+
let mut event: ngx_event_t = unsafe { mem::zeroed() };
51+
event.handler = Some(Self::scheduler_event_handler);
52+
53+
Self {
54+
_ident: [
55+
0, 0, 0, 0x4153594e, // ASYN
56+
],
57+
event,
58+
queue: VecDeque::new(),
59+
}
60+
}
61+
62+
pub fn send(&mut self, runnable: Runnable) {
63+
// Cached `ngx_cycle.log` can be invalidated when reloading configuration in a single
64+
// process mode. Update `log` every time to avoid using stale log pointer.
65+
self.event.log = unsafe { (*ngx_cycle).log };
66+
67+
// While this event is not used as a timer at the moment, we still want to ensure that it is
68+
// compatible with `ngx_event_ident`.
69+
if self.event.data.is_null() {
70+
self.event.data = ptr::from_mut(self).cast();
71+
}
72+
73+
// TODO: handle allocation failures
74+
self.queue.push_back(runnable);
75+
unsafe { ngx_post_event(&mut self.event, ptr::addr_of_mut!(ngx_posted_next_events)) }
76+
}
77+
78+
/// This event handler is called by ngx_event_process_posted at the end of
79+
/// ngx_process_events_and_timers.
80+
extern "C" fn scheduler_event_handler(ev: *mut ngx_event_t) {
81+
let mut runnables = {
82+
// SAFETY:
83+
// This handler always receives a non-null pointer to an event embedded into a
84+
// SchedulerInner instance.
85+
// We modify the contents of `UnsafeCell`, but we ensured that the access is unique due
86+
// to being single-threaded and dropping the reference before we start processing queued
87+
// runnables.
88+
let this =
89+
unsafe { ngx_container_of!(NonNull::new_unchecked(ev), Self, event).as_mut() };
90+
91+
ngx_log_debug!(
92+
this.event.log,
93+
"async: processing {} deferred wakeups",
94+
this.queue.len()
95+
);
96+
97+
// Move runnables to a new queue to avoid borrowing from the SchedulerInner and limit
98+
// processing to already queued wakeups. This ensures that we correctly handle tasks
99+
// that keep scheduling themselves (e.g. using yield_now() in a loop).
100+
// We can't use drain() as it borrows from self and breaks aliasing rules.
101+
mem::take(&mut this.queue)
102+
};
103+
104+
for runnable in runnables.drain(..) {
105+
runnable.run();
106+
}
107+
}
108+
}
109+
110+
impl Drop for SchedulerInner {
111+
fn drop(&mut self) {
112+
if self.event.posted() != 0 {
113+
unsafe { ngx_delete_posted_event(&mut self.event) };
114+
}
115+
116+
if self.event.timer_set() != 0 {
117+
unsafe { ngx_del_timer(&mut self.event) };
118+
}
119+
}
120+
}
121+
122+
fn schedule(runnable: Runnable, info: ScheduleInfo) {
123+
if info.woken_while_running {
124+
SCHEDULER.schedule(runnable);
125+
ngx_log_debug!(
126+
unsafe { (*ngx_cycle).log },
127+
"async: task scheduled while running"
128+
);
129+
} else {
130+
runnable.run();
131+
}
132+
}
133+
134+
/// Creates a new task running on the NGINX event loop.
135+
pub fn spawn<F, T>(future: F) -> Task<T>
136+
where
137+
F: Future<Output = T> + 'static,
138+
T: 'static,
139+
{
140+
ngx_log_debug!(unsafe { (*ngx_cycle).log }, "async: spawned new task");
141+
let scheduler = WithInfo(schedule);
142+
// Safety: single threaded embedding takes care of send/sync requirements for future and
143+
// scheduler. Future and scheduler are both 'static.
144+
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, scheduler) };
145+
runnable.schedule();
146+
task
147+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ extern crate alloc;
4141
extern crate std;
4242

4343
pub mod allocator;
44+
#[cfg(feature = "async")]
45+
pub mod async_;
4446

4547
/// The core module.
4648
///

0 commit comments

Comments
 (0)