1
0
Fork 0

update progress and finished flag from worker

This commit is contained in:
Sean Sube 2023-02-26 12:51:11 -06:00
parent f115326da7
commit e0737e9e08
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
3 changed files with 36 additions and 10 deletions

View File

@ -22,12 +22,16 @@ class WorkerContext:
device: DeviceParams, device: DeviceParams,
pending: "Queue[Any]", pending: "Queue[Any]",
progress: "Value[int]", progress: "Value[int]",
logs: "Queue[str]",
finished: "Value[bool]",
): ):
self.key = key self.key = key
self.cancel = cancel self.cancel = cancel
self.device = device self.device = device
self.pending = pending self.pending = pending
self.progress = progress self.progress = progress
self.logs = logs
self.finished = finished
def is_cancelled(self) -> bool: def is_cancelled(self) -> bool:
return self.cancel.value return self.cancel.value

View File

@ -14,10 +14,11 @@ logger = getLogger(__name__)
class DevicePoolExecutor: class DevicePoolExecutor:
devices: List[DeviceParams] = None devices: List[DeviceParams] = None
finished: List[Tuple[str, int]] = None finished: Dict[str, "Value[bool]"] = None
pending: Dict[str, "Queue[WorkerContext]"] = None pending: Dict[str, "Queue[WorkerContext]"] = None
progress: Dict[str, Value] = None progress: Dict[str, "Value[int]"] = None
workers: Dict[str, Process] = None workers: Dict[str, Process] = None
jobs: Dict[str, str] = None
def __init__( def __init__(
self, self,
@ -27,13 +28,14 @@ class DevicePoolExecutor:
): ):
self.server = server self.server = server
self.devices = devices self.devices = devices
self.finished = [] self.finished = {}
self.finished_limit = finished_limit self.finished_limit = finished_limit
self.context = {} self.context = {}
self.locks = {} self.locks = {}
self.pending = {} self.pending = {}
self.progress = {} self.progress = {}
self.workers = {} self.workers = {}
self.jobs = {} # Dict[Output, Device]
# TODO: make this a method # TODO: make this a method
logger.debug("starting log worker") logger.debug("starting log worker")
@ -53,11 +55,13 @@ class DevicePoolExecutor:
lock = Lock() lock = Lock()
self.locks[name] = lock self.locks[name] = lock
cancel = Value("B", False, lock=lock) cancel = Value("B", False, lock=lock)
finished = Value("B", False)
self.finished[name] = finished
progress = Value("I", 0) # , lock=lock) # needs its own lock for some reason. TODO: why? progress = Value("I", 0) # , lock=lock) # needs its own lock for some reason. TODO: why?
self.progress[name] = progress self.progress[name] = progress
pending = Queue() pending = Queue()
self.pending[name] = pending self.pending[name] = pending
context = WorkerContext(name, cancel, device, pending, progress) context = WorkerContext(name, cancel, device, pending, progress, self.log_queue, finished)
self.context[name] = context self.context[name] = context
logger.debug("starting worker for device %s", device) logger.debug("starting worker for device %s", device)
@ -73,12 +77,16 @@ class DevicePoolExecutor:
raise NotImplementedError() raise NotImplementedError()
def done(self, key: str) -> Tuple[Optional[bool], int]: def done(self, key: str) -> Tuple[Optional[bool], int]:
for k, progress in self.finished: if not key in self.jobs:
if key == k: logger.warn("checking status for unknown key: %s", key)
return (True, progress) return (None, 0)
device = self.jobs[key]
finished = self.finished[device]
progress = self.progress[device]
return (finished.value, progress.value)
logger.warn("checking status for unknown key: %s", key)
return (None, 0)
def get_next_device(self, needs_device: Optional[DeviceParams] = None) -> int: def get_next_device(self, needs_device: Optional[DeviceParams] = None) -> int:
# respect overrides if possible # respect overrides if possible
@ -140,6 +148,8 @@ class DevicePoolExecutor:
queue = self.pending[device.device] queue = self.pending[device.device]
queue.put((fn, args, kwargs)) queue.put((fn, args, kwargs))
self.jobs[key] = device.device
def status(self) -> List[Tuple[str, int, bool, int]]: def status(self) -> List[Tuple[str, int, bool, int]]:
pending = [ pending = [

View File

@ -32,8 +32,20 @@ def worker_init(lock: Lock, context: WorkerContext, server: ServerContext):
logger.info("got job: %s", job) logger.info("got job: %s", job)
try: try:
fn, args, kwargs = job fn, args, kwargs = job
name = args[3][0]
logger.info("starting job: %s", name)
with context.finished.get_lock():
context.finished.value = False
with context.progress.get_lock():
context.progress.value = 0
fn(context, *args, **kwargs) fn(context, *args, **kwargs)
logger.info("finished job") logger.info("finished job: %s", name)
with context.finished.get_lock():
context.finished.value = True
except Exception as e: except Exception as e:
logger.error(format_exception(type(e), e, e.__traceback__)) logger.error(format_exception(type(e), e, e.__traceback__))