1
0
Fork 0
onnx-web/api/onnx_web/worker/worker.py

102 lines
3.1 KiB
Python
Raw Normal View History

2023-02-26 05:49:39 +00:00
from logging import getLogger
2023-03-06 03:37:39 +00:00
from os import getpid
2023-03-07 14:02:53 +00:00
from queue import Empty
2023-02-28 04:52:43 +00:00
from sys import exit
2023-02-26 20:15:30 +00:00
2023-02-26 19:09:24 +00:00
from setproctitle import setproctitle
2023-02-26 05:49:39 +00:00
2024-01-09 04:14:32 +00:00
from ..errors import CancelledException, RetryException
2023-02-26 18:32:48 +00:00
from ..server import ServerContext, apply_patches
2023-02-26 21:21:58 +00:00
from ..torch_before_ort import get_available_providers
2023-02-26 20:15:30 +00:00
from .context import WorkerContext
2023-02-26 05:49:39 +00:00
logger = getLogger(__name__)

# Exit codes handed to the `exit` callable by the worker loop.
# Note that a successful exit and an interrupt share the same code.
EXIT_SUCCESS = 0
EXIT_INTERRUPT = 0
EXIT_ERROR = 1
EXIT_MEMORY = 2
EXIT_REPLACED = 3

# Substrings searched for in an exception's message to decide whether the
# failure was an out-of-memory condition.
MEMORY_ERRORS = [
    "Failed to allocate memory",
    "hipErrorOutOfMemory",
    "MIOPEN failure",
    "out of memory",
    "rocblas_status_memory_error",
]
2023-02-26 18:32:48 +00:00
def worker_main(
    worker: WorkerContext, server: ServerContext, *args, exit=exit, patch=True
):
    """
    Run the job loop for one worker process.

    Jobs are taken from ``worker.pending`` and executed one at a time. The
    loop keeps going after an empty queue, a cancelled job, or an
    unrecognized error, and stops by calling ``exit`` with one of the
    ``EXIT_*`` codes when the worker has been replaced, is interrupted,
    raises a retry or value error, or reports an out-of-memory error.

    Extra positional arguments are accepted and ignored. When ``patch`` is
    true, ``apply_patches(server)`` runs before the loop starts. The value
    returned by ``exit`` is returned to the caller, which lets a
    non-terminating ``exit`` be injected.
    """
    process_title = "onnx-web worker: %s" % (worker.device.device)
    setproctitle(process_title)

    if patch:
        apply_patches(server)

    providers = get_available_providers()
    logger.trace("checking in from worker with providers: %s", providers)

    # make leaking workers easier to recycle
    worker.progress.cancel_join_thread()

    while True:
        try:
            if not worker.is_active():
                own_pid = getpid()
                replacement = worker.get_active()
                logger.warning(
                    "worker %s has been replaced by %s, exiting",
                    own_pid,
                    replacement,
                )
                return exit(EXIT_REPLACED)

            # wait briefly for the next job
            next_job = worker.pending.get(timeout=worker.timeout)
            logger.info(
                "worker %s got job: %s", worker.device.device, next_job.name
            )

            # clear flags and save the job name
            worker.start(next_job)
            logger.info("starting job: %s", next_job.name)

            # reset progress, which does a final check for cancellation
            worker.set_progress(0)
            next_job.fn(worker, *next_job.args, **next_job.kwargs)

            # confirm completion of the job
            logger.info("job succeeded: %s", next_job.name)
            worker.finish()
        except Empty:
            # nothing arrived within the timeout; mark idle and poll again
            logger.trace("worker reached end of queue, setting idle flag")
            worker.set_idle()
        except KeyboardInterrupt:
            logger.debug("worker got keyboard interrupt")
            worker.fail()
            return exit(EXIT_INTERRUPT)
        except RetryException:
            logger.exception("retry error in worker, exiting")
            worker.fail()
            return exit(EXIT_ERROR)
        except ValueError:
            logger.exception("value error in worker, exiting")
            worker.fail()
            return exit(EXIT_ERROR)
        except CancelledException as cancelled:
            # a cancelled job only fails that job; the worker keeps running
            logger.warning("job was cancelled, continuing")
            worker.fail(cancelled.reason or "cancelled")
        except Exception as err:
            message = str(err)

            # restart the worker on memory errors
            if any(marker in message for marker in MEMORY_ERRORS):
                logger.error("detected out-of-memory error, exiting: %s", err)
                worker.fail("oom")
                return exit(EXIT_MEMORY)

            # carry on for other errors
            logger.exception("unrecognized error while running job")
            worker.fail()