from collections import Counter
from logging import getLogger
from queue import Empty
from threading import Lock, Thread
from typing import Callable, Dict, List, Optional, Tuple

from torch.multiprocessing import Process, Queue, Value

from ..params import DeviceParams
from ..server import ServerContext
from .command import JobCommand, ProgressCommand
from .context import WorkerContext
from .utils import Interval
from .worker import worker_main

logger = getLogger(__name__)


class DevicePoolExecutor:
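    """
    Pool of per-device worker processes, with a pending job queue and a
    progress queue for each device, plus background threads that watch
    worker health, collect logs, and apply progress updates.
    """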

    server: ServerContext
    devices: List[DeviceParams]
    join_timeout: float
    max_jobs_per_worker: int
    max_pending_per_worker: int
    progress_interval: float
    recycle_interval: float

    leaking: List[Tuple[str, Process, WorkerContext]]
    worker_cancel: Dict[str, "Value[bool]"]
    worker_idle: Dict[str, "Value[bool]"]

    context: Dict[str, WorkerContext]  # Device -> Context
    current: Dict[str, "Value[int]"]  # Device -> pid
    pending: Dict[str, "Queue[JobCommand]"]
    progress: Dict[str, "Queue[ProgressCommand]"]
    workers: Dict[str, Process]

    health_worker: Interval
    logger_worker: Thread
    progress_worker: Interval

    cancelled_jobs: List[str]
    finished_jobs: List[ProgressCommand]
    pending_jobs: List[JobCommand]
    running_jobs: Dict[str, ProgressCommand]  # Device -> job progress
    total_jobs: Dict[str, int]  # Device -> job count

    logs: "Queue[str]"
    rlock: Lock

    def __init__(
        self,
        server: ServerContext,
        devices: List[DeviceParams],
        max_pending_per_worker: int = 100,
        join_timeout: float = 5.0,
        recycle_interval: float = 10.0,
        progress_interval: float = 1.0,
    ):
        self.server = server
        self.devices = devices
        self.join_timeout = join_timeout
        self.max_jobs_per_worker = server.job_limit
        self.max_pending_per_worker = max_pending_per_worker
        self.progress_interval = progress_interval
        self.recycle_interval = recycle_interval

        self.leaking = []
        self.context = {}
        self.current = {}
        self.pending = {}
        self.progress = {}
        self.workers = {}

        self.cancelled_jobs = []
        self.finished_jobs = []
        self.pending_jobs = []
        self.running_jobs = {}
        self.total_jobs = {}
        self.worker_cancel = {}
        self.worker_idle = {}

        self.logs = Queue(self.max_pending_per_worker)
        self.rlock = Lock()

    def start(self, *args) -> None:
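        """
        Launch the background threads, then one worker process per device.
        Extra positional args are passed through to each worker.
        """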
        self.create_health_worker()
        self.create_logger_worker()
        self.create_progress_worker()

        for device in self.devices:
            self.create_device_worker(device, *args)

    def create_device_worker(self, device: DeviceParams, *args) -> None:
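        """
        Create fresh queues and a WorkerContext for the device, then start
        a new worker process and record its pid.
        """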
        name = device.device

        # always recreate queues
        self.progress[name] = Queue(self.max_pending_per_worker)
        self.pending[name] = Queue(self.max_pending_per_worker)
        self.total_jobs[device.device] = 0

        # reuse pid sentinel
        if name in self.current:
            logger.debug("using existing current worker value")
            current = self.current[name]
        else:
            logger.debug("creating new current worker value")
            current = Value("L", 0)
            self.current[name] = current

        self.worker_cancel[name] = Value("B", False)
        self.worker_idle[name] = Value("B", False)

        # create a new context and worker
        context = WorkerContext(
            name,
            device,
            cancel=self.worker_cancel[name],
            progress=self.progress[name],
            logs=self.logs,
            pending=self.pending[name],
            active_pid=current,
            idle=self.worker_idle[name],
            retries=self.server.worker_retries,
            timeout=self.progress_interval,
        )
        self.context[name] = context

        worker = Process(
            name=f"onnx-web worker: {name}",
            target=worker_main,
            args=(context, self.server, *args),
            daemon=True,
        )
        self.workers[name] = worker

        logger.debug("starting worker for device %s", device)
        worker.start()
        current.value = worker.pid

    def create_health_worker(self) -> None:
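        """
        Start the interval thread that recycles workers and restarts the
        logger and progress threads if their queues fill up.
        """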
        self.health_worker = Interval(self.recycle_interval, health_main, args=(self,))
        self.health_worker.daemon = True
        self.health_worker.name = "onnx-web health"
        logger.debug("starting health worker")
        self.health_worker.start()

    def create_logger_worker(self) -> None:
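        """
        Start the thread that drains log messages sent back by workers.
        """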
        self.logger_worker = Thread(
            name="onnx-web logger",
            target=logger_main,
            args=(
                self,
                self.logs,
            ),
            daemon=True,
        )

        logger.debug("starting logger worker")
        self.logger_worker.start()

    def create_progress_worker(self) -> None:
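        """
        Start the interval thread that applies progress updates from workers.
        """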
        self.progress_worker = Interval(
            self.progress_interval, progress_main, args=(self,)
        )
        self.progress_worker.daemon = True
        self.progress_worker.name = "onnx-web progress"
        logger.debug("starting progress worker")
        self.progress_worker.start()

    def get_job_context(self, key: str) -> WorkerContext:
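        """
        Look up the WorkerContext for the device a running job is active on.
        """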
        # running_jobs maps the job key to a ProgressCommand, which carries
        # the device name; it cannot be unpacked as a (device, progress) pair
        job = self.running_jobs[key]
        return self.context[job.device]

    def get_next_device(self, needs_device: Optional[DeviceParams] = None) -> int:
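        """
        Pick the index of the device with the fewest queued jobs, unless the
        job explicitly needs a particular device.
        """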
        # respect overrides if possible
        if needs_device is not None:
            for i in range(len(self.devices)):
                if self.devices[i].device == needs_device.device:
                    return i

        # seed every device index, then add each device's queue depth; the
        # update must use a mapping, since updating a Counter with a list of
        # sizes would count the sizes themselves as keys
        jobs = Counter(range(len(self.devices)))
        jobs.update(
            {i: self.pending[d.device].qsize() for i, d in enumerate(self.devices)}
        )

        queued = jobs.most_common()
        logger.trace("jobs queued by device: %s", queued)

        lowest_count = queued[-1][1]
        lowest_devices = [d[0] for d in queued if d[1] == lowest_count]
        lowest_devices.sort()

        return lowest_devices[0]

    def cancel(self, key: str) -> bool:
        """
        Cancel a job. If the job has not been started, this will cancel
        the future and never execute it. If the job has been started, it
        should be cancelled on the next progress callback.
        """

        for job in self.finished_jobs:
            if job.job == key:
                logger.debug("cannot cancel finished job: %s", key)
                return False

        for job in self.pending_jobs:
            if job.name == key:
                self.pending_jobs.remove(job)
                logger.info("cancelled pending job: %s", key)
                return True

        if key not in self.running_jobs:
            logger.debug("cancelled job is not active: %s", key)
        else:
            job = self.running_jobs[key]
            logger.info("cancelling job %s, active on device %s", key, job.device)

        self.cancelled_jobs.append(key)
        return True

    def done(self, key: str) -> Tuple[bool, Optional[ProgressCommand]]:
        """
        Check if a job has been finished and report the last progress update.

        If the job is still pending, the first item will be True and there
        will be no ProgressCommand.
        """
        if key in self.running_jobs:
            logger.debug("checking status for running job: %s", key)
            return (False, self.running_jobs[key])

        for job in self.finished_jobs:
            if job.job == key:
                logger.debug("checking status for finished job: %s", key)
                return (False, job)

        for job in self.pending_jobs:
            if job.name == key:
                logger.debug("checking status for pending job: %s", key)
                return (True, None)

        logger.trace("checking status for unknown job: %s", key)
        return (False, None)

    def join(self):
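        """
        Stop the background threads and device workers, close the queues,
        and leak any workers that refuse to stop in time.
        """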
        logger.info("stopping worker pool")

        with self.rlock:
            logger.debug("stopping device workers")
            for device, worker in self.workers.items():
                if worker.is_alive():
                    logger.debug("stopping worker %s for device %s", worker.pid, device)
                    worker.join(self.join_timeout)
                    if worker.is_alive():
                        logger.warning(
                            "worker %s for device %s could not be stopped in time",
                            worker.pid,
                            device,
                        )
                        self.leak_worker(device)
                else:
                    logger.debug("worker for device %s has died", device)

            logger.debug("stopping health worker")
            self.health_worker.cancel()
            self.health_worker.join(self.recycle_interval)

            logger.debug("stopping progress worker")
            self.progress_worker.cancel()
            self.progress_worker.join(self.progress_interval)

            logger.debug("stopping logger worker")
            self.logger_worker.join(self.join_timeout)

            logger.debug("closing worker queues")
            self.logs.close()

            for queue in self.pending.values():
                queue.close()

            for queue in self.progress.values():
                queue.close()

            self.pending.clear()
            self.progress.clear()

            logger.debug("stopping leaking workers")
            self.join_leaking()

        logger.debug("worker pool stopped")

    def join_leaking(self):
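        """
        Join workers that have been leaked, draining any progress updates
        they left behind, and drop the ones that have finally exited.
        """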
        if len(self.leaking) > 0:
            for device, worker, context in self.leaking:
                logger.debug(
                    "shutting down leaking worker %s for device %s", worker.pid, device
                )
                worker.join(self.join_timeout)
                if worker.is_alive():
                    logger.warning(
                        "leaking worker %s for device %s could not be shut down",
                        worker.pid,
                        device,
                    )

                try:
                    progress = context.progress.get_nowait()
                    while progress is not None:
                        self.update_job(progress)
                        progress = context.progress.get_nowait()
                except Empty:
                    logger.trace("empty queue in leaking worker for device %s", device)
                except ValueError as e:
                    logger.debug(
                        "value error in leaking worker for device %s: %s", device, e
                    )
                except Exception:
                    logger.exception("error in leaking worker for device %s", device)

            self.leaking[:] = [dw for dw in self.leaking if dw[1].is_alive()]

    def recycle(self, recycle_all=False):
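        """
        Restart workers that have died or have run more than the allowed
        number of jobs, and restart the background threads if they have
        stopped.
        """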
        logger.debug("recycling worker pool")

        with self.rlock:
            self.join_leaking()

            needs_restart = []

            for device, worker in self.workers.items():
                jobs = self.total_jobs.get(device, 0)
                if not worker.is_alive():
                    logger.warning("worker for device %s has died", device)
                    needs_restart.append(device)
                elif recycle_all or jobs > self.max_jobs_per_worker:
                    logger.info(
                        "shutting down worker for device %s after %s jobs", device, jobs
                    )
                    worker.join(self.join_timeout)
                    if worker.is_alive():
                        logger.warning(
                            "worker %s for device %s could not be shut down in time",
                            worker.pid,
                            device,
                        )
                        self.leak_worker(device)
                    else:
                        del worker

                    self.workers[device] = None
                    needs_restart.append(device)
                else:
                    logger.debug(
                        "worker %s for device %s has run %s jobs and is still alive",
                        worker.pid,
                        device,
                        jobs,
                    )

            if len(needs_restart) > 0:
                logger.info("starting new workers")

            for device in self.devices:
                if device.device in needs_restart:
                    self.create_device_worker(device)

            if self.logger_worker.is_alive():
                logger.debug("logger worker is running")
            else:
                logger.warning("restarting logger worker")
                self.create_logger_worker()

            if self.progress_worker.is_alive():
                logger.debug("progress worker is running")
            else:
                logger.warning("restarting progress worker")
                self.create_progress_worker()

        logger.debug("worker pool recycled")

    def submit(
        self,
        key: str,
        fn: Callable[..., None],
        /,
        *args,
        needs_device: Optional[DeviceParams] = None,
        **kwargs,
    ) -> None:
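        """
        Assign a job to the least busy device (or the requested one) and
        add it to the pending list; next_job hands it to the worker later.
        """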
        device_idx = self.get_next_device(needs_device=needs_device)
        device = self.devices[device_idx].device
        logger.info(
            "assigning job %s to device %s: %s",
            key,
            device_idx,
            device,
        )

        # build and queue job
        job = JobCommand(key, device, fn, args, kwargs)
        self.pending_jobs.append(job)

    def status(self) -> Dict[str, List[Tuple[str, int, bool, bool, bool, bool]]]:
        """
        Returns a dict of job lists, with each entry a tuple of: job or
        device name, progress or total jobs, pending or alive, finished,
        cancelled, failed.
        """
        return {
            "cancelled": [],
            "finished": [
                (
                    job.job,
                    job.progress,
                    False,
                    job.finished,
                    job.cancelled,
                    job.failed,
                )
                for job in self.finished_jobs
            ],
            "pending": [
                (
                    job.name,
                    0,
                    True,
                    False,
                    False,
                    False,
                )
                for job in self.pending_jobs
            ],
            "running": [
                (
                    name,
                    job.progress,
                    False,
                    job.finished,
                    job.cancelled,
                    job.failed,
                )
                for name, job in self.running_jobs.items()
            ],
            "total": [
                (
                    device,
                    total,
                    self.workers[device].is_alive(),
                    False,
                    False,
                    False,
                )
                for device, total in self.total_jobs.items()
            ],
        }

    def next_job(self, device: str):
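        """
        Send the next pending job for this device to its worker queue.
        """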
        for job in self.pending_jobs:
            if job.device == device:
                logger.debug("enqueuing job %s on device %s", job.name, device)
                self.pending[device].put(job, block=False)
                # job will be removed from the pending list when progress is updated
                return

        logger.trace("no pending jobs for device %s", device)

    def finish_job(self, progress: ProgressCommand):
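        """
        Move a job from running to finished and clear any cancel flag.
        """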
        # move from running to finished
        logger.info("job has finished: %s", progress.job)
        self.finished_jobs.append(progress)
        if progress.job in self.running_jobs:
            del self.running_jobs[progress.job]

        self.join_leaking()
        if progress.job in self.cancelled_jobs:
            self.cancelled_jobs.remove(progress.job)

    def update_job(self, progress: ProgressCommand):
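        """
        Apply a progress update from a worker: finish the job, or move it
        from pending to running and propagate any cancellation.
        """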
        if progress.finished:
            return self.finish_job(progress)

        # move from pending to running
        logger.debug(
            "progress update for job: %s to %s", progress.job, progress.progress
        )
        self.running_jobs[progress.job] = progress
        self.pending_jobs[:] = [
            job for job in self.pending_jobs if job.name != progress.job
        ]

        # increment job counter if this is the start of a new job
        if progress.progress == 0:
            if progress.device in self.total_jobs:
                self.total_jobs[progress.device] += 1
            else:
                self.total_jobs[progress.device] = 1

            logger.debug(
                "updating job count for device %s: %s",
                progress.device,
                self.total_jobs[progress.device],
            )

        # check if the job has been cancelled
        if progress.job in self.cancelled_jobs:
            logger.debug(
                "setting flag for cancelled job: %s on %s",
                progress.job,
                progress.device,
            )
            self.context[progress.device].set_cancel()

    def leak_worker(self, device: str):
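        """
        Remember a worker that could not be stopped so its queues can still
        be drained and it can be joined again later.
        """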
        context = self.context[device]
        worker = self.workers[device]
        self.leaking.append((device, worker, context))


def health_main(pool: DevicePoolExecutor):
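    """
    Interval callback for the health thread: recycle workers and restart
    the logger or progress threads if their queues fill up.
    """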
    logger.trace("checking in from health worker thread")
    pool.recycle()

    if pool.logs.full():
        logger.warning("logger queue is full, restarting worker")
        pool.logger_worker.join(pool.join_timeout)
        pool.create_logger_worker()

    if any([queue.full() for queue in pool.progress.values()]):
        logger.warning("progress queue is full, restarting worker")
        pool.progress_worker.join(pool.join_timeout)
        pool.create_progress_worker()


def logger_main(pool: DevicePoolExecutor, logs: "Queue[str]"):
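    """
    Thread body for the logger worker: drain log messages from the shared
    queue until it is closed.
    """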
    logger.trace("checking in from logger worker thread")

    while True:
        try:
            msg = logs.get(timeout=(pool.join_timeout / 2))
            logger.debug("received logs from worker: %s", msg)
        except Empty:
            # nothing to report; logging that would only generate more logs
            pass
        except ValueError:
            # the queue has been closed, stop the thread
            break
        except Exception:
            logger.exception("error in log worker")


def progress_main(pool: DevicePoolExecutor):
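    """
    Interval callback for the progress thread: apply updates from leaking
    and active workers, then feed the next job to any idle worker.
    """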
    logger.trace("checking in from progress worker thread")

    for device, _worker, context in pool.leaking:
        # whether the worker is alive or not, try to clear its queues
        try:
            progress = context.progress.get_nowait()
            while progress is not None:
                pool.update_job(progress)
                progress = context.progress.get_nowait()
        except Empty:
            logger.trace("empty queue in leaking worker for device %s", device)
        except ValueError as e:
            logger.debug("value error in leaking worker for device %s: %s", device, e)
            break
        except Exception:
            logger.exception("error in leaking worker for device %s", device)

    for device, queue in pool.progress.items():
        try:
            progress = queue.get_nowait()
            while progress is not None:
                pool.update_job(progress)
                progress = queue.get_nowait()
        except Empty:
            logger.trace("empty queue in progress worker for device %s", device)
        except ValueError as e:
            logger.debug("value error in progress worker for device %s: %s", device, e)
            break
        except Exception:
            logger.exception("error in progress worker for device %s", device)

    for device, context in pool.context.items():
        if context.is_idle():
            logger.trace("enqueueing next job for idle worker")
            pool.next_job(device)
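

# A minimal usage sketch (illustrative only; how to construct ServerContext
# and DeviceParams depends on the rest of the server, and `run_pipeline` and
# `params` are hypothetical placeholders):
#
#     pool = DevicePoolExecutor(server, devices)
#     pool.start()
#     pool.submit("job-1", run_pipeline, params)
#     pending, progress = pool.done("job-1")
#     pool.join()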
|