From 4ae4ce176ca6cb4528c7ee22f53dd1b617efa581 Mon Sep 17 00:00:00 2001 From: Sean Sube Date: Sun, 5 Mar 2023 20:13:28 -0600 Subject: [PATCH] fix(api): attempt to recycle leaking workers when a job finishes --- api/onnx_web/worker/pool.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/api/onnx_web/worker/pool.py b/api/onnx_web/worker/pool.py index 96550a8d..7df861c3 100644 --- a/api/onnx_web/worker/pool.py +++ b/api/onnx_web/worker/pool.py @@ -171,6 +171,7 @@ class DevicePoolExecutor: _device, progress = self.active_jobs[job] self.finished_jobs.append((job, progress, context.cancel.value)) del self.active_jobs[job] + self.join_leaking() except Empty: pass except ValueError: @@ -262,6 +263,7 @@ class DevicePoolExecutor: queue.close() self.pending.clear() + self.join_leaking() logger.debug("stopping device workers") for device, worker in self.workers.items(): @@ -282,9 +284,7 @@ class DevicePoolExecutor: logger.debug("worker pool stopped") - def recycle(self): - logger.debug("recycling worker pool") - + def join_leaking(self): if len(self.leaking) > 0: logger.warning("cleaning up %s leaking workers", len(self.leaking)) for device, worker in self.leaking: @@ -297,6 +297,10 @@ class DevicePoolExecutor: self.leaking[:] = [dw for dw in self.leaking if dw[1].is_alive()] + def recycle(self): + logger.debug("recycling worker pool") + self.join_leaking() + needs_restart = [] for device, worker in self.workers.items():