1
0
Fork 0

join all threads

This commit is contained in:
Sean Sube 2023-02-27 17:35:31 -06:00
parent 66a20e60fe
commit 2327b24022
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
3 changed files with 10 additions and 12 deletions

View File

@ -13,13 +13,13 @@ ProgressCallback = Callable[[int, int, Any], None]
class WorkerContext: class WorkerContext:
cancel: "Value[bool]" = None cancel: "Value[bool]" = None
key: str = None job: str = None
pending: "Queue[Tuple[Callable, Any, Any]]" = None pending: "Queue[Tuple[Callable, Any, Any]]" = None
progress: "Value[int]" = None progress: "Value[int]" = None
def __init__( def __init__(
self, self,
key: str, job: str,
device: DeviceParams, device: DeviceParams,
cancel: "Value[bool]" = None, cancel: "Value[bool]" = None,
logs: "Queue[str]" = None, logs: "Queue[str]" = None,
@ -27,7 +27,7 @@ class WorkerContext:
progress: "Queue[Tuple[str, int]]" = None, progress: "Queue[Tuple[str, int]]" = None,
finished: "Queue[str]" = None, finished: "Queue[str]" = None,
): ):
self.key = key self.job = job
self.device = device self.device = device
self.cancel = cancel self.cancel = cancel
self.progress = progress self.progress = progress
@ -53,7 +53,7 @@ class WorkerContext:
if self.is_cancelled(): if self.is_cancelled():
raise RuntimeError("job has been cancelled") raise RuntimeError("job has been cancelled")
else: else:
logger.debug("setting progress for job %s to %s", self.key, step) logger.debug("setting progress for job %s to %s", self.job, step)
self.set_progress(step) self.set_progress(step)
return on_progress return on_progress
@ -63,10 +63,10 @@ class WorkerContext:
self.cancel.value = cancel self.cancel.value = cancel
def set_progress(self, progress: int) -> None: def set_progress(self, progress: int) -> None:
self.progress.put((self.key, self.device.device, progress)) self.progress.put((self.job, self.device.device, progress))
def set_finished(self) -> None: def set_finished(self) -> None:
self.finished.put((self.key, self.device.device)) self.finished.put((self.job, self.device.device))
def clear_flags(self) -> None: def clear_flags(self) -> None:
self.set_cancel(False) self.set_cancel(False)

View File

@ -200,16 +200,14 @@ class DevicePoolExecutor:
return (False, progress) return (False, progress)
def join(self): def join(self):
self.progress_thread.join(self.join_timeout)
self.finished_thread.join(self.join_timeout)
for device, worker in self.workers.items(): for device, worker in self.workers.items():
if worker.is_alive(): if worker.is_alive():
logger.info("stopping worker for device %s", device) logger.info("stopping worker for device %s", device)
worker.join(self.join_timeout) worker.join(self.join_timeout)
if self.logger.is_alive(): for name, thread in self.threads.items():
self.logger.join(self.join_timeout) logger.info("stopping worker thread: %s", name)
thread.join(self.join_timeout)
def recycle(self): def recycle(self):
for name, proc in self.workers.items(): for name, proc in self.workers.items():

View File

@ -25,7 +25,7 @@ def worker_main(context: WorkerContext, server: ServerContext):
name = args[3][0] name = args[3][0]
try: try:
context.key = name # TODO: hax context.job = name # TODO: hax
context.clear_flags() context.clear_flags()
logger.info("starting job: %s", name) logger.info("starting job: %s", name)
fn(context, *args, **kwargs) fn(context, *args, **kwargs)