2023-02-26 05:49:39 +00:00
|
|
|
from collections import Counter
|
|
|
|
from logging import getLogger
|
2023-02-28 04:45:29 +00:00
|
|
|
from queue import Empty
|
2023-03-25 14:47:51 +00:00
|
|
|
from threading import Thread, Lock
|
2023-03-18 20:16:41 +00:00
|
|
|
from typing import Callable, Dict, List, Optional, Tuple
|
2023-02-26 05:49:39 +00:00
|
|
|
|
2023-02-27 02:09:42 +00:00
|
|
|
from torch.multiprocessing import Process, Queue, Value
|
2023-02-26 20:15:30 +00:00
|
|
|
|
2023-02-26 05:49:39 +00:00
|
|
|
from ..params import DeviceParams
|
2023-02-26 18:32:48 +00:00
|
|
|
from ..server import ServerContext
|
2023-03-18 20:12:09 +00:00
|
|
|
from .command import JobCommand, ProgressCommand
|
2023-02-26 05:49:39 +00:00
|
|
|
from .context import WorkerContext
|
2023-02-27 23:14:53 +00:00
|
|
|
from .worker import worker_main
|
2023-02-26 05:49:39 +00:00
|
|
|
|
|
|
|
# module-level logger for the pool and its monitor threads
logger = getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class DevicePoolExecutor:
    """
    Executor that runs jobs on a pool of per-device worker processes.

    Each device gets one worker process fed by its own pending queue. A
    logger thread and a progress thread drain the shared queues and update
    the job bookkeeping on this object. Workers that exceed their job limit
    or die are recycled; workers that refuse to stop are tracked as leaking.
    """

    server: ServerContext  # shared server settings (provides job_limit)
    devices: List[DeviceParams]  # devices, one worker process each
    max_jobs_per_worker: int  # jobs a worker may run before being recycled
    max_pending_per_worker: int  # capacity of pending/log/progress queues
    join_timeout: float  # seconds to wait when joining workers/threads

    leaking: List[Tuple[str, Process]]  # workers that did not stop in time
    context: Dict[str, WorkerContext]  # Device -> Context
    current: Dict[str, "Value[int]"]  # Device -> pid
    pending: Dict[str, "Queue[JobCommand]"]  # Device -> queued job commands
    threads: Dict[str, Thread]  # monitor threads, keyed "logger"/"progress"
    workers: Dict[str, Process]  # Device -> worker process

    cancelled_jobs: List[str]  # job names flagged for cancellation
    finished_jobs: List[ProgressCommand]  # final progress of completed jobs
    pending_jobs: List[JobCommand]  # jobs queued but not yet started
    running_jobs: Dict[str, ProgressCommand]  # job name -> latest progress
    total_jobs: Dict[str, int]  # Device -> job count

    logs: "Queue[str]"  # log records forwarded from worker processes
    progress: "Queue[ProgressCommand]"  # progress updates from workers
    rlock: Lock  # guards join/recycle against concurrent restarts
