From 0af406c47fa28cc95d6ec784ad3a21765e3fd67b Mon Sep 17 00:00:00 2001 From: Sean Sube Date: Sun, 26 Mar 2023 14:02:57 -0500 Subject: [PATCH] only enqueue jobs from progress worker --- api/onnx_web/worker/pool.py | 77 ++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/api/onnx_web/worker/pool.py b/api/onnx_web/worker/pool.py index 9107c8d3..953aba9a 100644 --- a/api/onnx_web/worker/pool.py +++ b/api/onnx_web/worker/pool.py @@ -334,7 +334,6 @@ class DevicePoolExecutor: for device in self.devices: if device.device in needs_restart: self.create_device_worker(device) - self.next_job(device) if self.logger_worker.is_alive(): logger.debug("logger worker is running") @@ -420,51 +419,51 @@ class DevicePoolExecutor: logger.trace("no pending jobs for device %s", device) + def finish_job(self, progress: ProgressCommand): + # move from running to finished + logger.info("job has finished: %s", progress.job) + self.finished_jobs.append(progress) + if progress.job in self.running_jobs: + del self.running_jobs[progress.job] + + self.join_leaking() + if progress.job in self.cancelled_jobs: + self.cancelled_jobs.remove(progress.job) + def update_job(self, progress: ProgressCommand): if progress.finished: - # move from running to finished - logger.info("job has finished: %s", progress.job) - self.finished_jobs.append(progress) - if progress.job in self.running_jobs: - del self.running_jobs[progress.job] + return self.finish_job(progress) - self.join_leaking() - if progress.job in self.cancelled_jobs: - self.cancelled_jobs.remove(progress.job) + # move from pending to running + logger.debug( + "progress update for job: %s to %s", progress.job, progress.progress + ) + self.running_jobs[progress.job] = progress + self.pending_jobs[:] = [ + job for job in self.pending_jobs if job.name != progress.job + ] + + # increment job counter if this is the start of a new job + if progress.progress == 0: + if progress.device in self.total_jobs: + self.total_jobs[progress.device] += 1 + else: + self.total_jobs[progress.device] = 1 - # enqueue the next job for this device - self.next_job(progress.device) - else: - # move from pending to running logger.debug( - "progress update for job: %s to %s", progress.job, progress.progress + "updating job count for device %s: %s", + progress.device, + self.total_jobs[progress.device], ) - self.running_jobs[progress.job] = progress - self.pending_jobs[:] = [ - job for job in self.pending_jobs if job.name != progress.job - ] - # increment job counter if this is the start of a new job - if progress.progress == 0: - if progress.device in self.total_jobs: - self.total_jobs[progress.device] += 1 - else: - self.total_jobs[progress.device] = 1 - - logger.debug( - "updating job count for device %s: %s", - progress.device, - self.total_jobs[progress.device], - ) - - # check if the job has been cancelled - if progress.job in self.cancelled_jobs: - logger.debug( - "setting flag for cancelled job: %s on %s", - progress.job, - progress.device, - ) - self.context[progress.device].set_cancel() + # check if the job has been cancelled + if progress.job in self.cancelled_jobs: + logger.debug( + "setting flag for cancelled job: %s on %s", + progress.job, + progress.device, + ) + self.context[progress.device].set_cancel() def health_main(pool: DevicePoolExecutor):