From 4ddd69ba07ce73c8829787f358fddf6ea6065000 Mon Sep 17 00:00:00 2001 From: Sean Sube Date: Sun, 26 Mar 2023 18:22:16 -0500 Subject: [PATCH] fix(api): watch for progress events from leaking workers --- api/onnx_web/worker/pool.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/api/onnx_web/worker/pool.py b/api/onnx_web/worker/pool.py index 8a546f2a..c657feb3 100644 --- a/api/onnx_web/worker/pool.py +++ b/api/onnx_web/worker/pool.py @@ -277,7 +277,7 @@ class DevicePoolExecutor: def join_leaking(self): if len(self.leaking) > 0: - for device, worker, _context in self.leaking: + for device, worker, context in self.leaking: logger.warning( "shutting down leaking worker %s for device %s", worker.pid, device ) @@ -289,6 +289,21 @@ class DevicePoolExecutor: device, ) + try: + progress = context.progress.get_nowait() + while progress is not None: + self.update_job(progress) + progress = context.progress.get_nowait() + except Empty: + logger.trace("empty queue in leaking worker for device %s", device) + except ValueError as e: + logger.debug("value error in leaking worker for device %s: %s", device, e) + break + except Exception: + logger.exception("error in leaking worker for device %s", device) + + + self.leaking[:] = [dw for dw in self.leaking if dw[1].is_alive()] def recycle(self):