|
2023-02-26 05:49:39 +00:00
|
|
|
|
|
|
|
def __init__(
|
|
|
|
self,
|
2023-02-26 18:32:48 +00:00
|
|
|
server: ServerContext,
|
2023-02-26 05:49:39 +00:00
|
|
|
devices: List[DeviceParams],
|
2023-03-01 03:44:52 +00:00
|
|
|
max_pending_per_worker: int = 100,
|
2023-02-27 23:14:53 +00:00
|
|
|
join_timeout: float = 1.0,
|
2023-02-26 05:49:39 +00:00
|
|
|
):
|
2023-02-26 18:32:48 +00:00
|
|
|
self.server = server
|
2023-02-26 05:49:39 +00:00
|
|
|
self.devices = devices
|
2023-03-18 18:40:15 +00:00
|
|
|
self.max_jobs_per_worker = server.job_limit
|
2023-03-01 03:44:52 +00:00
|
|
|
self.max_pending_per_worker = max_pending_per_worker
|
2023-02-26 21:06:40 +00:00
|
|
|
self.join_timeout = join_timeout
|
|
|
|
|
2023-03-06 00:58:13 +00:00
|
|
|
self.leaking = []
|
2023-02-26 18:24:51 +00:00
|
|
|
self.context = {}
|
2023-03-06 03:28:21 +00:00
|
|
|
self.current = {}
|
2023-02-26 05:49:39 +00:00
|
|
|
self.pending = {}
|
2023-02-27 23:14:53 +00:00
|
|
|
self.threads = {}
|
2023-02-26 05:49:39 +00:00
|
|
|
self.workers = {}
|
2023-02-27 23:14:53 +00:00
|
|
|
|
|
|
|
self.cancelled_jobs = []
|
2023-02-27 02:09:42 +00:00
|
|
|
self.finished_jobs = []
|
2023-03-18 22:26:28 +00:00
|
|
|
self.pending_jobs = []
|
|
|
|
self.running_jobs = {}
|
2023-03-02 01:09:18 +00:00
|
|
|
self.total_jobs = {}
|
2023-02-27 02:09:42 +00:00
|
|
|
|
2023-03-01 03:44:52 +00:00
|
|
|
self.logs = Queue(self.max_pending_per_worker)
|
|
|
|
self.progress = Queue(self.max_pending_per_worker)
|
2023-03-25 14:47:51 +00:00
|
|
|
self.rlock = Lock()
|
2023-02-26 05:49:39 +00:00
|
|
|
|
2023-03-02 01:09:18 +00:00
|
|
|
# TODO: these should be part of a start method
|
2023-02-26 21:06:40 +00:00
|
|
|
self.create_logger_worker()
|
2023-02-27 23:14:53 +00:00
|
|
|
self.create_progress_worker()
|
|
|
|
|
2023-02-26 21:06:40 +00:00
|
|
|
for device in devices:
|
|
|
|
self.create_device_worker(device)
|
2023-02-26 05:49:39 +00:00
|
|
|
|
2023-02-26 21:06:40 +00:00
|
|
|
    def create_device_worker(self, device: DeviceParams) -> None:
        """
        Start (or restart) the worker process for a single device.

        The pending-job queue and the shared pid Value are reused when they
        already exist, so jobs queued for a dead worker survive the restart.
        """
        name = device.device

        # reuse the queue if possible, to keep queued jobs
        if name in self.pending:
            logger.debug("using existing pending job queue")
            pending = self.pending[name]
        else:
            logger.debug("creating new pending job queue")
            pending = Queue(self.max_pending_per_worker)
            self.pending[name] = pending

        # reuse the shared Value that records which pid owns this device
        if name in self.current:
            logger.debug("using existing current worker value")
            current = self.current[name]
        else:
            logger.debug("creating new current worker value")
            current = Value("L", 0)
            self.current[name] = current

        # fresh cancel flag per worker; the queues are shared with the pool
        context = WorkerContext(
            name,
            device,
            cancel=Value("B", False),
            progress=self.progress,
            logs=self.logs,
            pending=pending,
            active_pid=current,
        )
        self.context[name] = context
        worker = Process(
            name=f"onnx-web worker: {name}",
            target=worker_main,
            args=(context, self.server),
        )

        logger.debug("starting worker for device %s", device)
        worker.start()
        self.workers[name] = worker
        # the pid only exists after start(), so record it last
        current.value = worker.pid
