1
0
Fork 0

remove remaining references to finished queue and worker

This commit is contained in:
Sean Sube 2023-03-18 15:26:19 -05:00
parent d1565b056e
commit b026566ccb
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
1 changed files with 0 additions and 29 deletions

View File

@ -64,12 +64,10 @@ class DevicePoolExecutor:
self.logs = Queue(self.max_pending_per_worker) self.logs = Queue(self.max_pending_per_worker)
self.progress = Queue(self.max_pending_per_worker) self.progress = Queue(self.max_pending_per_worker)
self.finished = Queue(self.max_pending_per_worker)
# TODO: these should be part of a start method # TODO: these should be part of a start method
self.create_logger_worker() self.create_logger_worker()
self.create_progress_worker() self.create_progress_worker()
self.create_finished_worker()
for device in devices: for device in devices:
self.create_device_worker(device) self.create_device_worker(device)
@ -99,7 +97,6 @@ class DevicePoolExecutor:
device, device,
cancel=Value("B", False), cancel=Value("B", False),
progress=self.progress, progress=self.progress,
finished=self.finished,
logs=self.logs, logs=self.logs,
pending=pending, pending=pending,
current=current, current=current,
@ -185,31 +182,6 @@ class DevicePoolExecutor:
logger.debug("starting progress worker") logger.debug("starting progress worker")
progress_thread.start() progress_thread.start()
def create_finished_worker(self) -> None:
def finished_worker(finished: Queue):
logger.trace("checking in from finished worker thread")
while True:
try:
job, device = finished.get(timeout=(self.join_timeout / 2))
except Empty:
pass
except ValueError:
break
except Exception:
logger.exception("error in finished worker")
finished_thread = Thread(
name="onnx-web finished",
target=finished_worker,
args=(self.finished,),
daemon=True,
)
self.threads["finished"] = finished_thread
logger.debug("started finished worker")
finished_thread.start()
def get_job_context(self, key: str) -> WorkerContext: def get_job_context(self, key: str) -> WorkerContext:
device, _progress = self.active_jobs[key] device, _progress = self.active_jobs[key]
return self.context[device] return self.context[device]
@ -276,7 +248,6 @@ class DevicePoolExecutor:
logger.debug("closing queues") logger.debug("closing queues")
self.logs.close() self.logs.close()
self.finished.close()
self.progress.close() self.progress.close()
for queue in self.pending.values(): for queue in self.pending.values():
queue.close() queue.close()