1
0
Fork 0

enqueue jobs on idle workers during progress check

This commit is contained in:
Sean Sube 2023-03-26 11:48:27 -05:00
parent 14ade83937
commit 83884bcafa
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
1 changed files with 8 additions and 3 deletions

View File

@ -151,7 +151,7 @@ class DevicePoolExecutor:
def create_progress_worker(self) -> None: def create_progress_worker(self) -> None:
self.progress_worker = Interval( self.progress_worker = Interval(
self.progress_interval, progress_main, args=(self, self.progress) self.progress_interval, progress_main, args=(self,)
) )
self.progress_worker.daemon = True self.progress_worker.daemon = True
self.progress_worker.name = "onnx-web progress" self.progress_worker.name = "onnx-web progress"
@ -499,10 +499,10 @@ def logger_main(pool: DevicePoolExecutor, logs: "Queue[str]"):
def progress_main( def progress_main(
pool: DevicePoolExecutor, queues: Dict[str, "Queue[ProgressCommand]"] pool: DevicePoolExecutor
): ):
logger.trace("checking in from progress worker thread") logger.trace("checking in from progress worker thread")
for device, queue in queues.items(): for device, queue in pool.progress.items():
try: try:
progress = queue.get_nowait() progress = queue.get_nowait()
while progress is not None: while progress is not None:
@ -516,3 +516,8 @@ def progress_main(
break break
except Exception: except Exception:
logger.exception("error in progress worker for device %s", device) logger.exception("error in progress worker for device %s", device)
for device, queue in pool.pending.items():
if queue.empty():
logger.debug("enqueueing next job for idle worker")
pool.next_job(device)