fix(api): track and repeatedly attempt to recycle leaking workers (#219)
This commit is contained in:
parent
ccd15c1921
commit
7a3a81a4ef
|
@ -21,6 +21,7 @@ class DevicePoolExecutor:
|
||||||
max_pending_per_worker: int
|
max_pending_per_worker: int
|
||||||
join_timeout: float
|
join_timeout: float
|
||||||
|
|
||||||
|
leaking: List[Tuple[str, Process]]
|
||||||
context: Dict[str, WorkerContext] # Device -> Context
|
context: Dict[str, WorkerContext] # Device -> Context
|
||||||
pending: Dict[str, "Queue[Tuple[str, Callable[..., None], Any, Any]]"]
|
pending: Dict[str, "Queue[Tuple[str, Callable[..., None], Any, Any]]"]
|
||||||
threads: Dict[str, Thread]
|
threads: Dict[str, Thread]
|
||||||
|
@ -49,6 +50,7 @@ class DevicePoolExecutor:
|
||||||
self.max_pending_per_worker = max_pending_per_worker
|
self.max_pending_per_worker = max_pending_per_worker
|
||||||
self.join_timeout = join_timeout
|
self.join_timeout = join_timeout
|
||||||
|
|
||||||
|
self.leaking = []
|
||||||
self.context = {}
|
self.context = {}
|
||||||
self.pending = {}
|
self.pending = {}
|
||||||
self.threads = {}
|
self.threads = {}
|
||||||
|
@ -133,7 +135,7 @@ class DevicePoolExecutor:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
job, device, value = progress.get(timeout=(self.join_timeout / 2))
|
job, device, value = progress.get(timeout=(self.join_timeout / 2))
|
||||||
logger.info("progress update for job: %s to %s", job, value)
|
logger.debug("progress update for job: %s to %s", job, value)
|
||||||
self.active_jobs[job] = (device, value)
|
self.active_jobs[job] = (device, value)
|
||||||
if job in self.cancelled_jobs:
|
if job in self.cancelled_jobs:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
@ -270,6 +272,7 @@ class DevicePoolExecutor:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"worker for device %s could not be stopped in time", device
|
"worker for device %s could not be stopped in time", device
|
||||||
)
|
)
|
||||||
|
self.leaking.append((device, worker))
|
||||||
else:
|
else:
|
||||||
logger.debug("worker for device %s has died", device)
|
logger.debug("worker for device %s has died", device)
|
||||||
|
|
||||||
|
@ -281,25 +284,38 @@ class DevicePoolExecutor:
|
||||||
|
|
||||||
def recycle(self):
|
def recycle(self):
|
||||||
logger.debug("recycling worker pool")
|
logger.debug("recycling worker pool")
|
||||||
|
|
||||||
|
if len(self.leaking) > 0:
|
||||||
|
logger.warning("cleaning up %s leaking workers", len(self.leaking))
|
||||||
|
for device, worker in self.leaking:
|
||||||
|
logger.debug("shutting down worker for device %s", device)
|
||||||
|
worker.join(self.join_timeout)
|
||||||
|
if worker.is_alive():
|
||||||
|
logger.error("leaking worker for device %s could not be shut down", device)
|
||||||
|
|
||||||
|
self.leaking[:] = [dw for dw in self.leaking if dw[1].is_alive()]
|
||||||
|
|
||||||
needs_restart = []
|
needs_restart = []
|
||||||
|
|
||||||
for device, proc in self.workers.items():
|
for device, worker in self.workers.items():
|
||||||
jobs = self.total_jobs.get(device, 0)
|
jobs = self.total_jobs.get(device, 0)
|
||||||
if not proc.is_alive():
|
if not worker.is_alive():
|
||||||
logger.warning("worker for device %s has died", device)
|
logger.warning("worker for device %s has died", device)
|
||||||
needs_restart.append(device)
|
needs_restart.append(device)
|
||||||
elif jobs > self.max_jobs_per_worker:
|
elif jobs > self.max_jobs_per_worker:
|
||||||
logger.info(
|
logger.info(
|
||||||
"shutting down worker for device %s after %s jobs", device, jobs
|
"shutting down worker for device %s after %s jobs", device, jobs
|
||||||
)
|
)
|
||||||
proc.join(self.join_timeout)
|
worker.join(self.join_timeout)
|
||||||
if proc.is_alive():
|
if worker.is_alive():
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"worker for device %s could not be recycled in time", device
|
"worker for device %s could not be recycled in time", device
|
||||||
)
|
)
|
||||||
|
self.leaking.append((device, worker))
|
||||||
|
else:
|
||||||
|
del worker
|
||||||
|
|
||||||
self.workers[device] = None
|
self.workers[device] = None
|
||||||
del proc
|
|
||||||
needs_restart.append(device)
|
needs_restart.append(device)
|
||||||
else:
|
else:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
Loading…
Reference in New Issue