1
0
Fork 0

recycle worker pool after 10 jobs

This commit is contained in:
Sean Sube 2023-02-26 12:58:38 -06:00
parent e0737e9e08
commit 6502e1e3c8
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
1 changed files with 27 additions and 0 deletions

View File

@ -36,6 +36,7 @@ class DevicePoolExecutor:
self.progress = {} self.progress = {}
self.workers = {} self.workers = {}
self.jobs = {} # Dict[Output, Device] self.jobs = {} # Dict[Output, Device]
self.job_count = 0
# TODO: make this a method # TODO: make this a method
logger.debug("starting log worker") logger.debug("starting log worker")
@ -129,6 +130,27 @@ class DevicePoolExecutor:
) )
self.finished[:] = self.finished[-self.finished_limit:] self.finished[:] = self.finished[-self.finished_limit:]
def recycle(self):
for name, proc in self.workers.items():
if proc.is_alive():
logger.debug("shutting down worker for device %s", name)
proc.join(5)
else:
logger.warning("worker for device %s has died", name)
self.workers[name] = None
logger.info("starting new workers")
for name in self.workers.keys():
context = self.context[name]
lock = self.locks[name]
logger.debug("starting worker for device %s", name)
self.workers[name] = Process(target=worker_init, args=(lock, context, self.server))
self.workers[name].start()
def submit( def submit(
self, self,
key: str, key: str,
@ -138,6 +160,11 @@ class DevicePoolExecutor:
needs_device: Optional[DeviceParams] = None, needs_device: Optional[DeviceParams] = None,
**kwargs, **kwargs,
) -> None: ) -> None:
self.job_count += 1
if self.job_count > 10:
self.recycle()
self.job_count = 0
self.prune() self.prune()
device_idx = self.get_next_device(needs_device=needs_device) device_idx = self.get_next_device(needs_device=needs_device)
logger.info( logger.info(