2023-02-02 14:31:35 +00:00
|
|
|
from hashlib import sha256
|
2023-02-02 04:20:40 +00:00
|
|
|
from json import dumps
|
2023-02-02 14:31:35 +00:00
|
|
|
from logging import getLogger
|
|
|
|
from time import time
|
2024-01-04 01:09:18 +00:00
|
|
|
from typing import List, Optional
|
2023-02-02 04:20:40 +00:00
|
|
|
|
2023-06-26 12:03:06 +00:00
|
|
|
from piexif import ExifIFD, ImageIFD, dump
|
|
|
|
from piexif.helper import UserComment
|
|
|
|
from PIL import Image, PngImagePlugin
|
2023-02-05 13:53:26 +00:00
|
|
|
|
2024-01-04 01:09:18 +00:00
|
|
|
from .chain.result import ImageMetadata, StageResult
|
|
|
|
from .params import ImageParams, Param, Size
|
2023-02-19 02:28:21 +00:00
|
|
|
from .server import ServerContext
|
2024-01-04 01:09:18 +00:00
|
|
|
from .utils import base_join, hash_value
|
2023-02-02 04:20:40 +00:00
|
|
|
|
2023-02-02 14:31:35 +00:00
|
|
|
logger = getLogger(__name__)
|
|
|
|
|
2023-06-26 12:03:06 +00:00
|
|
|
|
2024-01-04 02:54:11 +00:00
|
|
|
def make_output_names(
|
2023-04-10 01:33:03 +00:00
|
|
|
server: ServerContext,
|
2024-01-04 02:54:11 +00:00
|
|
|
job_name: str,
|
|
|
|
count: int = 1,
|
2023-09-10 21:35:16 +00:00
|
|
|
offset: int = 0,
|
2023-02-20 14:35:18 +00:00
|
|
|
) -> List[str]:
|
2024-01-04 01:09:18 +00:00
|
|
|
return [
|
|
|
|
f"{job_name}_{i}.{server.image_format}" for i in range(offset, count + offset)
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
def make_job_name(
|
|
|
|
mode: str,
|
|
|
|
params: ImageParams,
|
|
|
|
size: Size,
|
|
|
|
extras: Optional[List[Optional[Param]]] = None,
|
|
|
|
) -> str:
|
2023-02-02 14:31:35 +00:00
|
|
|
now = int(time())
|
|
|
|
sha = sha256()
|
|
|
|
|
|
|
|
hash_value(sha, mode)
|
|
|
|
hash_value(sha, params.model)
|
2023-04-13 03:58:48 +00:00
|
|
|
hash_value(sha, params.pipeline)
|
2023-02-26 16:15:12 +00:00
|
|
|
hash_value(sha, params.scheduler)
|
2023-02-02 14:31:35 +00:00
|
|
|
hash_value(sha, params.prompt)
|
|
|
|
hash_value(sha, params.negative_prompt)
|
|
|
|
hash_value(sha, params.cfg)
|
|
|
|
hash_value(sha, params.seed)
|
2023-02-22 05:08:13 +00:00
|
|
|
hash_value(sha, params.steps)
|
|
|
|
hash_value(sha, params.eta)
|
|
|
|
hash_value(sha, params.batch)
|
2023-02-02 14:31:35 +00:00
|
|
|
hash_value(sha, size.width)
|
|
|
|
hash_value(sha, size.height)
|
|
|
|
|
|
|
|
if extras is not None:
|
|
|
|
for param in extras:
|
|
|
|
hash_value(sha, param)
|
|
|
|
|
2024-01-04 01:09:18 +00:00
|
|
|
return f"{mode}_{params.seed}_{sha.hexdigest()}_{now}"
|
|
|
|
|
|
|
|
|
|
|
|
def save_result(
|
|
|
|
server: ServerContext,
|
|
|
|
result: StageResult,
|
|
|
|
base_name: str,
|
|
|
|
) -> List[str]:
|
2024-01-06 02:11:58 +00:00
|
|
|
images = result.as_images()
|
2024-01-04 02:54:11 +00:00
|
|
|
outputs = make_output_names(server, base_name, len(images))
|
2024-01-06 20:17:26 +00:00
|
|
|
logger.debug("saving %s images: %s", len(images), outputs)
|
|
|
|
|
2024-01-04 01:09:18 +00:00
|
|
|
results = []
|
2024-01-04 02:54:11 +00:00
|
|
|
for image, metadata, filename in zip(images, result.metadata, outputs):
|
2024-01-04 01:09:18 +00:00
|
|
|
results.append(
|
|
|
|
save_image(
|
|
|
|
server,
|
2024-01-04 02:54:11 +00:00
|
|
|
filename,
|
2024-01-04 01:09:18 +00:00
|
|
|
image,
|
|
|
|
metadata,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
return results
|
2023-02-02 14:31:35 +00:00
|
|
|
|
|
|
|
|
2023-06-26 12:03:06 +00:00
|
|
|
def save_image(
|
|
|
|
server: ServerContext,
|
|
|
|
output: str,
|
|
|
|
image: Image.Image,
|
2024-01-13 00:58:26 +00:00
|
|
|
metadata: Optional[ImageMetadata] = None,
|
2023-06-26 12:03:06 +00:00
|
|
|
) -> str:
|
2023-04-10 01:33:03 +00:00
|
|
|
path = base_join(server.output_path, output)
|
2023-06-26 12:03:06 +00:00
|
|
|
|
|
|
|
if server.image_format == "png":
|
|
|
|
exif = PngImagePlugin.PngInfo()
|
|
|
|
|
2024-01-04 01:09:18 +00:00
|
|
|
if metadata is not None:
|
2023-08-02 22:43:17 +00:00
|
|
|
exif.add_text("make", "onnx-web")
|
2023-06-26 12:03:06 +00:00
|
|
|
exif.add_text(
|
2023-08-02 22:43:17 +00:00
|
|
|
"maker note",
|
2024-01-04 01:09:18 +00:00
|
|
|
dumps(metadata.tojson(server, [output])),
|
2023-06-26 12:03:06 +00:00
|
|
|
)
|
2023-08-02 22:43:17 +00:00
|
|
|
exif.add_text("model", server.server_version)
|
2023-06-26 12:48:39 +00:00
|
|
|
exif.add_text(
|
2023-08-02 22:43:17 +00:00
|
|
|
"parameters",
|
2024-01-14 19:19:09 +00:00
|
|
|
metadata.to_exif(server),
|
2023-06-26 12:48:39 +00:00
|
|
|
)
|
2023-06-26 12:03:06 +00:00
|
|
|
|
|
|
|
image.save(path, format=server.image_format, pnginfo=exif)
|
|
|
|
else:
|
|
|
|
exif = dump(
|
|
|
|
{
|
|
|
|
"0th": {
|
2023-06-26 12:14:32 +00:00
|
|
|
ExifIFD.MakerNote: UserComment.dump(
|
2024-01-04 01:09:18 +00:00
|
|
|
dumps(metadata.tojson(server, [output])),
|
2023-06-26 12:14:32 +00:00
|
|
|
encoding="unicode",
|
|
|
|
),
|
2023-06-26 12:24:25 +00:00
|
|
|
ExifIFD.UserComment: UserComment.dump(
|
2024-01-14 19:19:09 +00:00
|
|
|
metadata.to_exif(server),
|
2023-06-26 12:48:39 +00:00
|
|
|
encoding="unicode",
|
2023-06-26 12:24:25 +00:00
|
|
|
),
|
2023-06-26 12:03:06 +00:00
|
|
|
ImageIFD.Make: "onnx-web",
|
2023-06-26 22:24:34 +00:00
|
|
|
ImageIFD.Model: server.server_version,
|
2023-06-26 12:03:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
)
|
|
|
|
image.save(path, format=server.image_format, exif=exif)
|
|
|
|
|
2024-01-04 01:09:18 +00:00
|
|
|
if metadata is not None:
|
|
|
|
save_metadata(
|
2023-06-26 12:14:32 +00:00
|
|
|
server,
|
|
|
|
output,
|
2024-01-04 02:22:38 +00:00
|
|
|
metadata,
|
2023-06-26 12:14:32 +00:00
|
|
|
)
|
2023-06-26 12:03:06 +00:00
|
|
|
|
2023-02-05 13:53:26 +00:00
|
|
|
logger.debug("saved output image to: %s", path)
|
2023-02-02 04:20:40 +00:00
|
|
|
return path
|
|
|
|
|
|
|
|
|
2024-01-04 01:09:18 +00:00
|
|
|
def save_metadata(
|
2023-04-10 01:33:03 +00:00
|
|
|
server: ServerContext,
|
2023-02-02 04:20:40 +00:00
|
|
|
output: str,
|
2024-01-04 01:09:18 +00:00
|
|
|
metadata: ImageMetadata,
|
2023-02-02 04:20:40 +00:00
|
|
|
) -> str:
|
2023-04-10 01:33:03 +00:00
|
|
|
path = base_join(server.output_path, f"{output}.json")
|
2024-01-04 01:09:18 +00:00
|
|
|
json = metadata.tojson(server, [output])
|
2023-02-05 13:53:26 +00:00
|
|
|
with open(path, "w") as f:
|
2023-02-02 04:20:40 +00:00
|
|
|
f.write(dumps(json))
|
2023-02-05 13:53:26 +00:00
|
|
|
logger.debug("saved image params to: %s", path)
|
2023-02-02 14:31:35 +00:00
|
|
|
return path
|
2024-01-13 16:01:50 +00:00
|
|
|
|
|
|
|
|
|
|
|
def read_metadata(
|
|
|
|
image: Image.Image,
|
|
|
|
) -> Optional[ImageMetadata]:
|
2024-01-14 02:19:37 +00:00
|
|
|
exif_data = image.getexif()
|
2024-01-13 16:01:50 +00:00
|
|
|
|
|
|
|
if ImageIFD.Make in exif_data and exif_data[ImageIFD.Make] == "onnx-web":
|
|
|
|
return ImageMetadata.from_json(exif_data[ExifIFD.MakerNote])
|
|
|
|
|
|
|
|
if ExifIFD.UserComment in exif_data:
|
|
|
|
return ImageMetadata.from_exif(exif_data[ExifIFD.UserComment])
|
|
|
|
|
|
|
|
# this could return ImageMetadata.unknown_image(), but that would not indicate whether the input
|
|
|
|
# had metadata or not, so it's easier to return None and follow the call with `or ImageMetadata.unknown_image()`
|
|
|
|
return None
|