1
0
Fork 0

set queue timeouts

This commit is contained in:
Sean Sube 2023-02-27 22:37:43 -06:00
parent 0793b61c3a
commit 136759285d
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
3 changed files with 9 additions and 6 deletions

View File

@ -63,7 +63,7 @@ class WorkerContext:
self.cancel.value = cancel self.cancel.value = cancel
def set_progress(self, progress: int) -> None: def set_progress(self, progress: int) -> None:
self.progress.put((self.job, self.device.device, progress)) self.progress.put((self.job, self.device.device, progress), block=False)
def set_finished(self) -> None: def set_finished(self) -> None:
self.finished.put((self.job, self.device.device)) self.finished.put((self.job, self.device.device))

View File

@ -88,7 +88,7 @@ class DevicePoolExecutor:
while True: while True:
try: try:
job = logs.get() job = logs.get(timeout=(self.join_timeout / 2))
with open("worker.log", "w") as f: with open("worker.log", "w") as f:
logger.info("got log: %s", job) logger.info("got log: %s", job)
f.write(str(job) + "\n\n") f.write(str(job) + "\n\n")
@ -106,7 +106,7 @@ class DevicePoolExecutor:
logger.info("checking in from progress worker thread") logger.info("checking in from progress worker thread")
while True: while True:
try: try:
job, device, value = progress.get() job, device, value = progress.get(timeout=(self.join_timeout / 2))
logger.info("progress update for job: %s to %s", job, value) logger.info("progress update for job: %s to %s", job, value)
self.active_jobs[job] = (device, value) self.active_jobs[job] = (device, value)
if job in self.cancelled_jobs: if job in self.cancelled_jobs:
@ -128,7 +128,7 @@ class DevicePoolExecutor:
logger.info("checking in from finished worker thread") logger.info("checking in from finished worker thread")
while True: while True:
try: try:
job, device = finished.get() job, device = finished.get(timeout=(self.join_timeout / 2))
logger.info("job has been finished: %s", job) logger.info("job has been finished: %s", job)
context = self.context[device] context = self.context[device]
_device, progress = self.active_jobs[job] _device, progress = self.active_jobs[job]
@ -256,7 +256,7 @@ class DevicePoolExecutor:
) )
device = self.devices[device_idx].device device = self.devices[device_idx].device
self.pending[device].put((key, fn, args, kwargs)) self.pending[device].put((key, fn, args, kwargs), block=False)
def status(self) -> List[Tuple[str, int, bool, bool]]: def status(self) -> List[Tuple[str, int, bool, bool]]:
history = [ history = [

View File

@ -1,4 +1,5 @@
from logging import getLogger from logging import getLogger
from queue import Empty
from traceback import format_exception from traceback import format_exception
from setproctitle import setproctitle from setproctitle import setproctitle
@ -18,7 +19,7 @@ def worker_main(context: WorkerContext, server: ServerContext):
logger.info("checking in from worker, %s", get_available_providers()) logger.info("checking in from worker, %s", get_available_providers())
while True: while True:
name, fn, args, kwargs = context.pending.get() name, fn, args, kwargs = context.pending.get(timeout=1.0)
logger.info("worker for %s got job: %s", context.device.device, name) logger.info("worker for %s got job: %s", context.device.device, name)
try: try:
@ -27,6 +28,8 @@ def worker_main(context: WorkerContext, server: ServerContext):
logger.info("starting job: %s", name) logger.info("starting job: %s", name)
fn(context, *args, **kwargs) fn(context, *args, **kwargs)
logger.info("job succeeded: %s", name) logger.info("job succeeded: %s", name)
except Empty:
pass
except Exception as e: except Exception as e:
logger.error( logger.error(
"error while running job: %s", "error while running job: %s",