From da6ae5d62f04da4cc00067c2772a6d8c91b17164 Mon Sep 17 00:00:00 2001 From: Sean Sube Date: Mon, 27 Feb 2023 23:01:26 -0600 Subject: [PATCH] more logging around shutdown, close queues --- api/onnx_web/main.py | 7 ++++++- api/onnx_web/worker/pool.py | 18 ++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/api/onnx_web/main.py b/api/onnx_web/main.py index bbfa1039..54f2f7df 100644 --- a/api/onnx_web/main.py +++ b/api/onnx_web/main.py @@ -59,7 +59,12 @@ def main(): def run(): app, pool = main() - atexit.register(lambda: pool.join()) + + def quit(): + logger.info("shutting down workers") + pool.join() + + atexit.register(quit) return app diff --git a/api/onnx_web/worker/pool.py b/api/onnx_web/worker/pool.py index 3bede110..d2db88ea 100644 --- a/api/onnx_web/worker/pool.py +++ b/api/onnx_web/worker/pool.py @@ -213,15 +213,29 @@ class DevicePoolExecutor: return (False, progress) def join(self): + logger.debug("stopping worker pool") + for device, worker in self.workers.items(): if worker.is_alive(): - logger.info("stopping worker for device %s", device) + logger.debug("stopping worker for device %s", device) worker.join(self.join_timeout) + else: + logger.debug("worker for device %s has died", device) for name, thread in self.threads.items(): - logger.info("stopping worker thread: %s", name) + logger.debug("stopping worker thread: %s", name) thread.join(self.join_timeout) + logger.debug("closing queues") + self.logs.close() + self.finished.close() + self.progress.close() + for key, queue in self.pending.items(): + queue.close() + del self.pending[key] + + logger.debug("worker pool fully joined") + def recycle(self): for name, proc in self.workers.items(): if proc.is_alive():