fix(api): close queues after stopping workers
This commit is contained in:
parent
dca8a97796
commit
e1219cca90
|
@ -235,16 +235,6 @@ class DevicePoolExecutor:
|
||||||
logger.info("stopping worker pool")
|
logger.info("stopping worker pool")
|
||||||
|
|
||||||
with self.rlock:
|
with self.rlock:
|
||||||
logger.debug("closing queues")
|
|
||||||
self.logs.close()
|
|
||||||
for queue in self.progress.values():
|
|
||||||
queue.close()
|
|
||||||
for queue in self.pending.values():
|
|
||||||
queue.close()
|
|
||||||
|
|
||||||
self.pending.clear()
|
|
||||||
self.join_leaking()
|
|
||||||
|
|
||||||
logger.debug("stopping device workers")
|
logger.debug("stopping device workers")
|
||||||
for device, worker in self.workers.items():
|
for device, worker in self.workers.items():
|
||||||
if worker.is_alive():
|
if worker.is_alive():
|
||||||
|
@ -269,6 +259,20 @@ class DevicePoolExecutor:
|
||||||
logger.debug("stopping health worker")
|
logger.debug("stopping health worker")
|
||||||
self.health_worker.join(self.join_timeout)
|
self.health_worker.join(self.join_timeout)
|
||||||
|
|
||||||
|
logger.debug("closing worker queues")
|
||||||
|
self.logs.close()
|
||||||
|
|
||||||
|
for queue in self.pending.values():
|
||||||
|
queue.close()
|
||||||
|
for queue in self.progress.values():
|
||||||
|
queue.close()
|
||||||
|
|
||||||
|
self.pending.clear()
|
||||||
|
self.progress.clear()
|
||||||
|
|
||||||
|
logger.debug("stopping leaking workers")
|
||||||
|
self.join_leaking()
|
||||||
|
|
||||||
logger.debug("worker pool stopped")
|
logger.debug("worker pool stopped")
|
||||||
|
|
||||||
def join_leaking(self):
|
def join_leaking(self):
|
||||||
|
|
Loading…
Reference in New Issue