1
0
Fork 0
onnx-web/api/onnx_web/chain/result.py

313 lines
10 KiB
Python
Raw Normal View History

from json import dumps
from logging import getLogger
from os import path
2024-01-04 04:15:50 +00:00
from typing import Any, List, Optional, Tuple
import numpy as np
2023-11-19 00:13:13 +00:00
from PIL import Image
from ..convert.utils import resolve_tensor
2024-01-03 03:24:27 +00:00
from ..params import Border, HighresParams, ImageParams, Size, UpscaleParams
2024-01-04 04:15:50 +00:00
from ..server.context import ServerContext
from ..server.load import get_extra_hashes
from ..utils import hash_file
logger = getLogger(__name__)
class NetworkMetadata:
name: str
hash: str
weight: float
def __init__(self, name: str, hash: str, weight: float) -> None:
self.name = name
self.hash = hash
self.weight = weight
2024-01-03 03:24:27 +00:00
class ImageMetadata:
border: Border
highres: HighresParams
params: ImageParams
size: Size
upscale: UpscaleParams
inversions: Optional[List[NetworkMetadata]]
loras: Optional[List[NetworkMetadata]]
models: Optional[List[NetworkMetadata]]
2024-01-03 03:24:27 +00:00
def __init__(
self,
params: ImageParams,
size: Size,
upscale: Optional[UpscaleParams] = None,
border: Optional[Border] = None,
highres: Optional[HighresParams] = None,
inversions: Optional[List[NetworkMetadata]] = None,
loras: Optional[List[NetworkMetadata]] = None,
models: Optional[List[NetworkMetadata]] = None,
2024-01-03 03:24:27 +00:00
) -> None:
self.params = params
self.size = size
self.upscale = upscale
self.border = border
self.highres = highres
2024-01-03 04:14:21 +00:00
self.inversions = inversions
self.loras = loras
self.models = models
2024-01-03 03:24:27 +00:00
2024-01-04 04:15:50 +00:00
def get_model_hash(self, model: Optional[str] = None) -> Tuple[str, str]:
model_name = path.basename(path.normpath(model or self.params.model))
logger.debug("getting model hash for %s", model_name)
model_hash = get_extra_hashes().get(model_name, None)
if model_hash is None:
model_hash_path = path.join(self.params.model, "hash.txt")
if path.exists(model_hash_path):
with open(model_hash_path, "r") as f:
model_hash = f.readline().rstrip(",. \n\t\r")
2024-01-04 04:15:50 +00:00
return (model_name, model_hash or "unknown")
2024-01-04 04:20:27 +00:00
def to_exif(self, server, output: List[str]) -> str:
2024-01-04 04:15:50 +00:00
model_name, model_hash = self.get_model_hash()
hash_map = {
model_name: model_hash,
}
inversion_hashes = ""
if self.inversions is not None:
inversion_pairs = [
(
name,
hash_file(
resolve_tensor(path.join(server.model_path, "inversion", name))
).upper(),
)
for name, _weight in self.inversions
]
inversion_hashes = ",".join(
[f"{name}: {hash}" for name, hash in inversion_pairs]
)
hash_map.update(dict(inversion_pairs))
lora_hashes = ""
if self.loras is not None:
lora_pairs = [
(
name,
hash_file(
resolve_tensor(path.join(server.model_path, "lora", name))
).upper(),
)
for name, _weight in self.loras
]
lora_hashes = ",".join([f"{name}: {hash}" for name, hash in lora_pairs])
hash_map.update(dict(lora_pairs))
return (
f"{self.params.prompt or ''}\nNegative prompt: {self.params.negative_prompt or ''}\n"
f"Steps: {self.params.steps}, Sampler: {self.params.scheduler}, CFG scale: {self.params.cfg}, "
f"Seed: {self.params.seed}, Size: {self.size.width}x{self.size.height}, "
f"Model hash: {model_hash}, Model: {model_name}, "
f"Tool: onnx-web, Version: {server.server_version}, "
f'Inversion hashes: "{inversion_hashes}", '
f'Lora hashes: "{lora_hashes}", '
f"Hashes: {dumps(hash_map)}"
2024-01-03 03:24:27 +00:00
)
2024-01-04 04:15:50 +00:00
def tojson(self, server: ServerContext, output: List[str]):
json = {
"input_size": self.size.tojson(),
2024-01-04 04:15:50 +00:00
"outputs": output,
"params": self.params.tojson(),
2024-01-04 04:15:50 +00:00
"inversions": [],
"loras": [],
"models": [],
}
2024-01-04 04:15:50 +00:00
# fix up some fields
2024-01-04 05:38:44 +00:00
model = path.basename(self.params.model)
json["params"]["model"] = model
json["models"].append(
{
"name": model,
"weight": 1.0,
"hash": self.get_model_hash(model),
}
)
# calculate final output size
output_size = self.size
if self.border is not None:
json["border"] = self.border.tojson()
output_size = output_size.add_border(self.border)
if self.highres is not None:
json["highres"] = self.highres.tojson()
output_size = self.highres.resize(output_size)
if self.upscale is not None:
json["upscale"] = self.upscale.tojson()
output_size = self.upscale.resize(output_size)
json["size"] = output_size.tojson()
if self.inversions is not None:
for name, weight in self.inversions:
hash = hash_file(
resolve_tensor(path.join(server.model_path, "inversion", name))
).upper()
2024-01-04 04:15:50 +00:00
json["inversions"].append(
{"name": name, "weight": weight, "hash": hash}
)
if self.loras is not None:
for name, weight in self.loras:
hash = hash_file(
resolve_tensor(path.join(server.model_path, "lora", name))
).upper()
2024-01-04 04:15:50 +00:00
json["loras"].append({"name": name, "weight": weight, "hash": hash})
if self.models is not None:
for name, weight in self.models:
name, hash = self.get_model_hash()
json["models"].append({"name": name, "weight": weight, "hash": hash})
return json
class StageResult:
2023-11-19 00:13:13 +00:00
"""
Chain pipeline stage result.
Can contain PIL images or numpy arrays, with helpers to convert between them.
This class intentionally does not provide `__iter__`, to ensure clients get results in the format
they are expected.
"""
arrays: Optional[List[np.ndarray]]
images: Optional[List[Image.Image]]
2024-01-03 03:24:27 +00:00
metadata: List[ImageMetadata]
2023-11-19 00:13:13 +00:00
@staticmethod
def empty():
return StageResult(images=[])
2023-11-20 05:18:57 +00:00
@staticmethod
def from_arrays(arrays: List[np.ndarray]):
return StageResult(arrays=arrays)
@staticmethod
def from_images(images: List[Image.Image]):
return StageResult(images=images)
2024-01-03 03:49:22 +00:00
def __init__(
self,
arrays: Optional[List[np.ndarray]] = None,
images: Optional[List[Image.Image]] = None,
source: Optional[Any] = None,
) -> None:
if sum([arrays is not None, images is not None, source is not None]) > 1:
2023-11-19 00:13:13 +00:00
raise ValueError("stages must only return one type of result")
2024-01-03 03:49:22 +00:00
elif arrays is None and images is None and source is None:
2023-11-19 00:13:13 +00:00
raise ValueError("stages must return results")
self.arrays = arrays
self.images = images
2024-01-03 03:49:22 +00:00
self.source = source
self.metadata = []
2023-11-19 00:13:13 +00:00
def __len__(self) -> int:
if self.arrays is not None:
return len(self.arrays)
2023-12-03 18:13:45 +00:00
elif self.images is not None:
2023-11-19 00:13:13 +00:00
return len(self.images)
2023-12-03 18:13:45 +00:00
else:
return 0
2023-11-19 00:13:13 +00:00
def as_numpy(self) -> List[np.ndarray]:
if self.arrays is not None:
return self.arrays
2023-12-03 18:13:45 +00:00
elif self.images is not None:
return [np.array(i) for i in self.images]
else:
return []
2023-11-19 00:13:13 +00:00
def as_image(self) -> List[Image.Image]:
if self.images is not None:
return self.images
2023-12-03 18:13:45 +00:00
elif self.arrays is not None:
return [Image.fromarray(np.uint8(i), shape_mode(i)) for i in self.arrays]
else:
return []
2024-01-03 03:24:27 +00:00
def push_array(self, array: np.ndarray, metadata: Optional[ImageMetadata]):
if self.arrays is not None:
self.arrays.append(array)
elif self.images is not None:
self.images.append(Image.fromarray(np.uint8(array), shape_mode(array)))
else:
self.arrays = [array]
2024-01-03 03:24:27 +00:00
if metadata is not None:
self.metadata.append(metadata)
else:
self.metadata.append(ImageMetadata())
def push_image(self, image: Image.Image, metadata: Optional[ImageMetadata]):
if self.images is not None:
self.images.append(image)
elif self.arrays is not None:
self.arrays.append(np.array(image))
else:
self.images = [image]
2024-01-03 03:24:27 +00:00
if metadata is not None:
self.metadata.append(metadata)
else:
self.metadata.append(ImageMetadata())
def insert_array(
self, index: int, array: np.ndarray, metadata: Optional[ImageMetadata]
):
if self.arrays is not None:
self.arrays.insert(index, array)
elif self.images is not None:
self.images.insert(
index, Image.fromarray(np.uint8(array), shape_mode(array))
)
else:
self.arrays = [array]
if metadata is not None:
self.metadata.insert(index, metadata)
else:
self.metadata.insert(index, ImageMetadata())
def insert_image(
self, index: int, image: Image.Image, metadata: Optional[ImageMetadata]
):
if self.images is not None:
self.images.insert(index, image)
elif self.arrays is not None:
self.arrays.insert(index, np.array(image))
else:
self.images = [image]
if metadata is not None:
self.metadata.insert(index, metadata)
else:
self.metadata.insert(index, ImageMetadata())
def shape_mode(arr: np.ndarray) -> str:
if len(arr.shape) != 3:
raise ValueError("unknown array format")
if arr.shape[-1] == 3:
return "RGB"
elif arr.shape[-1] == 4:
return "RGBA"
raise ValueError("unknown image format")