|
2023-02-26 21:06:40 +00:00
|
|
|
|
2023-02-27 23:14:53 +00:00
|
|
|
def create_logger_worker(self) -> None:
|
2023-03-01 03:44:52 +00:00
|
|
|
logger_thread = Thread(
|
2023-03-23 03:58:46 +00:00
|
|
|
name="onnx-web logger",
|
|
|
|
target=logger_main,
|
|
|
|
args=(
|
|
|
|
self,
|
|
|
|
self.logs,
|
|
|
|
),
|
|
|
|
daemon=True,
|
2023-03-01 03:44:52 +00:00
|
|
|
)
|
2023-02-27 23:14:53 +00:00
|
|
|
self.threads["logger"] = logger_thread
|
|
|
|
|
|
|
|
logger.debug("starting logger worker")
|
|
|
|
logger_thread.start()
|
|
|
|
|
|
|
|
def create_progress_worker(self) -> None:
|
2023-03-01 03:44:52 +00:00
|
|
|
progress_thread = Thread(
|
|
|
|
name="onnx-web progress",
|
2023-03-23 03:58:46 +00:00
|
|
|
target=progress_main,
|
|
|
|
args=(
|
|
|
|
self,
|
|
|
|
self.progress,
|
|
|
|
),
|
2023-03-01 03:44:52 +00:00
|
|
|
daemon=True,
|
|
|
|
)
|
2023-02-27 23:14:53 +00:00
|
|
|
self.threads["progress"] = progress_thread
|
|
|
|
|
|
|
|
logger.debug("starting progress worker")
|
|
|
|
progress_thread.start()
|
|
|
|
|
2023-02-27 02:09:42 +00:00
|
|
|
def get_job_context(self, key: str) -> WorkerContext:
|
2023-03-18 22:15:18 +00:00
|
|
|
device, _progress = self.running_jobs[key]
|
2023-02-27 02:09:42 +00:00
|
|
|
return self.context[device]
|
2023-02-26 05:49:39 +00:00
|
|
|
|
2023-02-27 23:14:53 +00:00
|
|
|
def get_next_device(self, needs_device: Optional[DeviceParams] = None) -> int:
|
|
|
|
# respect overrides if possible
|
|
|
|
if needs_device is not None:
|
|
|
|
for i in range(len(self.devices)):
|
|
|
|
if self.devices[i].device == needs_device.device:
|
|
|
|
return i
|
|
|
|
|
|
|
|
jobs = Counter(range(len(self.devices)))
|
|
|
|
jobs.update([self.pending[d.device].qsize() for d in self.devices])
|
|
|
|
|
|
|
|
queued = jobs.most_common()
|
2023-03-17 01:22:20 +00:00
|
|
|
logger.trace("jobs queued by device: %s", queued)
|
2023-02-27 23:14:53 +00:00
|
|
|
|
|
|
|
lowest_count = queued[-1][1]
|
|
|
|
lowest_devices = [d[0] for d in queued if d[1] == lowest_count]
|
|
|
|
lowest_devices.sort()
|
|
|
|
|
|
|
|
return lowest_devices[0]
|
|
|
|
|
2023-02-26 05:49:39 +00:00
|
|
|
def cancel(self, key: str) -> bool:
|
|
|
|
"""
|
|
|
|
Cancel a job. If the job has not been started, this will cancel
|
|
|
|
the future and never execute it. If the job has been started, it
|
|
|
|
should be cancelled on the next progress callback.
|
|
|
|
"""
|
2023-02-27 23:14:53 +00:00
|
|
|
|
2023-03-18 20:12:09 +00:00
|
|
|
for job in self.finished_jobs:
|
|
|
|
if job.job == key:
|
|
|
|
logger.debug("cannot cancel finished job: %s", key)
|
|
|
|
return False
|
2023-02-27 23:14:53 +00:00
|
|
|
|
2023-03-18 22:15:18 +00:00
|
|
|
for job in self.pending_jobs:
|
|
|
|
if job.name == key:
|
2023-03-18 23:35:11 +00:00
|
|
|
self.pending_jobs[:] = [
|
|
|
|
job for job in self.pending_jobs if job.name != key
|
|
|
|
]
|
2023-03-18 22:15:18 +00:00
|
|
|
logger.info("cancelled pending job: %s", key)
|
|
|
|
return True
|
|
|
|
|
|
|
|
if key not in self.running_jobs:
|
2023-03-18 20:12:09 +00:00
|
|
|
logger.debug("cancelled job is not active: %s", key)
|
|
|
|
else:
|
2023-03-18 22:15:18 +00:00
|
|
|
job = self.running_jobs[key]
|
2023-03-18 20:12:09 +00:00
|
|
|
logger.info("cancelling job %s, active on device %s", key, job.device)
|
2023-02-26 20:36:32 +00:00
|
|
|
|
2023-03-18 20:12:09 +00:00
|
|
|
self.cancelled_jobs.append(key)
|
2023-02-26 20:36:32 +00:00
|
|
|
return True
|
2023-02-26 05:49:39 +00:00
|
|
|
|
2023-03-18 22:15:18 +00:00
|
|
|
def done(self, key: str) -> Tuple[bool, Optional[ProgressCommand]]:
|
2023-02-27 23:14:53 +00:00
|
|
|
"""
|
|
|
|
Check if a job has been finished and report the last progress update.
|
|
|
|
|
2023-03-18 22:15:18 +00:00
|
|
|
If the job is still pending, the first item will be True and there will be no ProgressCommand.
|
2023-02-27 23:14:53 +00:00
|
|
|
"""
|
2023-03-18 22:15:18 +00:00
|
|
|
if key in self.running_jobs:
|
|
|
|
logger.debug("checking status for running job: %s", key)
|
|
|
|
return (False, self.running_jobs[key])
|
|
|
|
|
2023-03-18 20:12:09 +00:00
|
|
|
for job in self.finished_jobs:
|
|
|
|
if job.job == key:
|
2023-03-18 22:15:18 +00:00
|
|
|
logger.debug("checking status for finished job: %s", key)
|
|
|
|
return (False, job)
|
2023-02-27 02:09:42 +00:00
|
|
|
|
2023-03-18 22:15:18 +00:00
|
|
|
for job in self.pending_jobs:
|
|
|
|
if job.name == key:
|
|
|
|
logger.debug("checking status for pending job: %s", key)
|
2023-03-18 22:25:13 +00:00
|
|
|
return (True, None)
|
2023-02-26 18:51:11 +00:00
|
|
|
|
2023-03-18 22:15:18 +00:00
|
|
|
logger.trace("checking status for unknown job: %s", key)
|
|
|
|
return (False, None)
|
2023-02-26 05:49:39 +00:00
|
|
|
|
2023-02-26 16:47:31 +00:00
|
|
|
    def join(self):
        """
        Shut down the pool: close the shared queues, then join the worker
        processes and monitor threads. Workers that do not stop within
        join_timeout are kept in `leaking` for later cleanup attempts.
        """
        logger.info("stopping worker pool")

        with self.rlock:
            # closing the queues unblocks the worker/monitor loops; their
            # get() calls fail afterwards, which the monitors treat as
            # a shutdown signal (see logger_main/progress_main)
            logger.debug("closing queues")
            self.logs.close()
            self.progress.close()
            for queue in self.pending.values():
                queue.close()

            self.pending.clear()
            self.join_leaking()

            logger.debug("stopping device workers")
            for device, worker in self.workers.items():
                if worker.is_alive():
                    logger.debug("stopping worker %s for device %s", worker.pid, device)
                    worker.join(self.join_timeout)
                    if worker.is_alive():
                        # do not block shutdown forever; track the worker so
                        # join_leaking can try again later
                        logger.warning(
                            "worker %s for device %s could not be stopped in time",
                            worker.pid,
                            device,
                        )
                        self.leaking.append((device, worker))
                else:
                    logger.debug("worker for device %s has died", device)

            # monitor threads are daemons, so a timed join is best-effort
            for name, thread in self.threads.items():
                logger.debug("stopping worker %s for thread %s", thread.ident, name)
                thread.join(self.join_timeout)

            logger.debug("worker pool stopped")
