1
0
Fork 0

share retries throughout pipeline

This commit is contained in:
Sean Sube 2023-07-15 11:20:25 -05:00
parent d418cb1bc2
commit 7761aea28b
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
2 changed files with 10 additions and 3 deletions

View File

@ -150,7 +150,7 @@ class ChainPipeline:
tile_mask: Image.Image,
dims: Tuple[int, int, int],
) -> Image.Image:
for i in range(3):
for i in range(job.retries):
try:
output_tile = stage_pipe.run(
job,
@ -167,6 +167,7 @@ class ChainPipeline:
if is_debug():
save_image(server, "last-tile.png", output_tile)
job.retries = job.retries - i
return output_tile
except Exception:
logger.exception(
@ -188,9 +189,9 @@ class ChainPipeline:
stage_sources = stage_outputs
else:
logger.debug("image within tile size of %s, running stage", tile)
for i in range(3):
for i in range(job.retries):
try:
stage_sources = stage_pipe.run(
stage_outputs = stage_pipe.run(
job,
server,
stage_params,
@ -199,6 +200,10 @@ class ChainPipeline:
callback=callback,
**kwargs,
)
# doing this on the same line as stage_pipe.run can leave sources as None, which the pipeline
# does not like, so it throws
stage_sources = stage_outputs
job.retries = job.retries - i
break
except Exception:
logger.exception(

View File

@ -22,6 +22,7 @@ class WorkerContext:
last_progress: Optional[ProgressCommand]
idle: "Value[bool]"
timeout: float
retries: int
def __init__(
self,
@ -44,6 +45,7 @@ class WorkerContext:
self.last_progress = None
self.idle = idle
self.timeout = 1.0
self.retries = 3 # TODO: get from env
def start(self, job: str) -> None:
self.job = job