commit 42740fc73b0bf05d73e8305d6b2655f8752353b3 Author: Sean Sube Date: Thu May 2 06:25:35 2024 -0500 first version diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b36d05 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +worlds/ +__pycache__/ diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..db9d4d8 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,37 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "args": [ + "--world", + "worlds/cyber.json", + "--world-state", + "worlds/cyber-state.json" + ], + "console": "integratedTerminal", + "cwd": "${workspaceFolder}", + "env": { + "PYTHONPATH": "${workspaceFolder}", + "OLLAMA_API": "http://10.2.2.81:11434", + "PACKIT_DRIVER": "ollama", + "PACKIT_MODEL": "dolphin-llama3" + } + }, + { + "name": "Python Debugger: Local Attach", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "127.0.0.1", + "port": 5679 + }, + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..e0009a2 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.analysis.typeCheckingMode": "basic" +} \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f43e915 --- /dev/null +++ b/Makefile @@ -0,0 +1,59 @@ +.PHONY: ci check-venv pip pip-dev lint-check lint-fix test typecheck package package-dist package-upload style + +venv: ## create virtual env + python3 -v venv venv + +ci: pip pip-dev lint-check + $(MAKE) test + +check-venv: + if [ -z $${VIRTUAL_ENV+x} ]; then echo "Are you sure you want to install dependencies outside of a virtual environment?"; sleep 30; fi + +pip: check-venv + pip install -r requirements/cpu.txt + pip install -r requirements/base.txt + +pip-dev: check-venv + pip install -r requirements/dev.txt + +test: + python -m coverage erase + python -m coverage run -m unittest discover -v -s tests/ + python -m coverage html -i + python -m coverage xml -i + python -m coverage report -i + +package: package-dist package-upload + +package-dist: + python3 ./setup.py sdist + +package-upload: + twine upload dist/* + +lint-check: + black --check adventure/ + black --check scripts/ + black --check tests/ + flake8 adventure + flake8 scripts + flake8 tests + isort --check-only --skip __init__.py --filter-files adventure + isort --check-only --skip __init__.py --filter-files scripts + isort --check-only --skip __init__.py --filter-files tests + +lint-fix: + black adventure/ + black scripts/ + black tests/ + flake8 adventure + flake8 scripts + flake8 tests + isort --skip __init__.py --filter-files adventure + isort --skip __init__.py --filter-files scripts + isort --skip __init__.py --filter-files tests + +style: lint-fix + +typecheck: + mypy feedme diff --git a/adventure/main.py b/adventure/main.py new file mode 100644 index 0000000..4ce2832 --- /dev/null +++ b/adventure/main.py @@ -0,0 +1,695 @@ +from typing import TYPE_CHECKING, Dict, List, Callable, Sequence +from random import choice, randint +from os import path +from json import dump, load, loads +from collections import deque + +from pydantic import Field +from pydantic import RootModel +from langchain_core.messages import HumanMessage, AIMessage, BaseMessage, SystemMessage + + +from packit.agent import Agent, agent_easy_connect +from packit.results import multi_function_or_str_result +from packit.toolbox import Toolbox +from packit.utils import logger_with_colors +from packit.loops import loop_tool +from packit.utils import could_be_json + +if TYPE_CHECKING: + from dataclasses import dataclass +else: + from pydantic.dataclasses import dataclass as dataclass # noqa + + +logger = logger_with_colors(__name__) + + +# world building +Actions = Dict[str, Callable] + + +@dataclass +class Item: + name: str + description: str + actions: Actions = Field(default_factory=dict) + + +@dataclass +class Actor: + name: str + backstory: str + description: str + health: int + actions: Actions = Field(default_factory=dict) + items: List[Item] = Field(default_factory=list) + + +@dataclass +class Room: + name: str + description: str + portals: Dict[str, str] = Field(default_factory=dict) + items: List[Item] = Field(default_factory=list) + actors: List[Actor] = Field(default_factory=list) + actions: Actions = Field(default_factory=dict) + + +@dataclass +class World: + rooms: List[Room] + theme: str + + +@dataclass +class WorldState: + world: World + memory: Dict[str, List[str | Dict[str, str]]] + step: int + + +# world generators +def generate_room(agent: Agent, world_theme: str, existing_rooms: List[str]) -> Room: + name = agent( + 'Generate one room, area, or location that would make sense in the world of {world_theme}. Only respond with the room name, do not include the description or any other text. Do not prefix the name with "the", do not wrap it in quotes. The existing rooms are: {existing_rooms}', + world_theme=world_theme, + existing_rooms=existing_rooms, + ) + logger.info(f"Generating room: {name}") + desc = agent( + "Generate a detailed description of the {name} area. What does it look like? What does it smell like? What can be seen or heard?", + name=name, + ) + + items = [] + actors = [] + actions = {} + + return Room( + name=name, description=desc, items=items, actors=actors, actions=actions + ) + + +def generate_item( + agent: Agent, + world_theme: str, + dest_room: str | None = None, + dest_actor: str | None = None, + existing_items: List[str] = [], +) -> Item: + if dest_actor: + dest_note = "The item will be held by the {dest_actor} character" + elif dest_room: + dest_note = "The item will be placed in the {dest_room} room" + else: + dest_note = "The item will be placed in the world" + + name = agent( + "Generate one item or object that would make sense in the world of {world_theme}. {dest_note}. " + 'Only respond with the item name, do not include a description or any other text. Do not prefix the name with "the", do not wrap it in quotes. ' + "Do not create any duplicate items in the same room. Do not give characters any duplicate items. The existing items are: {existing_items}", + dest_note=dest_note, + existing_items=existing_items, + world_theme=world_theme, + ) + logger.info(f"Generating item: {name}") + desc = agent( + "Generate a detailed description of the {name} item. What does it look like? What is it made of? What does it do?", + name=name, + ) + + actions = {} + + return Item(name=name, description=desc, actions=actions) + + +def generate_actor( + agent: Agent, world_theme: str, dest_room: str, existing_actors: List[str] = [] +) -> Actor: + name = agent( + "Generate one person or creature that would make sense in the world of {world_theme}. The character will be placed in the {dest_room} room. " + 'Only respond with the character name, do not include a description or any other text. Do not prefix the name with "the", do not wrap it in quotes. ' + "Do not create any duplicate characters in the same room. The existing characters are: {existing_actors}", + dest_room=dest_room, + existing_actors=existing_actors, + world_theme=world_theme, + ) + logger.info(f"Generating actor: {name}") + description = agent( + "Generate a detailed description of the {name} character. What do they look like? What are they wearing? What are they doing? Describe their appearance from the perspective of an outside observer.", + name=name, + ) + backstory = agent( + 'Generate a backstory for the {name} actor. Where are they from? What are they doing here? What are their goals? Make sure to phrase the backstory in the second person, starting with "you are" and speaking directly to {name}.', + name=name, + ) + + health = 100 + actions = {} + + return Actor( + name=name, + backstory=backstory, + description=description, + health=health, + actions=actions, + ) + + +def generate_world(agent: Agent, theme: str) -> World: + room_count = randint(3, 5) + logger.info(f"Generating a {theme} with {room_count} rooms") + + existing_actors: List[str] = [] + existing_items: List[str] = [] + + # generate the rooms + rooms = [] + for i in range(room_count): + existing_rooms = [room.name for room in rooms] + room = generate_room(agent, theme, existing_rooms) + rooms.append(room) + + item_count = randint(0, 3) + for j in range(item_count): + item = generate_item( + agent, theme, dest_room=room.name, existing_items=existing_items + ) + room.items.append(item) + existing_items.append(item.name) + + actor_count = randint(0, 3) + for j in range(actor_count): + actor = generate_actor( + agent, theme, dest_room=room.name, existing_actors=existing_actors + ) + room.actors.append(actor) + existing_actors.append(actor.name) + + # generate the actor's inventory + item_count = randint(0, 3) + for k in range(item_count): + item = generate_item( + agent, theme, dest_room=room.name, existing_items=existing_items + ) + actor.items.append(item) + existing_items.append(item.name) + + opposite_directions = { + "north": "south", + "south": "north", + "east": "west", + "west": "east", + } + + # TODO: generate portals to link the rooms together + for room in rooms: + directions = ["north", "south", "east", "west"] + for direction in directions: + if direction in room.portals: + logger.debug(f"Room {room.name} already has a {direction} portal") + continue + + opposite_direction = opposite_directions[direction] + + if randint(0, 1): + dest_room = choice([r for r in rooms if r.name != room.name]) + + # make sure not to create duplicate links + if room.name in dest_room.portals.values(): + logger.debug( + f"Room {dest_room.name} already has a portal to {room.name}" + ) + continue + + if opposite_direction in dest_room.portals: + logger.debug( + f"Room {dest_room.name} already has a {opposite_direction} portal" + ) + continue + + # create bidirectional links + room.portals[direction] = dest_room.name + dest_room.portals[opposite_directions[direction]] = room.name + + return World(rooms=rooms, theme=theme) + + +# memory space +current_world = None +current_room = None +current_actor = None +current_step = 0 +actor_agents = {} +action_tools = None + + +# simulation +def check_current(): + if not current_world: + raise ValueError( + "The current world must be set before calling action functions" + ) + if not current_room: + raise ValueError("The current room must be set before calling action functions") + if not current_actor: + raise ValueError( + "The current actor must be set before calling action functions" + ) + + return (current_world, current_room, current_actor) + + +def action_look(target: str) -> str: + _, action_room, action_actor = check_current() + logger.info(f"{action_actor.name} looks at {target}") + + if target == action_room.name: + logger.info(f"{action_actor.name} saw the {action_room.name} room") + return action_room.description + + for actor in action_room.actors: + if actor.name == target: + logger.info( + f"{action_actor.name} saw the {actor.name} actor in the {action_room.name} room" + ) + return actor.description + + for item in action_room.items: + if item.name == target: + logger.info( + f"{action_actor.name} saw the {item.name} item in the {action_room.name} room" + ) + return item.description + + for item in action_actor.items: + if item.name == target: + logger.info( + f"{action_actor.name} saw the {item.name} item in their inventory" + ) + return item.description + + return "You do not see that item or character in the room." + + +def action_move(direction: str) -> str: + action_world, action_room, action_actor = check_current() + + destination_name = action_room.portals.get(direction) + if not destination_name: + return f"You cannot move {direction} from here." + + destination_room = next( + (room for room in action_world.rooms if room.name == destination_name), None + ) + if not destination_room: + return f"The {destination_name} room does not exist." + + logger.info(f"{action_actor.name} moves {direction} to {destination_name}") + action_room.actors.remove(action_actor) + destination_room.actors.append(action_actor) + + return f"You move {direction} and arrive at {destination_name}." + + +def action_take(item_name: str) -> str: + _, action_room, action_actor = check_current() + + item = next((item for item in action_room.items if item.name == item_name), None) + if item: + logger.info(f"{action_actor.name} takes the {item_name} item") + action_room.items.remove(item) + action_actor.items.append(item) + return "You take the {item_name} item and put it in your inventory." + else: + return "The {item_name} item is not in the room." + + +def action_ask(character: str, question: str) -> str: + # capture references to the current actor and room, because they will be overwritten + action_actor = current_actor + action_room = current_room + + if not action_actor or not action_room: + raise ValueError( + "The current actor and room must be set before calling action_ask" + ) + + # sanity checks + if character == action_actor.name: + return "You cannot ask yourself a question. Stop talking to yourself." + + question_actor, question_agent = actor_agents.get(character, (None, None)) + if not question_actor: + return f"The {character} character is not in the room." + + if not question_agent: + return f"The {character} character does not exist." + + logger.info(f"{action_actor.name} asks {character}: {question}") + answer = question_agent(f"{action_actor.name} asks you: {question}. Reply with your response. Do not include the question or any other text, only your reply to {action_actor.name}.") + + if could_be_json(answer) and action_tell.__name__ in answer: + answer = loads(answer).get("parameters", {}).get("message", "") + + if len(answer.strip()) > 0: + logger.info(f"{character} responds to {action_actor.name}: {answer}") + return f"{character} responds: {answer}" + + return f"{character} does not respond." + + +def action_tell(character: str, message: str) -> str: + # capture references to the current actor and room, because they will be overwritten + action_actor = current_actor + action_room = current_room + + if not action_actor or not action_room: + raise ValueError( + "The current actor and room must be set before calling action_tell" + ) + + # sanity checks + if character == action_actor.name: + return "You cannot tell yourself a message. Stop talking to yourself." + + question_actor, question_agent = actor_agents.get(character, (None, None)) + if not question_actor: + return f"The {character} character is not in the room." + + if not question_agent: + return f"The {character} character does not exist." + + logger.info(f"{action_actor.name} tells {character}: {message}") + answer = question_agent(f"{action_actor.name} tells you: {message}. Reply with your response. Do not include the message or any other text, only your reply to {action_actor.name}.") + + if could_be_json(answer) and action_tell.__name__ in answer: + answer = loads(answer).get("parameters", {}).get("message", "") + + if len(answer.strip()) > 0: + logger.info(f"{character} responds to {action_actor.name}: {answer}") + return f"{character} responds: {answer}" + + return f"{character} does not respond." + + +def action_give(character: str, item_name: str) -> str: + _, action_room, action_actor = check_current() + + destination_actor = next( + (actor for actor in action_room.actors if actor.name == character), None + ) + if not destination_actor: + return f"The {character} character is not in the room." + + item = next((item for item in action_actor.items if item.name == item_name), None) + if not item: + return f"You do not have the {item_name} item in your inventory." + + logger.info(f"{action_actor.name} gives {character} the {item_name} item") + action_actor.items.remove(item) + destination_actor.items.append(item) + + return f"You give the {item_name} item to {character}." + + +def action_stop() -> str: + _, _, action_actor = check_current() + logger.info(f"{action_actor.name} end their turn") + return "You stop your actions and end your turn." + + +def world_result_parser(value, agent, **kwargs): + global current_world + global current_room + global current_actor + + if not current_world: + raise ValueError( + "The current world must be set before calling world_result_parser" + ) + + logger.debug(f"parsing action for {agent.name}: {value}") + + current_actor = next( + ( + inner_actor + for inner_actor, inner_agent in actor_agents.values() + if inner_agent == agent + ), + None, + ) + current_room = next( + (room for room in current_world.rooms if current_actor in room.actors), None + ) + + return multi_function_or_str_result(value, agent=agent, **kwargs) + + +def create_agents(world: World, memory: Dict[str, List[str | Dict[str, str]]] = {}): + # set up agents for each actor + global actor_agents + + llm = agent_easy_connect() + # for each actor in each room in the world + for room in world.rooms: + for actor in room.actors: + agent = Agent(actor.name, actor.backstory, {}, llm) + agent.memory = restore_memory(memory.get(actor.name, [])) + actor_agents[actor.name] = (actor, agent) + + +def simulate_world(world: World, steps: int = 10, callback=None): + logger.info("Simulating the world") + + # collect actors, so they are only processed once + all_actors = [actor for room in world.rooms for actor in room.actors] + + # prep the actions + global action_tools + + # TODO: add actions for: drop, use, attack, cast, jump, climb, swim, fly, etc. + action_tools = Toolbox( + [ + action_ask, + action_give, + action_look, + action_move, + action_take, + action_tell, + ] + ) + + # create a result parser that will memorize the actor and room + global current_step + global current_world + current_world = world + + # simulate each actor + for i in range(steps): + logger.info(f"Simulating step {current_step}") + for actor in all_actors: + _, agent = actor_agents[actor.name] + room = next((room for room in world.rooms if actor in room.actors), None) + if not room: + logger.error(f"Actor {actor.name} is not in a room") + continue + + room_actors = [actor.name for actor in room.actors] + room_items = [item.name for item in room.items] + room_directions = list(room.portals.keys()) + + logger.info("starting actor %s turn", actor.name) + result = loop_tool( + agent, + ( + "You are currently in {room_name}. {room_description}. " + "The room contains the following characters: {actors}. " + "The room contains the following items: {items}. " + "You can take the following actions: {actions}. " + "You can move in the following directions: {directions}. " + "What will you do next? Reply with a JSON function call, calling one of the actions." + ), + context={ + "actions": [ + "ask", + "give", + "look", + "move", + "take", + "tell", + ], # , "use"], + "actors": room_actors, + "directions": room_directions, + "items": room_items, + "room_name": room.name, + "room_description": room.description, + }, + result_parser=world_result_parser, + toolbox=action_tools, + ) + + logger.info(f"{actor.name} step result: {result}") + + # if result was JSON, it has already been parsed and executed. anything remaining is flavor text + # that should be presented back to the actor + # TODO: inject this directly in the agent's memory rather than reprompting them + response = agent( + "The result of your last action was: {result}. Your turn is over, no further actions will be accepted. If you understand, reply with the word \"end\".", + result=result, + ) + + logger.info(f"{actor.name} step response: '{response}'") + if response.strip().lower() not in ["end", ""]: + logger.warning(f"{actor.name} responded after the end of their turn: %s", response) + response = agent( + "Your turn is over, no further actions will be accepted. Do not reply." + ) + logger.info(f"{actor.name} warning response: {response}") + + if callback: + callback(world, current_step) + + current_step += 1 + + +def graph_world(world: World, step: int): + import graphviz + + graph = graphviz.Digraph(f"{world.theme}-{step}", format="png") + for room in world.rooms: + room_label = "\n".join([room.name, *[actor.name for actor in room.actors]]) + graph.node(room.name, room_label) # , room.description) + for direction, destination in room.portals.items(): + graph.edge(room.name, destination, label=direction) + + graph.render(directory="worlds", view=True) + + +def snapshot_world(world: World, step: int): + # save the world itself, along with the step number of the memory of each agent + json_world = RootModel[World](world).model_dump() + + json_memory = {} + + for actor, agent in actor_agents.values(): + json_memory[actor.name] = list(agent.memory) + + return { + "world": json_world, + "memory": json_memory, + "step": step, + } + + +def restore_memory( + data: Sequence[str | Dict[str, str]] +) -> deque[str | AIMessage | HumanMessage | SystemMessage]: + memories = [] + + for memory in data: + if isinstance(memory, str): + memories.append(memory) + elif isinstance(memory, dict): + memory_content = memory["content"] + memory_type = memory["type"] + + if memory_type == "human": + memories.append(HumanMessage(content=memory_content)) + elif memory_type == "system": + memories.append(SystemMessage(content=memory_content)) + elif memory_type == "ai": + memories.append(AIMessage(content=memory_content)) + + return deque(memories, maxlen=10) + + +# main +def parse_args(): + import argparse + + parser = argparse.ArgumentParser( + description="Generate and simulate a fantasy world" + ) + parser.add_argument( + "--steps", type=int, default=10, help="The number of simulation steps to run" + ) + parser.add_argument( + "--theme", type=str, default="fantasy", help="The theme of the generated world" + ) + parser.add_argument( + "--world", + type=str, + default="world.json", + help="The file to save the generated world to", + ) + parser.add_argument( + "--world-state", + type=str, + default="world-state.json", + help="The file to save the world state to", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + + if args.world_state and path.exists(args.world_state): + logger.info(f"Loading world state from {args.world_state}") + with open(args.world_state, "r") as f: + state = WorldState(**load(f)) + + # TODO: reload agent memory + global current_step + + current_step = state.step + world = state.world + create_agents(world, state.memory) + elif args.world and path.exists(args.world): + logger.info(f"Loading world from {args.world}") + with open(args.world, "r") as f: + world = World(**load(f)) + create_agents(world) + else: + logger.info(f"Generating a new {args.theme} world") + llm = agent_easy_connect() + agent = Agent( + "world builder", + f"You are an experienced game master creating a visually detailed {args.theme} world for a new adventure.", + {}, + llm, + ) + world = generate_world(agent, args.theme) + create_agents(world) + + logger.debug("Loaded world: %s", world) + + if args.world: + with open(args.world, "w") as f: + json_world = RootModel[World](world).model_dump_json(indent=2) + f.write(json_world) + + def save_world_state(world, step): + graph_world(world, step) + if args.world_state: + json_state = snapshot_world(world, step) + with open(args.world_state, "w") as f: + + def dumper(obj): + if isinstance(obj, BaseMessage): + return { + "content": obj.content, + "type": obj.type, + } + + raise ValueError(f"Cannot serialize {obj}") + + dump(json_state, f, default=dumper, indent=2) + + simulate_world(world, steps=args.steps, callback=save_world_state) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e09ea4c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ +[tool.black] +force-exclude = '''/(diffusion_original|lpw).py''' + +[tool.isort] +profile = "black" +force_to_top = ".logging" +skip_glob = ["*/lpw.py"] + +[tool.mypy] +# ignore_missing_imports = true +exclude = [] + +[[tool.mypy.overrides]] +module = [] +ignore_missing_imports = true