1
0
Fork 0

fix(api): increment job counter for worker when it starts a new job (#283)

This commit is contained in:
Sean Sube 2023-03-26 11:18:27 -05:00
parent 3aa7b8a238
commit 55e44e8ac9
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
1 changed files with 12 additions and 12 deletions

View File

@ -355,24 +355,14 @@ class DevicePoolExecutor:
**kwargs, **kwargs,
) -> None: ) -> None:
device_idx = self.get_next_device(needs_device=needs_device) device_idx = self.get_next_device(needs_device=needs_device)
device = self.devices[device_idx].device
logger.info( logger.info(
"assigning job %s to device %s: %s", "assigning job %s to device %s: %s",
key, key,
device_idx, device_idx,
self.devices[device_idx], device,
) )
# increment job count before recycling (why tho?)
device = self.devices[device_idx].device
if device in self.total_jobs:
self.total_jobs[device] += 1
else:
self.total_jobs[device] = 1
# recycle before attempting to run
logger.debug("job count for device %s: %s", device, self.total_jobs[device])
self.rlock()
# build and queue job # build and queue job
job = JobCommand(key, device, fn, args, kwargs) job = JobCommand(key, device, fn, args, kwargs)
self.pending_jobs.append(job) self.pending_jobs.append(job)
@ -439,6 +429,16 @@ class DevicePoolExecutor:
job for job in self.pending_jobs if job.name != progress.job job for job in self.pending_jobs if job.name != progress.job
] ]
# increment job counter if this is the start of a new job
if progress.progress == 0:
if progress.device in self.total_jobs:
self.total_jobs[progress.device] += 1
else:
self.total_jobs[progress.device] = 1
logger.debug("updating job count for device %s: %s", progress.device, self.total_jobs[progress.device])
# check if the job has been cancelled
if progress.job in self.cancelled_jobs: if progress.job in self.cancelled_jobs:
logger.debug( logger.debug(
"setting flag for cancelled job: %s on %s", "setting flag for cancelled job: %s on %s",