|
2023-02-28 05:01:26 +00:00
|
|
|
|
2023-03-06 02:13:28 +00:00
|
|
|
def join_leaking(self):
|
2023-03-06 00:58:13 +00:00
|
|
|
if len(self.leaking) > 0:
|
|
|
|
logger.warning("cleaning up %s leaking workers", len(self.leaking))
|
|
|
|
for device, worker in self.leaking:
|
2023-03-07 14:02:53 +00:00
|
|
|
logger.debug(
|
|
|
|
"shutting down worker %s for device %s", worker.pid, device
|
|
|
|
)
|
2023-03-06 00:58:13 +00:00
|
|
|
worker.join(self.join_timeout)
|
|
|
|
if worker.is_alive():
|
2023-03-06 01:23:23 +00:00
|
|
|
logger.error(
|
2023-03-07 14:02:53 +00:00
|
|
|
"leaking worker %s for device %s could not be shut down",
|
|
|
|
worker.pid,
|
|
|
|
device,
|
2023-03-06 01:23:23 +00:00
|
|
|
)
|
2023-03-06 00:58:13 +00:00
|
|
|
|
|
|
|
self.leaking[:] = [dw for dw in self.leaking if dw[1].is_alive()]
|
|
|
|
|
2023-03-06 02:13:28 +00:00
|
|
|
    def recycle(self):
        """
        Restart workers that have died or exceeded max_jobs_per_worker, and
        restart the logger/progress monitor threads if they have stopped or
        their queues have filled up.
        """
        logger.debug("recycling worker pool")

        with self.rlock:
            self.join_leaking()

            # device names whose worker must be replaced
            needs_restart = []

            for device, worker in self.workers.items():
                jobs = self.total_jobs.get(device, 0)
                if not worker.is_alive():
                    logger.warning("worker for device %s has died", device)
                    needs_restart.append(device)
                elif jobs > self.max_jobs_per_worker:
                    logger.info(
                        "shutting down worker for device %s after %s jobs", device, jobs
                    )
                    worker.join(self.join_timeout)
                    if worker.is_alive():
                        # track it so join_leaking can retry later
                        logger.warning(
                            "worker %s for device %s could not be recycled in time",
                            worker.pid,
                            device,
                        )
                        self.leaking.append((device, worker))
                    else:
                        # drop the local reference to the dead process
                        del worker

                    # replacing dict values (not keys) is safe while
                    # iterating items(); the slot is refilled below
                    self.workers[device] = None
                    needs_restart.append(device)
                else:
                    logger.debug(
                        "worker %s for device %s does not need to be recycled",
                        worker.pid,
                        device,
                    )

            if len(needs_restart) > 0:
                logger.info("starting new workers")

            # restart in devices order and reset the per-device job count
            for device in self.devices:
                if device.device in needs_restart:
                    self.create_device_worker(device)
                    self.total_jobs[device.device] = 0

            # a full log queue suggests the logger thread has stalled
            if self.threads["logger"].is_alive():
                logger.debug("logger worker is running")
                if self.logs.full():
                    logger.warning("logger queue is full, restarting worker")
                    self.threads["logger"].join(self.join_timeout)
                    self.create_logger_worker()
            else:
                logger.warning("restarting logger worker")
                self.create_logger_worker()

            # same check for the progress thread
            if self.threads["progress"].is_alive():
                logger.debug("progress worker is running")
                if self.progress.full():
                    logger.warning("progress queue is full, restarting worker")
                    self.threads["progress"].join(self.join_timeout)
                    self.create_progress_worker()
            else:
                logger.warning("restarting progress worker")
                self.create_progress_worker()

            logger.debug("worker pool recycled")
