From 83884bcafaebd661f8e8b03c63a4fea81f643052 Mon Sep 17 00:00:00 2001 From: Sean Sube Date: Sun, 26 Mar 2023 11:48:27 -0500 Subject: [PATCH] enqueue jobs on idle workers during progress check --- api/onnx_web/worker/pool.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/api/onnx_web/worker/pool.py b/api/onnx_web/worker/pool.py index eae785a1..646f3a4f 100644 --- a/api/onnx_web/worker/pool.py +++ b/api/onnx_web/worker/pool.py @@ -151,7 +151,7 @@ class DevicePoolExecutor: def create_progress_worker(self) -> None: self.progress_worker = Interval( - self.progress_interval, progress_main, args=(self, self.progress) + self.progress_interval, progress_main, args=(self,) ) self.progress_worker.daemon = True self.progress_worker.name = "onnx-web progress" @@ -499,10 +499,10 @@ def logger_main(pool: DevicePoolExecutor, logs: "Queue[str]"): def progress_main( - pool: DevicePoolExecutor, queues: Dict[str, "Queue[ProgressCommand]"] + pool: DevicePoolExecutor ): logger.trace("checking in from progress worker thread") - for device, queue in queues.items(): + for device, queue in pool.progress.items(): try: progress = queue.get_nowait() while progress is not None: @@ -516,3 +516,8 @@ def progress_main( break except Exception: logger.exception("error in progress worker for device %s", device) + + for device, queue in pool.pending.items(): + if queue.empty(): + logger.debug("enqueueing next job for idle worker") + pool.next_job(device)