1
0
Fork 0

make logger start up well

This commit is contained in:
Sean Sube 2023-02-26 11:16:33 -06:00
parent 6998e8735c
commit d765a6f01b
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
2 changed files with 28 additions and 33 deletions

View File

@ -31,13 +31,14 @@ class DevicePoolExecutor:
self.progress = {} self.progress = {}
self.workers = {} self.workers = {}
log_queue = Queue()
logger_context = WorkerContext("logger", None, None, log_queue, None)
logger.debug("starting log worker") logger.debug("starting log worker")
self.logger = Process(target=logger_init, args=(self.lock, logger_context)) self.log_queue = Queue()
self.logger = Process(target=logger_init, args=(self.lock, self.log_queue))
self.logger.start() self.logger.start()
logger.debug("testing log worker")
self.log_queue.put("testing")
# create a pending queue and progress value for each device # create a pending queue and progress value for each device
for device in devices: for device in devices:
name = device.device name = device.device
@ -52,9 +53,6 @@ class DevicePoolExecutor:
self.workers[name] = Process(target=worker_init, args=(self.lock, context)) self.workers[name] = Process(target=worker_init, args=(self.lock, context))
self.workers[name].start() self.workers[name].start()
logger.debug("testing log worker")
log_queue.put("testing")
def cancel(self, key: str) -> bool: def cancel(self, key: str) -> bool:
""" """
Cancel a job. If the job has not been started, this will cancel Cancel a job. If the job has not been started, this will cancel
@ -99,6 +97,9 @@ class DevicePoolExecutor:
logger.info("stopping worker for device %s", device) logger.info("stopping worker for device %s", device)
worker.join(5) worker.join(5)
if self.logger.is_alive():
self.logger.join(5)
def prune(self): def prune(self):
finished_count = len(self.finished) finished_count = len(self.finished)
if finished_count > self.finished_limit: if finished_count > self.finished_limit:

View File

@ -1,40 +1,34 @@
from logging import getLogger from logging import getLogger
from torch.multiprocessing import Lock from onnxruntime import get_available_providers
from time import sleep from torch.multiprocessing import Lock, Queue
from traceback import print_exception from traceback import print_exception
from .context import WorkerContext from .context import WorkerContext
logger = getLogger(__name__) logger = getLogger(__name__)
def logger_init(lock: Lock, context: WorkerContext): def logger_init(lock: Lock, logs: Queue):
logger.info("checking in from logger") with lock:
logger.info("checking in from logger, %s", lock)
with open("worker.log", "w") as f: while True:
while True: job = logs.get()
if context.pending.empty(): with open("worker.log", "w") as f:
logger.info("no logs, sleeping") logger.info("got log: %s", job)
sleep(5) f.write(str(job) + "\n\n")
else:
job = context.pending.get()
logger.info("got log: %s", job)
f.write(str(job) + "\n\n")
def worker_init(lock: Lock, context: WorkerContext): def worker_init(lock: Lock, context: WorkerContext):
logger.info("checking in from worker") with lock:
logger.info("checking in from worker, %s, %s", lock, get_available_providers())
while True: while True:
if context.pending.empty(): job = context.pending.get()
logger.info("no jobs, sleeping") logger.info("got job: %s", job)
sleep(5) try:
else: fn, args, kwargs = job
job = context.pending.get() fn(context, *args, **kwargs)
logger.info("got job: %s", job) logger.info("finished job")
try: except Exception as e:
fn, args, kwargs = job print_exception(type(e), e, e.__traceback__)
fn(context, *args, **kwargs)
logger.info("finished job")
except Exception as e:
print_exception(type(e), e, e.__traceback__)