1
0
Fork 0

lint(api): add start method to worker pool

This commit is contained in:
Sean Sube 2023-03-26 11:30:07 -05:00
parent 2b179bebac
commit f3ab25f671
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
2 changed files with 15 additions and 7 deletions

View File

@ -46,14 +46,16 @@ def main():
disable_progress_bar() disable_progress_bar()
disable_progress_bars() disable_progress_bars()
app = Flask(__name__) # create workers
CORS(app, origins=context.cors_origin) # any is a fake device and should not be in the pool
# any is a fake device, should not be in the pool
pool = DevicePoolExecutor( pool = DevicePoolExecutor(
context, [p for p in get_available_platforms() if p.device != "any"] context, [p for p in get_available_platforms() if p.device != "any"]
) )
# create server
app = Flask(__name__)
CORS(app, origins=context.cors_origin)
# register routes # register routes
register_static_routes(app, context, pool) register_static_routes(app, context, pool)
register_api_routes(app, context, pool) register_api_routes(app, context, pool)
@ -63,6 +65,7 @@ def main():
def run(): def run():
app, pool = main() app, pool = main()
pool.start()
def quit(): def quit():
logger.info("shutting down workers") logger.info("shutting down workers")
@ -74,6 +77,7 @@ def run():
if __name__ == "__main__": if __name__ == "__main__":
app, pool = main() app, pool = main()
pool.start()
app.run("0.0.0.0", 5000, debug=is_debug()) app.run("0.0.0.0", 5000, debug=is_debug())
logger.info("shutting down app") logger.info("shutting down app")
pool.join() pool.join()

View File

@ -80,12 +80,12 @@ class DevicePoolExecutor:
self.logs = Queue(self.max_pending_per_worker) self.logs = Queue(self.max_pending_per_worker)
self.rlock = Lock() self.rlock = Lock()
# TODO: these should be part of a start method def start(self) -> None:
self.create_health_worker() self.create_health_worker()
self.create_logger_worker() self.create_logger_worker()
self.create_progress_worker() self.create_progress_worker()
for device in devices: for device in self.devices:
self.create_device_worker(device) self.create_device_worker(device)
def create_device_worker(self, device: DeviceParams) -> None: def create_device_worker(self, device: DeviceParams) -> None:
@ -439,7 +439,11 @@ class DevicePoolExecutor:
else: else:
self.total_jobs[progress.device] = 1 self.total_jobs[progress.device] = 1
logger.debug("updating job count for device %s: %s", progress.device, self.total_jobs[progress.device]) logger.debug(
"updating job count for device %s: %s",
progress.device,
self.total_jobs[progress.device],
)
# check if the job has been cancelled # check if the job has been cancelled
if progress.job in self.cancelled_jobs: if progress.job in self.cancelled_jobs: