From feb4603171f8e2f538bf339fb8990ca7506ac9c9 Mon Sep 17 00:00:00 2001 From: Sean Sube Date: Tue, 14 Feb 2023 17:23:23 -0600 Subject: [PATCH] fix(api): remove finished jobs from worker pool (#124) --- api/onnx_web/server/device_pool.py | 43 ++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/api/onnx_web/server/device_pool.py b/api/onnx_web/server/device_pool.py index 29db85b0..c0b1c6a7 100644 --- a/api/onnx_web/server/device_pool.py +++ b/api/onnx_web/server/device_pool.py @@ -107,15 +107,19 @@ class DevicePoolExecutor: jobs: List[Job] = None next_device: int = 0 pool: Union[ProcessPoolExecutor, ThreadPoolExecutor] = None + recent: List[Tuple[str, int]] = None def __init__( self, devices: List[DeviceParams], pool: Optional[Union[ProcessPoolExecutor, ThreadPoolExecutor]] = None, + recent_limit: int = 10, ): self.devices = devices self.jobs = [] self.next_device = 0 + self.recent = [] + self.recent_limit = recent_limit device_count = len(devices) if pool is None: @@ -150,10 +154,18 @@ class DevicePoolExecutor: return False def done(self, key: str) -> Tuple[Optional[bool], int]: + for k, progress in self.recent: + if key == k: + return (True, progress) + for job in self.jobs: if job.key == key: done = job.future.done() progress = job.get_progress() + + if done: + self.prune() + return (done, progress) logger.warn("checking status for unknown key: %s", key) @@ -186,7 +198,21 @@ class DevicePoolExecutor: return lowest_devices[0] def prune(self): - self.jobs[:] = [job for job in self.jobs if job.future.done()] + pending_jobs = [job for job in self.jobs if job.future.done()] + logger.debug("pruning %s of %s pending jobs", len(pending_jobs), len(self.jobs)) + + for job in pending_jobs: + self.recent.append((job.key, job.get_progress())) + try: + self.jobs.remove(job) + except ValueError as e: + logger.warning("error removing pruned job from pending: %s", e) + + # self.jobs[:] = [job for job in self.jobs if not job.future.done()] + recent_count = len(self.recent) + if recent_count > self.recent_limit: + logger.debug("pruning %s of %s recent jobs", recent_count - self.recent_limit, recent_count) + self.recent[:] = self.recent[-self.recent_limit :] def submit( self, @@ -197,6 +223,7 @@ class DevicePoolExecutor: needs_device: Optional[DeviceParams] = None, **kwargs, ) -> None: + self.prune() device = self.get_next_device(needs_device=needs_device) logger.info( "assigning job %s to device %s: %s", key, device, self.devices[device] @@ -222,7 +249,7 @@ class DevicePoolExecutor: future.add_done_callback(job_done) def status(self) -> List[Tuple[str, int, bool, int]]: - return [ + pending = [ ( job.key, job.context.device_index.value, @@ -231,3 +258,15 @@ class DevicePoolExecutor: ) for job in self.jobs ] + recent = [ + ( + key, + None, + True, + progress, + ) + for key, progress in self.recent + ] + + pending.extend(recent) + return pending