import argparse import atexit from functools import partial from glob import glob from itertools import count from logging.config import dictConfig from os import environ, path from typing import List from dotenv import load_dotenv from packit.agent import Agent, agent_easy_connect from packit.memory import make_limited_memory from packit.utils import logger_with_colors # configure logging # this is the only taleweave import allowed before the logger has been created from taleweave.utils.file import load_yaml LOG_PATH = "logging.json" try: if path.exists(LOG_PATH): with open(LOG_PATH, "r") as f: config_logging = load_yaml(f) dictConfig(config_logging) else: print("logging config not found") except Exception as err: print("error loading logging config: %s" % (err)) logger = logger_with_colors(__name__) # , level="DEBUG") load_dotenv(environ.get("TALEWEAVE_ENV", ".env"), override=True) # start the debugger, if needed if environ.get("DEBUG", "false").lower() == "true": import debugpy debugpy.listen(5679)"waiting for debugger to attach...") debugpy.wait_for_client() if True: from taleweave.context import ( get_current_turn, get_prompt_library, get_system_data, set_current_turn, set_current_world, set_dungeon_master, set_extra_actions, set_game_config, set_game_systems, set_system_data, subscribe, ) from taleweave.game_system import GameSystem from taleweave.generate import generate_room, generate_world, link_rooms from taleweave.models.config import DEFAULT_CONFIG, Config from taleweave.models.entity import World, WorldState from taleweave.models.event import GenerateEvent from taleweave.models.files import TemplateFile, WorldPrompt from taleweave.models.prompt import PromptLibrary from taleweave.plugins import load_plugin from taleweave.state import create_agents, save_world, save_world_state from import init_action from import init_planning from taleweave.utils.template import format_prompt def int_or_inf(value: str) -> float | int: if value == "inf": return float("inf") return int(value) # main def parse_args(): parser = argparse.ArgumentParser( description="Generate and simulate a text adventure world" ) parser.add_argument( "--actions", type=str, nargs="*", help="Extra actions to include in the simulation", ) parser.add_argument( "--add-rooms", default=0, type=int, help="The number of new rooms to generate before starting the simulation", ) parser.add_argument( "--config", type=str, help="The file to load additional configuration from", ) parser.add_argument( "--discord", action="store_true", help="Whether to run the simulation in a Discord bot", ) parser.add_argument( "--flavor", type=str, default="", help="Some additional flavor text for the generated world", ) parser.add_argument( "--optional-actions", action="store_true", help="Whether to include optional actions in the simulation", ) parser.add_argument( "--player", type=str, help="The name of the character to play as", ) parser.add_argument( "--prompts", type=str, nargs="*", help="The file to load game prompts from", ) parser.add_argument( "--render", action="store_true", help="Whether to run the render thread", ) parser.add_argument( "--render-generated", action="store_true", help="Whether to render entities as they are generated", ) parser.add_argument( "--rooms", type=int, help="The number of rooms to generate", ) parser.add_argument( "--server", action="store_true", help="Whether to run the websocket server", ) parser.add_argument( "--state", type=str, help="The file to save the world state to. Defaults to $world.state.json, if not set", ) parser.add_argument( "--turns", type=int_or_inf, default=10, help="The number of simulation turns to run", ) parser.add_argument( "--systems", type=str, nargs="*", help="Extra systems to run in the simulation", ) parser.add_argument( "--theme", type=str, default="fantasy", help="The theme of the generated world", ) parser.add_argument( "--world", type=str, default="world", help="The file to save the generated world to", ) parser.add_argument( "--world-template", type=str, help="The template file to load the world prompt from", ) return parser.parse_args() def get_world_prompt(args) -> WorldPrompt: if args.world_template: prompt_file, prompt_name = args.world_template.split(":") with open(prompt_file, "r") as f: prompts = TemplateFile(**load_yaml(f)) for prompt in prompts.templates: if == prompt_name: return prompt logger.warning(f"prompt {prompt_name} not found in {prompt_file}") return WorldPrompt(, theme=args.theme, flavor=args.flavor, ) def load_prompt_library(args) -> None: if args.prompts: prompt_files = [] for prompt_file in args.prompts: prompt_files.extend(glob(prompt_file, recursive=True)) for prompt_file in prompt_files: with open(prompt_file, "r") as f: new_library = PromptLibrary(**load_yaml(f))"loaded prompt library from {prompt_file}") library = get_prompt_library() library.prompts.update(new_library.prompts) return None def load_or_initialize_system_data( world_path: str, systems: List[GameSystem], world: World ): for system in systems: if system_data_file = f"{world_path}.{}.json" if path.exists(system_data_file):"loading system data from {system_data_file}") data = set_system_data(, data) continue else:"no system data found at {system_data_file}") if system.initialize:"initializing system data for {}") data = system.initialize(world) set_system_data(, data) def save_system_data(args, systems: List[GameSystem]): for system in systems: if system_data_file = f"{}.{}.json""saving system data to {system_data_file}"), get_system_data( def load_or_generate_world( args, config: Config, players, systems: List[GameSystem], world_prompt: WorldPrompt ): world_file = + ".json" world_state_file = args.state or ( + ".state.json") add_rooms = args.add_rooms memory = {} turn = 0 # prepare an agent for the world builder llm = agent_easy_connect() memory_factory = partial( make_limited_memory, ) world_builder = Agent( "World Builder", format_prompt( "world_generate_dungeon_master", flavor=world_prompt.flavor, theme=world_prompt.theme, ), {}, llm, memory_factory=memory_factory, ) set_dungeon_master(world_builder) if path.exists(world_state_file):"loading world state from {world_state_file}") with open(world_state_file, "r") as f: state = WorldState(**load_yaml(f)) set_current_turn(state.turn) load_or_initialize_system_data(, systems, memory = state.memory turn = state.turn world = elif path.exists(world_file):"loading world from {world_file}") with open(world_file, "r") as f: world = World(**load_yaml(f)) load_or_initialize_system_data(, systems, world) else:"generating a new world using theme: {world_prompt.theme}") world = generate_world( world_builder,, world_prompt.theme, systems, room_count=args.rooms, ) load_or_initialize_system_data(, systems, world) # TODO: check if there have been any changes before saving save_world(world, world_file) save_system_data(args, systems) new_rooms = [] for i in range(add_rooms):"generating room {i + 1} of {add_rooms}") room = generate_room( world_builder, world, systems, current_room=i, total_rooms=add_rooms ) new_rooms.append(room) world.rooms.append(room) if new_rooms: link_rooms(world_builder, world, systems, new_rooms) create_agents(world, memory=memory, players=players) return (world, world_state_file, turn) def main(): args = parse_args() if args.config: with open(args.config, "r") as f: config = Config(**load_yaml(f)) set_game_config(config) else: config = DEFAULT_CONFIG load_prompt_library(args) players = [] if args.player: players.append(args.player) # launch other threads threads = [] if args.render: from taleweave.render.comfy import launch_render, render_generated threads.extend(launch_render(config.render)) if args.render_generated: subscribe(GenerateEvent, render_generated) if args.discord: from import launch_bot threads.extend(launch_bot( if args.server: from taleweave.server.websocket import launch_server threads.extend(launch_server(config.server.websocket)) # register the thread shutdown handler def shutdown_threads(): for thread in threads: thread.join(1.0) atexit.register(shutdown_threads) # load built-in but optional actions extra_actions = [] if args.optional_actions:"loading optional actions") from taleweave.actions.optional import init_optional optional_actions = init_optional() f"loaded optional actions: {[action.__name__ for action in optional_actions]}" ) extra_actions.extend(optional_actions) # load extra actions from plugins for action_name in args.actions or []:"loading extra actions from {action_name}") module_actions = load_plugin(action_name) f"loaded extra actions: {[action.__name__ for action in module_actions]}" ) extra_actions.extend(module_actions) set_extra_actions(extra_actions) # set up the game systems systems: List[GameSystem] = [] systems.extend(init_planning()) systems.extend(init_action()) # load extra systems from plugins for system_name in or []:"loading extra systems from {system_name}") module_systems = load_plugin(system_name)"loaded extra systems: {module_systems}") systems.extend(module_systems) # make sure the server system runs after any updates if args.server: from taleweave.server.websocket import server_system systems.append(GameSystem(name="server", simulate=server_system)) set_game_systems(systems) # load or generate the world world_prompt = get_world_prompt(args) world, world_state_file, world_turn = load_or_generate_world( args, config, players, systems, world_prompt=world_prompt ) set_current_world(world) # make sure the snapshot system runs last def snapshot_system(world: World, turn: int, data: None = None) -> None:"taking snapshot of world state") save_world_state(world, turn, world_state_file) systems.append(GameSystem(name="snapshot", simulate=snapshot_system)) # hack: send a snapshot to the websocket server if args.server: server_system(world, world_turn) # run game systems for each turn"simulating the world for {args.turns} turns using systems: {systems}") for i in count(): current_turn = get_current_turn()"simulating turn {i} of {args.turns} (world turn {current_turn})") for system in systems: if system.simulate:"running system {}") system.simulate(world, current_turn) set_current_turn(current_turn + 1) if i >= args.turns:"reached turn limit at world turn %s", current_turn + 1) break if __name__ == "__main__": main()