From 6502e1e3c8365a25f2712c9e6c3e16ba59ccf52d Mon Sep 17 00:00:00 2001 From: Sean Sube Date: Sun, 26 Feb 2023 12:58:38 -0600 Subject: [PATCH] recycle worker pool after 10 jobs --- api/onnx_web/worker/pool.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/api/onnx_web/worker/pool.py b/api/onnx_web/worker/pool.py index 0721896e..cd78051d 100644 --- a/api/onnx_web/worker/pool.py +++ b/api/onnx_web/worker/pool.py @@ -36,6 +36,7 @@ class DevicePoolExecutor: self.progress = {} self.workers = {} self.jobs = {} # Dict[Output, Device] + self.job_count = 0 # TODO: make this a method logger.debug("starting log worker") @@ -129,6 +130,27 @@ class DevicePoolExecutor: ) self.finished[:] = self.finished[-self.finished_limit:] + def recycle(self): + for name, proc in self.workers.items(): + if proc.is_alive(): + logger.debug("shutting down worker for device %s", name) + proc.join(5) + else: + logger.warning("worker for device %s has died", name) + + self.workers[name] = None + + logger.info("starting new workers") + + for name in self.workers.keys(): + context = self.context[name] + lock = self.locks[name] + + logger.debug("starting worker for device %s", name) + self.workers[name] = Process(target=worker_init, args=(lock, context, self.server)) + self.workers[name].start() + + def submit( self, key: str, @@ -138,6 +160,11 @@ class DevicePoolExecutor: needs_device: Optional[DeviceParams] = None, **kwargs, ) -> None: + self.job_count += 1 + if self.job_count > 10: + self.recycle() + self.job_count = 0 + self.prune() device_idx = self.get_next_device(needs_device=needs_device) logger.info(