1
0
Fork 0

fix(api): enqueue next job when previous one finishes and after recycling worker

This commit is contained in:
Sean Sube 2023-03-26 11:41:17 -05:00
parent f3ab25f671
commit 14ade83937
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
1 changed files with 14 additions and 1 deletions

View File

@ -333,6 +333,7 @@ class DevicePoolExecutor:
for device in self.devices: for device in self.devices:
if device.device in needs_restart: if device.device in needs_restart:
self.create_device_worker(device) self.create_device_worker(device)
self.next_job(device)
if self.logger_worker.is_alive(): if self.logger_worker.is_alive():
logger.debug("logger worker is running") logger.debug("logger worker is running")
@ -369,7 +370,6 @@ class DevicePoolExecutor:
# build and queue job # build and queue job
job = JobCommand(key, device, fn, args, kwargs) job = JobCommand(key, device, fn, args, kwargs)
self.pending_jobs.append(job) self.pending_jobs.append(job)
self.pending[device].put(job, block=False)
def status(self) -> List[Tuple[str, int, bool, bool, bool, bool]]: def status(self) -> List[Tuple[str, int, bool, bool, bool, bool]]:
history = [ history = [
@ -411,6 +411,16 @@ class DevicePoolExecutor:
) )
return history return history
def next_job(self, device: str):
for job in self.pending_jobs:
if job.device == device:
logger.debug("enqueuing job %s on device %s", job.name, device)
self.pending[device].put(job, block=False)
self.pending_jobs.remove(job)
return
logger.trace("no pending jobs for device %s", device)
def update_job(self, progress: ProgressCommand): def update_job(self, progress: ProgressCommand):
if progress.finished: if progress.finished:
# move from running to finished # move from running to finished
@ -422,6 +432,9 @@ class DevicePoolExecutor:
self.join_leaking() self.join_leaking()
if progress.job in self.cancelled_jobs: if progress.job in self.cancelled_jobs:
self.cancelled_jobs.remove(progress.job) self.cancelled_jobs.remove(progress.job)
# enqueue the next job for this device
self.next_job(progress.device)
else: else:
# move from pending to running # move from pending to running
logger.debug( logger.debug(