From 136759285ddf9e06975727e4dc32db659f858e9d Mon Sep 17 00:00:00 2001 From: Sean Sube Date: Mon, 27 Feb 2023 22:37:43 -0600 Subject: [PATCH] set queue timeouts --- api/onnx_web/worker/context.py | 2 +- api/onnx_web/worker/pool.py | 8 ++++---- api/onnx_web/worker/worker.py | 5 ++++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/api/onnx_web/worker/context.py b/api/onnx_web/worker/context.py index a69e28a2..0f1af7e2 100644 --- a/api/onnx_web/worker/context.py +++ b/api/onnx_web/worker/context.py @@ -63,7 +63,7 @@ class WorkerContext: self.cancel.value = cancel def set_progress(self, progress: int) -> None: - self.progress.put((self.job, self.device.device, progress)) + self.progress.put((self.job, self.device.device, progress), block=False) def set_finished(self) -> None: self.finished.put((self.job, self.device.device)) diff --git a/api/onnx_web/worker/pool.py b/api/onnx_web/worker/pool.py index 9b8925f3..4105ac57 100644 --- a/api/onnx_web/worker/pool.py +++ b/api/onnx_web/worker/pool.py @@ -88,7 +88,7 @@ class DevicePoolExecutor: while True: try: - job = logs.get() + job = logs.get(timeout=(self.join_timeout / 2)) with open("worker.log", "w") as f: logger.info("got log: %s", job) f.write(str(job) + "\n\n") @@ -106,7 +106,7 @@ class DevicePoolExecutor: logger.info("checking in from progress worker thread") while True: try: - job, device, value = progress.get() + job, device, value = progress.get(timeout=(self.join_timeout / 2)) logger.info("progress update for job: %s to %s", job, value) self.active_jobs[job] = (device, value) if job in self.cancelled_jobs: @@ -128,7 +128,7 @@ class DevicePoolExecutor: logger.info("checking in from finished worker thread") while True: try: - job, device = finished.get() + job, device = finished.get(timeout=(self.join_timeout / 2)) logger.info("job has been finished: %s", job) context = self.context[device] _device, progress = self.active_jobs[job] @@ -256,7 +256,7 @@ class DevicePoolExecutor: ) device = self.devices[device_idx].device - self.pending[device].put((key, fn, args, kwargs)) + self.pending[device].put((key, fn, args, kwargs), block=False) def status(self) -> List[Tuple[str, int, bool, bool]]: history = [ diff --git a/api/onnx_web/worker/worker.py b/api/onnx_web/worker/worker.py index 6755cd1f..ef925a33 100644 --- a/api/onnx_web/worker/worker.py +++ b/api/onnx_web/worker/worker.py @@ -1,4 +1,5 @@ from logging import getLogger +from queue import Empty from traceback import format_exception from setproctitle import setproctitle @@ -18,7 +19,7 @@ def worker_main(context: WorkerContext, server: ServerContext): logger.info("checking in from worker, %s", get_available_providers()) while True: - name, fn, args, kwargs = context.pending.get() + name, fn, args, kwargs = context.pending.get(timeout=1.0) logger.info("worker for %s got job: %s", context.device.device, name) try: @@ -27,6 +28,8 @@ def worker_main(context: WorkerContext, server: ServerContext): logger.info("starting job: %s", name) fn(context, *args, **kwargs) logger.info("job succeeded: %s", name) + except Empty: + pass except Exception as e: logger.error( "error while running job: %s",