1
0
Fork 0

only enqueue jobs from progress worker

This commit is contained in:
Sean Sube 2023-03-26 14:02:57 -05:00
parent ccf8d51e08
commit 0af406c47f
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
1 changed files with 38 additions and 39 deletions

View File

@ -334,7 +334,6 @@ class DevicePoolExecutor:
for device in self.devices: for device in self.devices:
if device.device in needs_restart: if device.device in needs_restart:
self.create_device_worker(device) self.create_device_worker(device)
self.next_job(device)
if self.logger_worker.is_alive(): if self.logger_worker.is_alive():
logger.debug("logger worker is running") logger.debug("logger worker is running")
@ -420,51 +419,51 @@ class DevicePoolExecutor:
logger.trace("no pending jobs for device %s", device) logger.trace("no pending jobs for device %s", device)
def finish_job(self, progress: ProgressCommand):
# move from running to finished
logger.info("job has finished: %s", progress.job)
self.finished_jobs.append(progress)
if progress.job in self.running_jobs:
del self.running_jobs[progress.job]
self.join_leaking()
if progress.job in self.cancelled_jobs:
self.cancelled_jobs.remove(progress.job)
def update_job(self, progress: ProgressCommand): def update_job(self, progress: ProgressCommand):
if progress.finished: if progress.finished:
# move from running to finished return self.finish_job(progress)
logger.info("job has finished: %s", progress.job)
self.finished_jobs.append(progress)
if progress.job in self.running_jobs:
del self.running_jobs[progress.job]
self.join_leaking() # move from pending to running
if progress.job in self.cancelled_jobs: logger.debug(
self.cancelled_jobs.remove(progress.job) "progress update for job: %s to %s", progress.job, progress.progress
)
self.running_jobs[progress.job] = progress
self.pending_jobs[:] = [
job for job in self.pending_jobs if job.name != progress.job
]
# increment job counter if this is the start of a new job
if progress.progress == 0:
if progress.device in self.total_jobs:
self.total_jobs[progress.device] += 1
else:
self.total_jobs[progress.device] = 1
# enqueue the next job for this device
self.next_job(progress.device)
else:
# move from pending to running
logger.debug( logger.debug(
"progress update for job: %s to %s", progress.job, progress.progress "updating job count for device %s: %s",
progress.device,
self.total_jobs[progress.device],
) )
self.running_jobs[progress.job] = progress
self.pending_jobs[:] = [
job for job in self.pending_jobs if job.name != progress.job
]
# increment job counter if this is the start of a new job # check if the job has been cancelled
if progress.progress == 0: if progress.job in self.cancelled_jobs:
if progress.device in self.total_jobs: logger.debug(
self.total_jobs[progress.device] += 1 "setting flag for cancelled job: %s on %s",
else: progress.job,
self.total_jobs[progress.device] = 1 progress.device,
)
logger.debug( self.context[progress.device].set_cancel()
"updating job count for device %s: %s",
progress.device,
self.total_jobs[progress.device],
)
# check if the job has been cancelled
if progress.job in self.cancelled_jobs:
logger.debug(
"setting flag for cancelled job: %s on %s",
progress.job,
progress.device,
)
self.context[progress.device].set_cancel()
def health_main(pool: DevicePoolExecutor): def health_main(pool: DevicePoolExecutor):