1
0
Fork 0
onnx-web/api/tests/worker/test_worker.py

210 lines
4.7 KiB
Python
Raw Permalink Normal View History

2023-11-05 01:42:11 +00:00
import unittest
from multiprocessing import Queue, Value
2023-11-18 23:20:45 +00:00
from os import getpid
2023-11-05 01:42:11 +00:00
2023-11-19 00:13:13 +00:00
from onnx_web.errors import RetryException
2023-11-05 01:42:11 +00:00
from onnx_web.server.context import ServerContext
2023-11-18 23:20:45 +00:00
from onnx_web.worker.command import JobCommand
2023-11-05 01:42:11 +00:00
from onnx_web.worker.context import WorkerContext
2023-11-19 00:13:13 +00:00
from onnx_web.worker.worker import (
EXIT_ERROR,
EXIT_INTERRUPT,
EXIT_MEMORY,
EXIT_REPLACED,
MEMORY_ERRORS,
worker_main,
)
2023-11-05 01:42:11 +00:00
from tests.helpers import test_device
2023-11-19 00:13:13 +00:00
2023-11-18 23:20:45 +00:00
def main_memory(_worker):
2023-11-25 21:28:03 +00:00
raise MemoryError(MEMORY_ERRORS[0])
2023-11-20 05:18:57 +00:00
2023-11-18 23:20:45 +00:00
def main_retry(_worker):
2023-11-20 05:18:57 +00:00
raise RetryException()
2023-11-18 23:20:45 +00:00
def main_interrupt(_worker):
2023-11-20 05:18:57 +00:00
raise KeyboardInterrupt()
2023-11-18 23:20:45 +00:00
2023-11-05 01:42:11 +00:00
class WorkerMainTests(unittest.TestCase):
2023-11-20 05:18:57 +00:00
def test_pending_exception_empty(self):
pass
def test_pending_exception_interrupt(self):
status = None
def exit(exit_status):
nonlocal status
status = exit_status
job = JobCommand("test", "test", "test", main_interrupt, [], {})
2023-11-20 05:18:57 +00:00
cancel = Value("L", False)
logs = Queue()
pending = Queue()
progress = Queue()
pid = Value("L", getpid())
idle = Value("L", False)
pending.put(job)
worker_main(
WorkerContext(
"test",
test_device(),
cancel,
logs,
pending,
progress,
pid,
idle,
0,
0.0,
),
ServerContext(),
exit=exit,
)
self.assertEqual(status, EXIT_INTERRUPT)
def test_pending_exception_retry(self):
status = None
def exit(exit_status):
nonlocal status
status = exit_status
job = JobCommand("test", "test", "test", main_retry, [], {})
2023-11-20 05:18:57 +00:00
cancel = Value("L", False)
logs = Queue()
pending = Queue()
progress = Queue()
pid = Value("L", getpid())
idle = Value("L", False)
pending.put(job)
worker_main(
WorkerContext(
"test",
test_device(),
cancel,
logs,
pending,
progress,
pid,
idle,
0,
0.0,
),
ServerContext(),
exit=exit,
)
self.assertEqual(status, EXIT_ERROR)
def test_pending_exception_value(self):
status = None
def exit(exit_status):
nonlocal status
status = exit_status
cancel = Value("L", False)
logs = Queue()
pending = Queue()
progress = Queue()
pid = Value("L", getpid())
idle = Value("L", False)
pending.close()
worker_main(
WorkerContext(
"test",
test_device(),
cancel,
logs,
pending,
progress,
pid,
idle,
0,
0.0,
),
ServerContext(),
exit=exit,
)
self.assertEqual(status, EXIT_ERROR)
def test_pending_exception_other_memory(self):
status = None
def exit(exit_status):
nonlocal status
status = exit_status
job = JobCommand("test", "test", "test", main_memory, [], {})
2023-11-20 05:18:57 +00:00
cancel = Value("L", False)
logs = Queue()
pending = Queue()
progress = Queue()
pid = Value("L", getpid())
idle = Value("L", False)
pending.put(job)
worker_main(
WorkerContext(
"test",
test_device(),
cancel,
logs,
pending,
progress,
pid,
idle,
0,
0.0,
),
ServerContext(),
exit=exit,
)
self.assertEqual(status, EXIT_MEMORY)
def test_pending_exception_other_unknown(self):
pass
def test_pending_replaced(self):
status = None
def exit(exit_status):
nonlocal status
status = exit_status
cancel = Value("L", False)
logs = Queue()
pending = Queue()
progress = Queue()
pid = Value("L", 0)
idle = Value("L", False)
worker_main(
WorkerContext(
"test",
test_device(),
cancel,
logs,
pending,
progress,
pid,
idle,
0,
0.0,
),
ServerContext(),
exit=exit,
)
self.assertEqual(status, EXIT_REPLACED)