Skip to content

Commit 0ec8d16

Browse files
author
Kuankuan Guo
committed
feat: add worker idle task's init and register
- Allow application init and register new logics - The init function will only runs once
1 parent 2635ef6 commit 0ec8d16

File tree

7 files changed

+823
-4
lines changed

7 files changed

+823
-4
lines changed

src/bthread/bthread.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "bthread/timer_thread.h"
3131
#include "bthread/list_of_abafree_id.h"
3232
#include "bthread/bthread.h"
33+
#include "bthread/worker_idle.h"
3334

3435
namespace bthread {
3536
extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
@@ -597,6 +598,17 @@ int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
597598
return 0;
598599
}
599600

601+
int bthread_register_worker_idle_function(int (*init_fn)(void),
602+
bool (*idle_fn)(void),
603+
uint64_t timeout_us,
604+
int* handle) {
605+
return bthread::register_worker_idle_function(init_fn, idle_fn, timeout_us, handle);
606+
}
607+
608+
int bthread_unregister_worker_idle_function(int handle) {
609+
return bthread::unregister_worker_idle_function(handle);
610+
}
611+
600612
int bthread_set_create_span_func(void* (*func)()) {
601613
if (func == NULL) {
602614
return EINVAL;

src/bthread/parking_lot.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,15 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
6464

6565
// Wait for tasks.
6666
// If the `expected_state' does not match, wait() may finish directly.
67-
void wait(const State& expected_state) {
67+
void wait(const State& expected_state, const timespec* timeout = NULL) {
6868
if (get_state().val != expected_state.val) {
6969
// Fast path, no need to futex_wait.
7070
return;
7171
}
7272
if (_no_signal_when_no_waiter) {
7373
_waiter_num.fetch_add(1, butil::memory_order_relaxed);
7474
}
75-
futex_wait_private(&_pending_signal, expected_state.val, NULL);
75+
futex_wait_private(&_pending_signal, expected_state.val, timeout);
7676
if (_no_signal_when_no_waiter) {
7777
_waiter_num.fetch_sub(1, butil::memory_order_relaxed);
7878
}

src/bthread/task_group.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "bthread/task_group.h"
3838
#include "bthread/timer_thread.h"
3939
#include "bthread/bthread.h"
40+
#include "bthread/worker_idle.h"
4041

4142
#ifdef __x86_64__
4243
#include <x86intrin.h>
@@ -167,7 +168,10 @@ bool TaskGroup::wait_task(bthread_t* tid) {
167168
if (_last_pl_state.stopped()) {
168169
return false;
169170
}
170-
_pl->wait(_last_pl_state);
171+
run_worker_idle_functions();
172+
const timespec timeout = get_worker_idle_timeout();
173+
const bool empty_time = (timeout.tv_sec == 0 && timeout.tv_nsec == 0);
174+
_pl->wait(_last_pl_state, empty_time ? NULL : &timeout);
171175
if (steal_task(tid)) {
172176
return true;
173177
}
@@ -176,10 +180,13 @@ bool TaskGroup::wait_task(bthread_t* tid) {
176180
if (st.stopped()) {
177181
return false;
178182
}
183+
run_worker_idle_functions();
179184
if (steal_task(tid)) {
180185
return true;
181186
}
182-
_pl->wait(st);
187+
const timespec timeout = get_worker_idle_timeout();
188+
const bool empty_time = (timeout.tv_sec == 0 && timeout.tv_nsec == 0);
189+
_pl->wait(st, empty_time ? NULL : &timeout);
183190
#endif
184191
} while (true);
185192
}

src/bthread/unstable.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,46 @@ extern int bthread_set_worker_startfn(void (*start_fn)());
9292
// Add a startup function with tag
9393
extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
9494

95+
// Registers a per-worker init function and an idle function.
96+
//
97+
// The init function is called at most once per worker thread, before the first
98+
// invocation of idle_fn in that worker.
99+
//
100+
// The idle function is called when a worker has no task to run.
101+
// The return value of idle_fn is ignored.
102+
// If no idle function is registered, the worker waits indefinitely. Otherwise
103+
// the worker waits for at most the minimal timeout among registered functions
104+
// before trying again.
105+
//
106+
// This function is thread-safe.
107+
//
108+
// Args:
109+
// init_fn: Optional. Called once per worker thread. Return 0 on success. A
110+
// non-zero return value disables idle_fn for that worker thread.
111+
// idle_fn: Required. Must not be NULL. Return true if any work is done.
112+
// timeout_us: Required. Must be > 0. Maximum waiting time when worker is idle.
113+
// handle: Optional output. On success, set to a positive handle for later
114+
// unregistration.
115+
//
116+
// Returns:
117+
// 0 on success, error code otherwise.
118+
extern int bthread_register_worker_idle_function(int (*init_fn)(void),
119+
bool (*idle_fn)(void),
120+
uint64_t timeout_us,
121+
int* handle);
122+
123+
// Unregisters an idle function by handle returned by
124+
// bthread_register_worker_idle_function().
125+
//
126+
// This function is thread-safe.
127+
//
128+
// Args:
129+
// handle: Handle returned by bthread_register_worker_idle_function().
130+
//
131+
// Returns:
132+
// 0 on success, error code otherwise.
133+
extern int bthread_unregister_worker_idle_function(int handle);
134+
95135
// Add a create span function
96136
extern int bthread_set_create_span_func(void* (*func)());
97137

src/bthread/worker_idle.cpp

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "bthread/worker_idle.h"
19+
20+
#include <errno.h>
21+
22+
#include <algorithm>
23+
#include <new>
24+
#include <vector>
25+
26+
#include "butil/atomicops.h"
27+
#include "butil/containers/doubly_buffered_data.h"
28+
#include "butil/time.h"
29+
#include "butil/thread_local.h"
30+
31+
namespace bthread {
32+
namespace {
33+
34+
enum InitState : uint8_t {
35+
INIT_STATE_NOT_RUN = 0,
36+
INIT_STATE_OK = 1,
37+
INIT_STATE_FAILED = 2,
38+
};
39+
40+
struct WorkerIdleEntry {
41+
int id;
42+
int (*init_fn)(void);
43+
bool (*idle_fn)(void);
44+
uint64_t timeout_us;
45+
};
46+
47+
typedef std::vector<WorkerIdleEntry> WorkerIdleEntryList;
48+
49+
static butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true> g_entries;
50+
static butil::atomic<int> g_next_id(1);
51+
52+
struct WorkerIdleTLS {
53+
std::vector<uint8_t> init_states;
54+
};
55+
56+
BAIDU_THREAD_LOCAL WorkerIdleTLS* tls_worker_idle = NULL;
57+
58+
static WorkerIdleTLS* get_or_create_tls() {
59+
if (tls_worker_idle) {
60+
return tls_worker_idle;
61+
}
62+
tls_worker_idle = new (std::nothrow) WorkerIdleTLS;
63+
return tls_worker_idle;
64+
}
65+
66+
} // namespace
67+
68+
int register_worker_idle_function(int (*init_fn)(void),
69+
bool (*idle_fn)(void),
70+
uint64_t timeout_us,
71+
int* handle) {
72+
if (idle_fn == NULL) {
73+
return EINVAL;
74+
}
75+
if (timeout_us == 0) {
76+
return EINVAL;
77+
}
78+
const int id = g_next_id.fetch_add(1, butil::memory_order_relaxed);
79+
WorkerIdleEntry e;
80+
e.id = id;
81+
e.init_fn = init_fn;
82+
e.idle_fn = idle_fn;
83+
e.timeout_us = timeout_us;
84+
g_entries.Modify([&](WorkerIdleEntryList& bg) {
85+
bg.push_back(e);
86+
return static_cast<size_t>(1);
87+
});
88+
if (handle) {
89+
*handle = id;
90+
}
91+
return 0;
92+
}
93+
94+
int unregister_worker_idle_function(int handle) {
95+
if (handle <= 0) {
96+
return EINVAL;
97+
}
98+
size_t removed = g_entries.Modify([&](WorkerIdleEntryList& bg) {
99+
const size_t old_size = bg.size();
100+
bg.erase(std::remove_if(bg.begin(), bg.end(),
101+
[&](const WorkerIdleEntry& e) {
102+
return e.id == handle;
103+
}),
104+
bg.end());
105+
return old_size - bg.size();
106+
});
107+
return removed ? 0 : EINVAL;
108+
}
109+
110+
bool has_worker_idle_functions() {
111+
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
112+
if (g_entries.Read(&p) != 0) {
113+
return false;
114+
}
115+
return !p->empty();
116+
}
117+
118+
void run_worker_idle_functions() {
119+
if (!has_worker_idle_functions()) {
120+
return;
121+
}
122+
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
123+
if (g_entries.Read(&p) != 0) {
124+
return;
125+
}
126+
if (p->empty()) {
127+
return;
128+
}
129+
130+
WorkerIdleTLS* tls = get_or_create_tls();
131+
if (tls == NULL) {
132+
return;
133+
}
134+
135+
// Step 1: Ensure per-worker init is called at most once for each entry.
136+
// Step 2: Run idle callbacks for initialized entries.
137+
// Step 3: Ignore callback return values. The caller decides how to proceed.
138+
for (const auto& e : *p) {
139+
if (e.id <= 0 || e.idle_fn == NULL) {
140+
continue;
141+
}
142+
if (tls->init_states.size() <= static_cast<size_t>(e.id)) {
143+
tls->init_states.resize(static_cast<size_t>(e.id) + 1, INIT_STATE_NOT_RUN);
144+
}
145+
uint8_t& st = tls->init_states[static_cast<size_t>(e.id)];
146+
if (st == INIT_STATE_NOT_RUN) {
147+
// Run the init callback function once.
148+
if (e.init_fn) {
149+
const int rc = e.init_fn();
150+
st = (rc == 0) ? INIT_STATE_OK : INIT_STATE_FAILED;
151+
} else {
152+
st = INIT_STATE_OK;
153+
}
154+
}
155+
if (st != INIT_STATE_OK) {
156+
continue;
157+
}
158+
// Run the idle callback function.
159+
e.idle_fn();
160+
}
161+
}
162+
163+
timespec get_worker_idle_timeout() {
164+
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
165+
if (g_entries.Read(&p) != 0) {
166+
return {0, 0};
167+
}
168+
if (p->empty()) {
169+
return {0, 0};
170+
}
171+
uint64_t min_us = 0;
172+
for (const auto& e : *p) {
173+
if (e.timeout_us == 0) {
174+
continue;
175+
}
176+
if (min_us == 0 || e.timeout_us < min_us) {
177+
min_us = e.timeout_us;
178+
}
179+
}
180+
if (min_us == 0) {
181+
return {0, 0};
182+
}
183+
return butil::microseconds_to_timespec(min_us);
184+
}
185+
186+
} // namespace bthread
187+
188+

src/bthread/worker_idle.h

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#ifndef BTHREAD_WORKER_IDLE_H
19+
#define BTHREAD_WORKER_IDLE_H
20+
21+
#include <stdint.h>
22+
#include <time.h>
23+
24+
namespace bthread {
25+
26+
// Registers a per-worker init function and an idle function.
27+
//
28+
// The init function is called at most once per worker thread, before running
29+
// the idle function in that worker thread.
30+
//
31+
// Args:
32+
// init_fn: Optional. Can be NULL.
33+
// idle_fn: Required. Must not be NULL.
34+
// timeout_us: Required. Must be > 0.
35+
// handle: Optional output handle for unregistering later.
36+
//
37+
// Returns:
38+
// 0 on success, error code otherwise.
39+
int register_worker_idle_function(int (*init_fn)(void),
40+
bool (*idle_fn)(void),
41+
uint64_t timeout_us,
42+
int* handle);
43+
44+
// Unregisters a previously registered idle function by handle.
45+
//
46+
// Args:
47+
// handle: Handle returned by register_worker_idle_function().
48+
//
49+
// Returns:
50+
// 0 on success, error code otherwise.
51+
int unregister_worker_idle_function(int handle);
52+
53+
// Returns true if any idle function is registered.
54+
bool has_worker_idle_functions();
55+
56+
// Runs all registered idle functions for current worker thread.
57+
void run_worker_idle_functions();
58+
59+
// Get the minimal timeout among all registered functions.
60+
// Returns {0,0} if no idle function is registered.
61+
timespec get_worker_idle_timeout();
62+
63+
} // namespace bthread
64+
65+
#endif // BTHREAD_WORKER_IDLE_H
66+
67+

0 commit comments

Comments
 (0)