|
2023-02-26 18:58:38 +00:00
|
|
|
|
2023-02-26 05:49:39 +00:00
|
|
|
def submit(
|
|
|
|
self,
|
|
|
|
key: str,
|
|
|
|
fn: Callable[..., None],
|
|
|
|
/,
|
|
|
|
*args,
|
|
|
|
needs_device: Optional[DeviceParams] = None,
|
|
|
|
**kwargs,
|
|
|
|
) -> None:
|
|
|
|
device_idx = self.get_next_device(needs_device=needs_device)
|
|
|
|
logger.info(
|
2023-02-26 20:15:30 +00:00
|
|
|
"assigning job %s to device %s: %s",
|
|
|
|
key,
|
|
|
|
device_idx,
|
|
|
|
self.devices[device_idx],
|
2023-02-26 05:49:39 +00:00
|
|
|
)
|
|
|
|
|
2023-03-18 22:15:18 +00:00
|
|
|
# increment job count before recycling (why tho?)
|
2023-02-27 23:14:53 +00:00
|
|
|
device = self.devices[device_idx].device
|
2023-03-02 01:09:18 +00:00
|
|
|
if device in self.total_jobs:
|
|
|
|
self.total_jobs[device] += 1
|
|
|
|
else:
|
|
|
|
self.total_jobs[device] = 1
|
|
|
|
|
2023-03-18 22:15:18 +00:00
|
|
|
# recycle before attempting to run
|
2023-03-06 01:30:52 +00:00
|
|
|
logger.debug("job count for device %s: %s", device, self.total_jobs[device])
|
2023-03-02 01:09:18 +00:00
|
|
|
self.recycle()
|
|
|
|
|
2023-03-18 22:15:18 +00:00
|
|
|
# build and queue job
|
|
|
|
job = JobCommand(key, device, fn, args, kwargs)
|
|
|
|
self.pending_jobs.append(job)
|
|
|
|
self.pending[device].put(job, block=False)
|
2023-02-27 23:14:53 +00:00
|
|
|
|
2023-03-18 22:25:13 +00:00
|
|
|
def status(self) -> List[Tuple[str, int, bool, bool, bool, bool]]:
|
2023-02-27 23:14:53 +00:00
|
|
|
history = [
|
2023-03-18 20:12:09 +00:00
|
|
|
(
|
|
|
|
name,
|
|
|
|
job.progress,
|
2023-03-18 22:25:13 +00:00
|
|
|
False,
|
2023-03-18 20:12:09 +00:00
|
|
|
job.finished,
|
2023-03-18 22:25:13 +00:00
|
|
|
job.cancelled,
|
|
|
|
job.failed,
|
2023-03-18 20:12:09 +00:00
|
|
|
)
|
2023-03-18 22:15:18 +00:00
|
|
|
for name, job in self.running_jobs.items()
|
2023-02-26 05:49:39 +00:00
|
|
|
]
|
2023-03-18 22:25:13 +00:00
|
|
|
history.extend(
|
|
|
|
[
|
|
|
|
(
|
|
|
|
job.name,
|
|
|
|
0,
|
|
|
|
True,
|
|
|
|
False,
|
|
|
|
False,
|
|
|
|
False,
|
2023-03-18 23:35:11 +00:00
|
|
|
)
|
|
|
|
for job in self.pending_jobs
|
2023-03-18 22:25:13 +00:00
|
|
|
]
|
|
|
|
)
|
2023-02-27 23:14:53 +00:00
|
|
|
history.extend(
|
2023-02-27 02:09:42 +00:00
|
|
|
[
|
|
|
|
(
|
2023-03-18 20:12:09 +00:00
|
|
|
job.job,
|
|
|
|
job.progress,
|
2023-03-18 22:25:13 +00:00
|
|
|
False,
|
2023-03-18 20:12:09 +00:00
|
|
|
job.finished,
|
2023-03-18 22:25:13 +00:00
|
|
|
job.cancelled,
|
|
|
|
job.failed,
|
2023-02-27 02:09:42 +00:00
|
|
|
)
|
2023-03-18 20:12:09 +00:00
|
|
|
for job in self.finished_jobs
|
2023-02-27 02:09:42 +00:00
|
|
|
]
|
|
|
|
)
|
2023-02-27 23:14:53 +00:00
|
|
|
return history
|
2023-03-18 22:15:18 +00:00
|
|
|
|
|
|
|
    def update_job(self, progress: ProgressCommand):
        """
        Apply a progress update received from a worker process.

        Finished jobs move from running to finished; otherwise the update
        becomes the job's latest running state, and a pending cancellation
        is forwarded to the worker via its context cancel flag.
        """
        if progress.finished:
            # move from running to finished
            logger.info("job has finished: %s", progress.job)
            self.finished_jobs.append(progress)
            if progress.job in self.running_jobs:
                del self.running_jobs[progress.job]

            # opportunistically reap workers that failed to stop earlier
            self.join_leaking()
            # a finished job can no longer be cancelled; drop the request
            if progress.job in self.cancelled_jobs:
                self.cancelled_jobs.remove(progress.job)
        else:
            # move from pending to running
            logger.debug(
                "progress update for job: %s to %s", progress.job, progress.progress
            )
            self.running_jobs[progress.job] = progress
            self.pending_jobs[:] = [
                job for job in self.pending_jobs if job.name != progress.job
            ]

            # tell the worker to abandon this job at its next checkpoint
            if progress.job in self.cancelled_jobs:
                logger.debug(
                    "setting flag for cancelled job: %s on %s",
                    progress.job,
                    progress.device,
                )
                self.context[progress.device].set_cancel()
