@@ -200,7 +200,7 @@ class BufferFinalizer : private Finalizer {
200200 ~BufferFinalizer() { env()->Unref(); }
201201};
202202
203- class ThreadSafeFunction : public node::AsyncResource {
203+ class ThreadSafeFunction {
204204 public:
205205 ThreadSafeFunction(v8::Local<v8::Function> func,
206206 v8::Local<v8::Object> resource,
@@ -212,11 +212,12 @@ class ThreadSafeFunction : public node::AsyncResource {
212212 void* finalize_data_,
213213 napi_finalize finalize_cb_,
214214 napi_threadsafe_function_call_js call_js_cb_)
215- : AsyncResource(env_->isolate,
216- resource,
217- node::Utf8Value(env_->isolate, name).ToStringView()),
215+ : async_resource(std::in_place,
216+ env_->isolate,
217+ resource,
218+ node::Utf8Value(env_->isolate, name).ToStringView()),
218219 thread_count(thread_count_),
219- is_closing(false ),
220+ state(kOpen ),
220221 dispatch_state(kDispatchIdle),
221222 context(context_),
222223 max_queue_size(max_queue_size_),
@@ -230,76 +231,104 @@ class ThreadSafeFunction : public node::AsyncResource {
230231 env->Ref();
231232 }
232233
233- ~ThreadSafeFunction() override {
234- node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
235- env->Unref();
236- }
234+ ~ThreadSafeFunction() { ReleaseResources(); }
237235
238236 // These methods can be called from any thread.
239237
240238 napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
241- node::Mutex::ScopedLock lock(this->mutex);
239+ {
240+ node::Mutex::ScopedLock lock(this->mutex);
242241
243- while (queue.size() >= max_queue_size && max_queue_size > 0 &&
244- !is_closing) {
245- if (mode == napi_tsfn_nonblocking) {
246- return napi_queue_full;
242+ while (queue.size() >= max_queue_size && max_queue_size > 0 &&
243+ state == kOpen) {
244+ if (mode == napi_tsfn_nonblocking) {
245+ return napi_queue_full;
246+ }
247+ cond->Wait(lock);
247248 }
248- cond->Wait(lock);
249- }
250249
251- if (is_closing) {
250+ if (state == kOpen) {
251+ queue.push(data);
252+ Send();
253+ return napi_ok;
254+ }
252255 if (thread_count == 0) {
253256 return napi_invalid_arg;
254- } else {
255- thread_count--;
257+ }
258+ thread_count--;
259+ if (!(state == kClosed && thread_count == 0)) {
256260 return napi_closing;
257261 }
258- } else {
259- queue.push(data);
260- Send();
261- return napi_ok;
262262 }
263+ // Make sure to release lock before destroying
264+ delete this;
265+ return napi_closing;
263266 }
264267
265268 napi_status Acquire() {
266269 node::Mutex::ScopedLock lock(this->mutex);
267270
268- if (is_closing) {
269- return napi_closing;
270- }
271+ if (state == kOpen) {
272+ thread_count++;
271273
272- thread_count++;
274+ return napi_ok;
275+ }
273276
274- return napi_ok ;
277+ return napi_closing ;
275278 }
276279
277280 napi_status Release(napi_threadsafe_function_release_mode mode) {
278- node::Mutex::ScopedLock lock(this->mutex);
281+ {
282+ node::Mutex::ScopedLock lock(this->mutex);
279283
280- if (thread_count == 0) {
281- return napi_invalid_arg;
282- }
284+ if (thread_count == 0) {
285+ return napi_invalid_arg;
286+ }
283287
284- thread_count--;
288+ thread_count--;
285289
286- if (thread_count == 0 || mode == napi_tsfn_abort) {
287- if (!is_closing) {
288- is_closing = (mode == napi_tsfn_abort);
289- if (is_closing && max_queue_size > 0) {
290- cond->Signal(lock);
290+ if (thread_count == 0 || mode == napi_tsfn_abort) {
291+ if (state == kOpen) {
292+ if (mode == napi_tsfn_abort) {
293+ state = kClosing;
294+ }
295+ if (state == kClosing && max_queue_size > 0) {
296+ cond->Signal(lock);
297+ }
298+ Send();
291299 }
292- Send();
293300 }
294- }
295301
302+ if (!(state == kClosed && thread_count == 0)) {
303+ return napi_ok;
304+ }
305+ }
306+ // Make sure to release lock before destroying
307+ delete this;
296308 return napi_ok;
297309 }
298310
299- void EmptyQueueAndDelete() {
300- for (; !queue.empty(); queue.pop()) {
301- call_js_cb(nullptr, nullptr, context, queue.front());
311+ void EmptyQueueAndMaybeDelete() {
312+ std::queue<void*> drain_queue;
313+ {
314+ node::Mutex::ScopedLock lock(this->mutex);
315+ queue.swap(drain_queue);
302316 }
317+ for (; !drain_queue.empty(); drain_queue.pop()) {
318+ call_js_cb(nullptr, nullptr, context, drain_queue.front());
319+ }
320+ {
321+ node::Mutex::ScopedLock lock(this->mutex);
322+ if (thread_count > 0) {
323+ // At this point this TSFN is effectively done, but we need to keep
324+ // it alive for other threads that still have pointers to it until
325+ // they release them.
326+ // But we already release all the resources that we can at this point
327+ ReleaseResources();
328+ return;
329+ }
330+ }
331+ // Make sure to release lock before destroying
303332 delete this;
304333 }
305334
@@ -351,6 +380,16 @@ class ThreadSafeFunction : public node::AsyncResource {
351380 inline void* Context() { return context; }
352381
353382 protected:
383+ void ReleaseResources() {
384+ if (state != kClosed) {
385+ state = kClosed;
386+ ref.Reset();
387+ node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
388+ env->Unref();
389+ async_resource.reset();
390+ }
391+ }
392+
354393 void Dispatch() {
355394 bool has_more = true;
356395
@@ -379,9 +418,7 @@ class ThreadSafeFunction : public node::AsyncResource {
379418
380419 {
381420 node::Mutex::ScopedLock lock(this->mutex);
382- if (is_closing) {
383- CloseHandlesAndMaybeDelete();
384- } else {
421+ if (state == kOpen) {
385422 size_t size = queue.size();
386423 if (size > 0) {
387424 data = queue.front();
@@ -395,7 +432,7 @@ class ThreadSafeFunction : public node::AsyncResource {
395432
396433 if (size == 0) {
397434 if (thread_count == 0) {
398- is_closing = true ;
435+ state = kClosing ;
399436 if (max_queue_size > 0) {
400437 cond->Signal(lock);
401438 }
@@ -404,12 +441,14 @@ class ThreadSafeFunction : public node::AsyncResource {
404441 } else {
405442 has_more = true;
406443 }
444+ } else {
445+ CloseHandlesAndMaybeDelete();
407446 }
408447 }
409448
410449 if (popped_value) {
411450 v8::HandleScope scope(env->isolate);
412- CallbackScope cb_scope(this );
451+ AsyncResource:: CallbackScope cb_scope(&*async_resource );
413452 napi_value js_callback = nullptr;
414453 if (!ref.IsEmpty()) {
415454 v8::Local<v8::Function> js_cb =
@@ -426,17 +465,17 @@ class ThreadSafeFunction : public node::AsyncResource {
426465 void Finalize() {
427466 v8::HandleScope scope(env->isolate);
428467 if (finalize_cb) {
429- CallbackScope cb_scope(this );
468+ AsyncResource:: CallbackScope cb_scope(&*async_resource );
430469 env->CallFinalizer<false>(finalize_cb, finalize_data, context);
431470 }
432- EmptyQueueAndDelete ();
471+ EmptyQueueAndMaybeDelete ();
433472 }
434473
435474 void CloseHandlesAndMaybeDelete(bool set_closing = false) {
436475 v8::HandleScope scope(env->isolate);
437476 if (set_closing) {
438477 node::Mutex::ScopedLock lock(this->mutex);
439- is_closing = true ;
478+ state = kClosing ;
440479 if (max_queue_size > 0) {
441480 cond->Signal(lock);
442481 }
@@ -501,19 +540,30 @@ class ThreadSafeFunction : public node::AsyncResource {
501540 }
502541
503542 private:
543+ // Needed because node::AsyncResource::CallbackScope is protected
544+ class AsyncResource : public node::AsyncResource {
545+ public:
546+ using node::AsyncResource::AsyncResource;
547+ using node::AsyncResource::CallbackScope;
548+ };
549+
550+ enum State : unsigned char { kOpen, kClosing, kClosed };
551+
504552 static const unsigned char kDispatchIdle = 0;
505553 static const unsigned char kDispatchRunning = 1 << 0;
506554 static const unsigned char kDispatchPending = 1 << 1;
507555
508556 static const unsigned int kMaxIterationCount = 1000;
509557
558+ std::optional<AsyncResource> async_resource;
559+
510560 // These are variables protected by the mutex.
511561 node::Mutex mutex;
512562 std::unique_ptr<node::ConditionVariable> cond;
513563 std::queue<void*> queue;
514564 uv_async_t async;
515565 size_t thread_count;
516- bool is_closing ;
566+ State state ;
517567 std::atomic_uchar dispatch_state;
518568
519569 // These are variables set once, upon creation, and then never again, which
0 commit comments