1
0
Fork 0
onnx-web/api/onnx_web/worker/context.py

140 lines
3.6 KiB
Python
Raw Normal View History

2023-02-26 05:49:39 +00:00
from logging import getLogger
from os import getpid
from typing import Any, Callable
2023-02-26 05:49:39 +00:00
2023-02-26 20:15:30 +00:00
from torch.multiprocessing import Queue, Value
2023-02-26 05:49:39 +00:00
from ..params import DeviceParams
from .command import JobCommand, ProgressCommand
2023-02-26 05:49:39 +00:00
logger = getLogger(__name__)
ProgressCallback = Callable[[int, int, Any], None]
2023-02-26 20:15:30 +00:00
2023-02-26 05:49:39 +00:00
class WorkerContext:
cancel: "Value[bool]"
job: str
pending: "Queue[JobCommand]"
current: "Value[int]"
progress: "Queue[ProgressCommand]"
2023-02-26 05:49:39 +00:00
def __init__(
self,
2023-02-27 23:35:31 +00:00
job: str,
2023-02-26 05:49:39 +00:00
device: DeviceParams,
cancel: "Value[bool]",
logs: "Queue[str]",
pending: "Queue[JobCommand]",
progress: "Queue[ProgressCommand]",
current: "Value[int]",
2023-02-26 05:49:39 +00:00
):
2023-02-27 23:35:31 +00:00
self.job = job
2023-02-26 05:49:39 +00:00
self.device = device
2023-02-27 02:09:42 +00:00
self.cancel = cancel
2023-02-26 05:49:39 +00:00
self.progress = progress
2023-02-27 02:09:42 +00:00
self.logs = logs
self.pending = pending
self.current = current
2023-02-26 05:49:39 +00:00
def is_cancelled(self) -> bool:
return self.cancel.value
def is_current(self) -> bool:
2023-03-06 03:37:39 +00:00
return self.get_current() == getpid()
2023-03-06 03:37:39 +00:00
def get_current(self) -> int:
with self.current.get_lock():
return self.current.value
2023-02-26 05:49:39 +00:00
def get_device(self) -> DeviceParams:
"""
Get the device assigned to this job.
"""
return self.device
def get_progress(self) -> int:
return self.progress.value
def get_progress_callback(self) -> ProgressCallback:
def on_progress(step: int, timestep: int, latents: Any):
on_progress.step = step
self.set_progress(step)
2023-02-26 05:49:39 +00:00
return on_progress
def set_cancel(self, cancel: bool = True) -> None:
with self.cancel.get_lock():
self.cancel.value = cancel
def set_progress(self, progress: int) -> None:
if self.is_cancelled():
raise RuntimeError("job has been cancelled")
else:
logger.debug("setting progress for job %s to %s", self.job, progress)
self.progress.put(
ProgressCommand(
self.job,
self.device.device,
False,
progress,
self.is_cancelled(),
False,
),
block=False,
)
2023-02-27 02:09:42 +00:00
2023-02-27 02:37:22 +00:00
def set_finished(self) -> None:
logger.debug("setting finished for job %s", self.job)
self.progress.put(
ProgressCommand(
self.job,
self.device.device,
True,
self.get_progress(),
self.is_cancelled(),
False,
),
block=False,
)
def set_failed(self) -> None:
logger.warning("setting failure for job %s", self.job)
try:
self.progress.put(
ProgressCommand(
self.job,
self.device.device,
True,
self.get_progress(),
self.is_cancelled(),
True,
),
block=False,
)
except Exception:
logger.exception("error setting failure on job %s", self.job)
class JobStatus:
name: str
device: str
progress: int
cancelled: bool
finished: bool
def __init__(
self,
name: str,
device: DeviceParams,
progress: int = 0,
cancelled: bool = False,
finished: bool = False,
) -> None:
self.name = name
self.device = device.device
self.progress = progress
self.cancelled = cancelled
self.finished = finished