|
2023-03-23 03:55:34 +00:00
|
|
|
|
2023-03-23 03:58:46 +00:00
|
|
|
|
2023-03-23 03:55:34 +00:00
|
|
|
def logger_main(pool: DevicePoolExecutor, logs: Queue):
    """
    Thread body for the logger worker: drain log records from the worker
    processes and append them to worker.log, until the queue is closed.
    """
    logger.trace("checking in from logger worker thread")

    while True:
        try:
            job = logs.get(timeout=(pool.join_timeout / 2))
            # append to the log file; the previous mode "w" truncated the
            # file on every message, keeping only the most recent record
            with open("worker.log", "a") as f:
                logger.info("got log: %s", job)
                f.write(str(job) + "\n\n")
        except Empty:
            # no records within the timeout; poll again
            pass
        except ValueError:
            # the queue has been closed; shut the thread down
            break
        except Exception:
            logger.exception("error in log worker")
|
|
|
|
|
2023-03-23 03:58:46 +00:00
|
|
|
|
2023-03-23 03:55:34 +00:00
|
|
|
def progress_main(pool: DevicePoolExecutor, queue: "Queue[ProgressCommand]"):
    """
    Thread body for the progress worker: drain progress updates from the
    worker processes and apply them to the pool, until the queue is closed.
    """
    logger.trace("checking in from progress worker thread")
    while True:
        try:
            pool.update_job(queue.get(timeout=(pool.join_timeout / 2)))
        except Empty:
            # no updates within the timeout; keep polling
            pass
        except ValueError:
            # the queue has been closed; shut the thread down
            break
        except Exception:
            logger.exception("error in progress worker")
|