1
0
Fork 0

use action context managers, sort modules

This commit is contained in:
Sean Sube 2024-05-18 17:29:40 -05:00
parent 03c324ef60
commit 36f29dcffa
Signed by: ssube
GPG Key ID: 3EED7B957D362AF1
21 changed files with 478 additions and 452 deletions

View File

@ -56,4 +56,4 @@ lint-fix:
style: lint-fix
typecheck:
mypy feedme
mypy adventure

0
adventure/__init__.py Normal file
View File

View File

@ -3,8 +3,13 @@ from logging import getLogger
from packit.utils import could_be_json
from adventure.context import broadcast, get_actor_agent_for_name, get_current_context
from adventure.search import (
from adventure.context import (
action_context,
broadcast,
get_actor_agent_for_name,
world_context,
)
from adventure.utils.search import (
find_actor_in_room,
find_item_in_actor,
find_item_in_room,
@ -22,35 +27,36 @@ def action_look(target: str) -> str:
Args:
target: The name of the target to look at.
"""
_, action_room, action_actor = get_current_context()
broadcast(f"{action_actor.name} looks at {target}")
if target.lower() == action_room.name.lower():
broadcast(f"{action_actor.name} saw the {action_room.name} room")
return describe_entity(action_room)
with action_context() as (action_room, action_actor):
broadcast(f"{action_actor.name} looks at {target}")
target_actor = find_actor_in_room(action_room, target)
if target_actor:
broadcast(
f"{action_actor.name} saw the {target_actor.name} actor in the {action_room.name} room"
)
return describe_entity(target_actor)
if target.lower() == action_room.name.lower():
broadcast(f"{action_actor.name} saw the {action_room.name} room")
return describe_entity(action_room)
target_item = find_item_in_room(action_room, target)
if target_item:
broadcast(
f"{action_actor.name} saw the {target_item.name} item in the {action_room.name} room"
)
return describe_entity(target_item)
target_actor = find_actor_in_room(action_room, target)
if target_actor:
broadcast(
f"{action_actor.name} saw the {target_actor.name} actor in the {action_room.name} room"
)
return describe_entity(target_actor)
target_item = find_item_in_actor(action_actor, target)
if target_item:
broadcast(
f"{action_actor.name} saw the {target_item.name} item in their inventory"
)
return describe_entity(target_item)
target_item = find_item_in_room(action_room, target)
if target_item:
broadcast(
f"{action_actor.name} saw the {target_item.name} item in the {action_room.name} room"
)
return describe_entity(target_item)
return "You do not see that item or character in the room."
target_item = find_item_in_actor(action_actor, target)
if target_item:
broadcast(
f"{action_actor.name} saw the {target_item.name} item in their inventory"
)
return describe_entity(target_item)
return "You do not see that item or character in the room."
def action_move(direction: str) -> str:
@ -60,21 +66,21 @@ def action_move(direction: str) -> str:
Args:
direction: The direction to move in.
"""
action_world, action_room, action_actor = get_current_context()
destination_name = action_room.portals.get(direction.lower())
if not destination_name:
return f"You cannot move {direction} from here."
with world_context() as (action_world, action_room, action_actor):
destination_name = action_room.portals.get(direction.lower())
if not destination_name:
return f"You cannot move {direction} from here."
destination_room = find_room(action_world, destination_name)
if not destination_room:
return f"The {destination_name} room does not exist."
destination_room = find_room(action_world, destination_name)
if not destination_room:
return f"The {destination_name} room does not exist."
broadcast(f"{action_actor.name} moves {direction} to {destination_name}")
action_room.actors.remove(action_actor)
destination_room.actors.append(action_actor)
broadcast(f"{action_actor.name} moves {direction} to {destination_name}")
action_room.actors.remove(action_actor)
destination_room.actors.append(action_actor)
return f"You move {direction} and arrive at {destination_name}."
return f"You move {direction} and arrive at {destination_name}."
def action_take(item_name: str) -> str:
@ -84,16 +90,15 @@ def action_take(item_name: str) -> str:
Args:
item_name: The name of the item to take.
"""
_, action_room, action_actor = get_current_context()
with action_context() as (action_room, action_actor):
item = find_item_in_room(action_room, item_name)
if not item:
return "The {item_name} item is not in the room."
item = find_item_in_room(action_room, item_name)
if item:
broadcast(f"{action_actor.name} takes the {item_name} item")
action_room.items.remove(item)
action_actor.items.append(item)
return "You take the {item_name} item and put it in your inventory."
else:
return "The {item_name} item is not in the room."
def action_ask(character: str, question: str) -> str:
@ -105,33 +110,32 @@ def action_ask(character: str, question: str) -> str:
question: The question to ask them.
"""
# capture references to the current actor and room, because they will be overwritten
_world, _room, action_actor = get_current_context()
with action_context() as (_, action_actor):
# sanity checks
question_actor, question_agent = get_actor_agent_for_name(character)
if question_actor == action_actor:
return "You cannot ask yourself a question. Stop talking to yourself. Try another action."
# sanity checks
question_actor, question_agent = get_actor_agent_for_name(character)
if question_actor == action_actor:
return "You cannot ask yourself a question. Stop talking to yourself. Try another action."
if not question_actor:
return f"The {character} character is not in the room."
if not question_actor:
return f"The {character} character is not in the room."
if not question_agent:
return f"The {character} character does not exist."
if not question_agent:
return f"The {character} character does not exist."
broadcast(f"{action_actor.name} asks {character}: {question}")
answer = question_agent(
f"{action_actor.name} asks you: {question}. Reply with your response to them. "
f"Do not include the question or any JSON. Only include your answer for {action_actor.name}."
)
broadcast(f"{action_actor.name} asks {character}: {question}")
answer = question_agent(
f"{action_actor.name} asks you: {question}. Reply with your response to them. "
f"Do not include the question or any JSON. Only include your answer for {action_actor.name}."
)
if could_be_json(answer) and action_tell.__name__ in answer:
answer = loads(answer).get("parameters", {}).get("message", "")
if could_be_json(answer) and action_tell.__name__ in answer:
answer = loads(answer).get("parameters", {}).get("message", "")
if len(answer.strip()) > 0:
broadcast(f"{character} responds to {action_actor.name}: {answer}")
return f"{character} responds: {answer}"
if len(answer.strip()) > 0:
broadcast(f"{character} responds to {action_actor.name}: {answer}")
return f"{character} responds: {answer}"
return f"{character} does not respond."
return f"{character} does not respond."
def action_tell(character: str, message: str) -> str:
@ -143,33 +147,33 @@ def action_tell(character: str, message: str) -> str:
message: The message to tell them.
"""
# capture references to the current actor and room, because they will be overwritten
_world, _room, action_actor = get_current_context()
# sanity checks
question_actor, question_agent = get_actor_agent_for_name(character)
if question_actor == action_actor:
return "You cannot tell yourself a message. Stop talking to yourself. Try another action."
with action_context() as (_, action_actor):
# sanity checks
question_actor, question_agent = get_actor_agent_for_name(character)
if question_actor == action_actor:
return "You cannot tell yourself a message. Stop talking to yourself. Try another action."
if not question_actor:
return f"The {character} character is not in the room."
if not question_actor:
return f"The {character} character is not in the room."
if not question_agent:
return f"The {character} character does not exist."
if not question_agent:
return f"The {character} character does not exist."
broadcast(f"{action_actor.name} tells {character}: {message}")
answer = question_agent(
f"{action_actor.name} tells you: {message}. Reply with your response to them. "
f"Do not include the message or any JSON. Only include your reply to {action_actor.name}."
)
broadcast(f"{action_actor.name} tells {character}: {message}")
answer = question_agent(
f"{action_actor.name} tells you: {message}. Reply with your response to them. "
f"Do not include the message or any JSON. Only include your reply to {action_actor.name}."
)
if could_be_json(answer) and action_tell.__name__ in answer:
answer = loads(answer).get("parameters", {}).get("message", "")
if could_be_json(answer) and action_tell.__name__ in answer:
answer = loads(answer).get("parameters", {}).get("message", "")
if len(answer.strip()) > 0:
broadcast(f"{character} responds to {action_actor.name}: {answer}")
return f"{character} responds: {answer}"
if len(answer.strip()) > 0:
broadcast(f"{character} responds to {action_actor.name}: {answer}")
return f"{character} responds: {answer}"
return f"{character} does not respond."
return f"{character} does not respond."
def action_give(character: str, item_name: str) -> str:
@ -180,21 +184,20 @@ def action_give(character: str, item_name: str) -> str:
character: The name of the character to give the item to.
item_name: The name of the item to give.
"""
_, action_room, action_actor = get_current_context()
with action_context() as (action_room, action_actor):
destination_actor = find_actor_in_room(action_room, character)
if not destination_actor:
return f"The {character} character is not in the room."
destination_actor = find_actor_in_room(action_room, character)
if not destination_actor:
return f"The {character} character is not in the room."
item = find_item_in_actor(action_actor, item_name)
if not item:
return f"You do not have the {item_name} item in your inventory."
item = find_item_in_actor(action_actor, item_name)
if not item:
return f"You do not have the {item_name} item in your inventory."
broadcast(f"{action_actor.name} gives {character} the {item_name} item.")
action_actor.items.remove(item)
destination_actor.items.append(item)
broadcast(f"{action_actor.name} gives {character} the {item_name} item.")
action_actor.items.remove(item)
destination_actor.items.append(item)
return f"You give the {item_name} item to {character}."
return f"You give the {item_name} item to {character}."
def action_drop(item_name: str) -> str:
@ -205,14 +208,13 @@ def action_drop(item_name: str) -> str:
item_name: The name of the item to drop.
"""
_, action_room, action_actor = get_current_context()
with action_context() as (action_room, action_actor):
item = find_item_in_actor(action_actor, item_name)
if not item:
return f"You do not have the {item_name} item in your inventory."
item = find_item_in_actor(action_actor, item_name)
if not item:
return f"You do not have the {item_name} item in your inventory."
broadcast(f"{action_actor.name} drops the {item_name} item")
action_actor.items.remove(item)
action_room.items.append(item)
broadcast(f"{action_actor.name} drops the {item_name} item")
action_actor.items.remove(item)
action_room.items.append(item)
return f"You drop the {item_name} item."
return f"You drop the {item_name} item."

View File

@ -33,14 +33,14 @@ from adventure.player import (
remove_player,
set_player,
)
from adventure.render_comfy import render_event
from adventure.render.comfy import render_event
logger = getLogger(__name__)
client = None
bot_config: DiscordBotConfig = DEFAULT_CONFIG.bot.discord
active_tasks = set()
event_messages: Dict[str, str | GameEvent] = {}
event_messages: Dict[int, str | GameEvent] = {}
event_queue: Queue[GameEvent] = Queue()
@ -81,13 +81,13 @@ class AdventureClient(Client):
channel = message.channel
user_name = author.name # include nick
world = get_current_world()
if world:
active_world = f"Active world: {world.name} (theme: {world.theme})"
else:
active_world = "No active world"
if message.content.startswith("!adventure"):
world = get_current_world()
if world:
active_world = f"Active world: {world.name} (theme: {world.theme})"
else:
active_world = "No active world"
await message.channel.send(f"Hello! Welcome to Adventure! {active_world}")
return
@ -215,7 +215,14 @@ def stop_bot():
global client
if client:
client.close()
close_task = client.loop.create_task(client.close())
active_tasks.add(close_task)
def on_close_task_done(future):
logger.info("discord client closed")
active_tasks.discard(future)
close_task.add_done_callback(on_close_task_done)
client = None
@ -299,7 +306,7 @@ async def broadcast_event(message: str | GameEvent):
event_messages[event_message.id] = message
def embed_from_event(event: GameEvent) -> Embed:
def embed_from_event(event: GameEvent) -> Embed | None:
if isinstance(event, GenerateEvent):
return embed_from_generate(event)
elif isinstance(event, ResultEvent):

View File

@ -37,14 +37,21 @@ game_systems: List[GameSystem] = []
# TODO: where should this one go?
actor_agents: Dict[str, Tuple[Actor, Agent]] = {}
STRING_EVENT_TYPE = "message"
def get_event_name(event: GameEvent | Type[GameEvent]):
return f"event:{event.type}"
def broadcast(message: str | GameEvent):
if isinstance(message, GameEvent):
logger.debug(f"broadcasting {message.type}")
event_emitter.emit(message.type, message)
event_name = get_event_name(message)
logger.debug(f"broadcasting {event_name}")
event_emitter.emit(event_name, message)
else:
logger.warning("broadcasting a string message is deprecated")
event_emitter.emit("message", message)
event_emitter.emit(STRING_EVENT_TYPE, message)
def is_union(type_: Type | UnionType):
@ -62,10 +69,13 @@ def subscribe(
return
if event_type is str:
event_name = STRING_EVENT_TYPE
else:
event_name = get_event_name(event_type)
logger.debug(f"subscribing {callback.__name__} to {event_type}")
event_emitter.on(
event_type.type, callback
) # TODO: should this use str or __name__?
event_emitter.on(event_name, callback)
def has_dungeon_master():
@ -74,11 +84,17 @@ def has_dungeon_master():
# region context manager
@contextmanager
def with_action_context():
def action_context():
room, actor = get_action_context()
yield room, actor
@contextmanager
def world_context():
world, room, actor = get_world_context()
yield world, room, actor
# endregion
@ -94,7 +110,7 @@ def get_action_context() -> Tuple[Room, Actor]:
return (current_room, current_actor)
def get_current_context() -> Tuple[World, Room, Actor]:
def get_world_context() -> Tuple[World, Room, Actor]:
if not current_world:
raise ValueError(
"The current world must be set before calling action functions"

View File

@ -246,19 +246,19 @@ def main():
threads = []
if args.render:
from adventure.render_comfy import launch_render, render_generated
from adventure.render.comfy import launch_render, render_generated
threads.extend(launch_render(config.render))
if args.render_generated:
subscribe(GenerateEvent, render_generated)
if args.discord:
from adventure.bot_discord import launch_bot
from adventure.bot.discord import launch_bot
threads.extend(launch_bot(config.bot.discord))
if args.server:
from adventure.server_socket import launch_server, server_system
from adventure.server.websocket import launch_server
threads.extend(launch_server(config.server.websocket))
@ -300,6 +300,8 @@ def main():
# make sure the server system runs after any updates
if args.server:
from adventure.server.websocket import server_system
extra_systems.append(GameSystem(simulate=server_system))
# load or generate the world

View File

@ -4,16 +4,17 @@ from typing import Callable, List
from packit.agent import Agent, agent_easy_connect
from adventure.context import (
action_context,
broadcast,
get_agent_for_actor,
get_current_context,
get_dungeon_master,
has_dungeon_master,
set_dungeon_master,
world_context,
)
from adventure.generate import OPPOSITE_DIRECTIONS, generate_item, generate_room
from adventure.search import find_actor_in_room
from adventure.utils.effect import apply_effect
from adventure.utils.search import find_actor_in_room
from adventure.utils.world import describe_actor, describe_entity
logger = getLogger(__name__)
@ -39,34 +40,31 @@ def action_explore(direction: str) -> str:
direction: The direction to explore: north, south, east, or west.
"""
current_world, current_room, current_actor = get_current_context()
dungeon_master = get_dungeon_master()
with world_context() as (action_world, action_room, action_actor):
dungeon_master = get_dungeon_master()
if not current_world:
raise ValueError("No world found")
if direction in action_room.portals:
dest_room = action_room.portals[direction]
return f"You cannot explore {direction} from here, that direction already leads to {dest_room}. Please use the move action to go there."
if direction in current_room.portals:
dest_room = current_room.portals[direction]
return f"You cannot explore {direction} from here, that direction already leads to {dest_room}. Please use the move action to go there."
existing_rooms = [room.name for room in action_world.rooms]
try:
new_room = generate_room(
dungeon_master, action_world.theme, existing_rooms=existing_rooms
)
action_world.rooms.append(new_room)
existing_rooms = [room.name for room in current_world.rooms]
try:
new_room = generate_room(
dungeon_master, current_world.theme, existing_rooms=existing_rooms
)
current_world.rooms.append(new_room)
# link the rooms together
action_room.portals[direction] = new_room.name
new_room.portals[OPPOSITE_DIRECTIONS[direction]] = action_room.name
# link the rooms together
current_room.portals[direction] = new_room.name
new_room.portals[OPPOSITE_DIRECTIONS[direction]] = current_room.name
broadcast(
f"{current_actor.name} explores {direction} of {current_room.name} and finds a new room: {new_room.name}"
)
return f"You explore {direction} and find a new room: {new_room.name}"
except Exception:
logger.exception("error generating room")
return f"You cannot explore {direction} from here, there is no room in that direction."
broadcast(
f"{action_actor.name} explores {direction} of {action_room.name} and finds a new room: {new_room.name}"
)
return f"You explore {direction} and find a new room: {new_room.name}"
except Exception:
logger.exception("error generating room")
return f"You cannot explore {direction} from here, there is no room in that direction."
def action_search(unused: bool) -> str:
@ -74,30 +72,32 @@ def action_search(unused: bool) -> str:
Search the room for hidden items.
"""
action_world, action_room, action_actor = get_current_context()
dungeon_master = get_dungeon_master()
with world_context() as (action_world, action_room, action_actor):
dungeon_master = get_dungeon_master()
if len(action_room.items) > 2:
return "You find nothing hidden in the room. There is no room for more items."
if len(action_room.items) > 2:
return (
"You find nothing hidden in the room. There is no room for more items."
)
existing_items = [item.name for item in action_room.items]
existing_items = [item.name for item in action_room.items]
try:
new_item = generate_item(
dungeon_master,
action_world.theme,
existing_items=existing_items,
dest_room=action_room.name,
)
action_room.items.append(new_item)
try:
new_item = generate_item(
dungeon_master,
action_world.theme,
existing_items=existing_items,
dest_room=action_room.name,
)
action_room.items.append(new_item)
broadcast(
f"{action_actor.name} searches {action_room.name} and finds a new item: {new_item.name}"
)
return f"You search the room and find a new item: {new_item.name}"
except Exception:
logger.exception("error generating item")
return "You find nothing hidden in the room."
broadcast(
f"{action_actor.name} searches {action_room.name} and finds a new item: {new_item.name}"
)
return f"You search the room and find a new item: {new_item.name}"
except Exception:
logger.exception("error generating item")
return "You find nothing hidden in the room."
def action_use(item: str, target: str) -> str:
@ -108,69 +108,69 @@ def action_use(item: str, target: str) -> str:
item: The name of the item to use.
target: The name of the character to use the item on, or "self" to use the item on yourself.
"""
_, action_room, action_actor = get_current_context()
dungeon_master = get_dungeon_master()
with action_context() as (action_room, action_actor):
dungeon_master = get_dungeon_master()
action_item = next(
(
search_item
for search_item in (action_actor.items + action_room.items)
if search_item.name == item
),
None,
)
if not action_item:
return f"The {item} item is not available to use."
action_item = next(
(
search_item
for search_item in (action_actor.items + action_room.items)
if search_item.name == item
),
None,
)
if not action_item:
return f"The {item} item is not available to use."
if target == "self":
target_actor = action_actor
target = action_actor.name
else:
target_actor = find_actor_in_room(action_room, target)
if not target_actor:
return f"The {target} character is not in the room."
if target == "self":
target_actor = action_actor
target = action_actor.name
else:
target_actor = find_actor_in_room(action_room, target)
if not target_actor:
return f"The {target} character is not in the room."
effect_names = [effect.name for effect in action_item.effects]
chosen_name = dungeon_master(
f"{action_actor.name} uses {item} on {target}. "
f"{item} has the following effects: {effect_names}. "
"Which effect should be applied? Specify the name of the effect to apply."
"Do not include the question or any JSON. Only include the name of the effect to apply."
)
chosen_name = chosen_name.strip()
effect_names = [effect.name for effect in action_item.effects]
chosen_name = dungeon_master(
f"{action_actor.name} uses {item} on {target}. "
f"{item} has the following effects: {effect_names}. "
"Which effect should be applied? Specify the name of the effect to apply."
"Do not include the question or any JSON. Only include the name of the effect to apply."
)
chosen_name = chosen_name.strip()
chosen_effect = next(
(
search_effect
for search_effect in action_item.effects
if search_effect.name == chosen_name
),
None,
)
if not chosen_effect:
# TODO: should retry the question if the effect is not found
return f"The {chosen_name} effect is not available to apply."
chosen_effect = next(
(
search_effect
for search_effect in action_item.effects
if search_effect.name == chosen_name
),
None,
)
if not chosen_effect:
# TODO: should retry the question if the effect is not found
return f"The {chosen_name} effect is not available to apply."
apply_effect(chosen_effect, target_actor.attributes)
apply_effect(chosen_effect, target_actor.attributes)
broadcast(
f"{action_actor.name} uses the {chosen_name} effect of {item} on {target}"
)
outcome = dungeon_master(
f"{action_actor.name} uses the {chosen_name} effect of {item} on {target}. "
f"{describe_actor(action_actor)}. {describe_actor(target_actor)}. {describe_entity(action_item)}. "
f"What happens? How does {target} react? Be creative with the results. The outcome can be good, bad, or neutral."
"Decide based on the characters involved and the item being used."
"Specify the outcome of the action. Do not include the question or any JSON. Only include the outcome of the action."
)
broadcast(f"The action resulted in: {outcome}")
broadcast(
f"{action_actor.name} uses the {chosen_name} effect of {item} on {target}"
)
outcome = dungeon_master(
f"{action_actor.name} uses the {chosen_name} effect of {item} on {target}. "
f"{describe_actor(action_actor)}. {describe_actor(target_actor)}. {describe_entity(action_item)}. "
f"What happens? How does {target} react? Be creative with the results. The outcome can be good, bad, or neutral."
"Decide based on the characters involved and the item being used."
"Specify the outcome of the action. Do not include the question or any JSON. Only include the outcome of the action."
)
broadcast(f"The action resulted in: {outcome}")
# make sure both agents remember the outcome
target_agent = get_agent_for_actor(target_actor)
if target_agent:
target_agent.memory.append(outcome)
# make sure both agents remember the outcome
target_agent = get_agent_for_actor(target_actor)
if target_agent and target_agent.memory:
target_agent.memory.append(outcome)
return outcome
return outcome
def init() -> List[Callable]:

View File

@ -8,7 +8,7 @@ from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from packit.agent import Agent
from packit.utils import could_be_json
from adventure.context import get_current_context
from adventure.context import action_context
from adventure.models.event import PromptEvent
logger = getLogger(__name__)
@ -183,21 +183,21 @@ class RemotePlayer(BasePlayer):
formatted_prompt = prompt.format(**kwargs)
self.memory.append(HumanMessage(content=formatted_prompt))
_, current_room, current_actor = get_current_context()
prompt_event = PromptEvent(
prompt=formatted_prompt, room=current_room, actor=current_actor
)
with action_context() as (current_room, current_actor):
prompt_event = PromptEvent(
prompt=formatted_prompt, room=current_room, actor=current_actor
)
try:
logger.info(f"prompting remote player: {self.name}")
if self.send_prompt(prompt_event):
reply = self.input_queue.get(timeout=60)
logger.info(f"got reply from remote player: {reply}")
return self.parse_input(reply)
except Exception:
logger.exception("error getting reply from remote player")
try:
logger.info(f"prompting remote player: {self.name}")
if self.send_prompt(prompt_event):
reply = self.input_queue.get(timeout=60)
logger.info(f"got reply from remote player: {reply}")
return self.parse_input(reply)
except Exception:
logger.exception("error getting reply from remote player")
if self.fallback_agent:
return self.fallback_agent(prompt, **kwargs)
if self.fallback_agent:
return self.fallback_agent(prompt, **kwargs)
return ""
return ""

View File

@ -1,6 +1,6 @@
from random import randint
from adventure.context import broadcast, get_current_context, get_dungeon_master
from adventure.context import broadcast, get_dungeon_master, world_context
from adventure.generate import generate_item
from adventure.models.base import dataclass
from adventure.models.entity import Item
@ -26,46 +26,49 @@ def action_craft(item_name: str) -> str:
Args:
item_name: The name of the item to craft.
"""
action_world, _, action_actor = get_current_context()
with world_context() as (action_world, _, action_actor):
if item_name not in recipes:
return f"There is no recipe to craft a {item_name}."
if item_name not in recipes:
return f"There is no recipe to craft a {item_name}."
recipe = recipes[item_name]
recipe = recipes[item_name]
# Check if the actor has the required skill level
skill = randint(1, 20)
if skill < recipe.difficulty:
return f"You need a crafting skill level of {recipe.difficulty} to craft {item_name}."
# Check if the actor has the required skill level
skill = randint(1, 20)
if skill < recipe.difficulty:
return f"You need a crafting skill level of {recipe.difficulty} to craft {item_name}."
# Collect inventory items names
inventory_items = {item.name for item in action_actor.items}
# Collect inventory items names
inventory_items = {item.name for item in action_actor.items}
# Check for sufficient ingredients
missing_items = [
item for item in recipe.ingredients if item not in inventory_items
]
if missing_items:
return (
f"You are missing {' and '.join(missing_items)} to craft {item_name}."
)
# Check for sufficient ingredients
missing_items = [item for item in recipe.ingredients if item not in inventory_items]
if missing_items:
return f"You are missing {' and '.join(missing_items)} to craft {item_name}."
# Deduct the ingredients from inventory
for ingredient in recipe.ingredients:
item_to_remove = next(
item for item in action_actor.items if item.name == ingredient
)
action_actor.items.remove(item_to_remove)
# Deduct the ingredients from inventory
for ingredient in recipe.ingredients:
item_to_remove = next(
item for item in action_actor.items if item.name == ingredient
# Create and add the crafted item to inventory
result_item = next(
(item for item in action_actor.items if item.name == recipe.result), None
)
action_actor.items.remove(item_to_remove)
if result_item:
new_item = Item(**vars(result_item)) # Copying the item
else:
dungeon_master = get_dungeon_master()
new_item = generate_item(
dungeon_master, action_world.theme
) # TODO: pass recipe item
# Create and add the crafted item to inventory
result_item = next(
(item for item in action_actor.items if item.name == recipe.result), None
)
if result_item:
new_item = Item(**vars(result_item)) # Copying the item
else:
dungeon_master = get_dungeon_master()
new_item = generate_item(
dungeon_master, action_world.theme
) # TODO: pass recipe item
action_actor.items.append(new_item)
action_actor.items.append(new_item)
broadcast(f"{action_actor.name} crafts a {item_name}.")
return f"You successfully craft a {item_name}."
broadcast(f"{action_actor.name} crafts a {item_name}.")
return f"You successfully craft a {item_name}."

View File

@ -1,5 +1,5 @@
from adventure.context import broadcast, get_current_context
from adventure.search import find_item_in_actor
from adventure.context import action_context, broadcast
from adventure.utils.search import find_item_in_actor
def action_read(item_name: str) -> str:
@ -9,14 +9,13 @@ def action_read(item_name: str) -> str:
Args:
item_name: The name of the item to read.
"""
_, _, action_actor = get_current_context()
with action_context() as (_, action_actor):
item = find_item_in_actor(action_actor, item_name)
if not item:
return f"You do not have a {item_name} to read."
item = find_item_in_actor(action_actor, item_name)
if not item:
return f"You do not have a {item_name} to read."
if "text" in item.attributes:
broadcast(f"{action_actor.name} reads {item_name}")
return str(item.attributes["text"])
if "text" in item.attributes:
broadcast(f"{action_actor.name} reads {item_name}")
return str(item.attributes["text"])
else:
return f"The {item_name} has nothing to read."

View File

@ -1,7 +1,7 @@
from random import randint
from adventure.context import broadcast, get_current_context, get_dungeon_master
from adventure.search import find_actor_in_room
from adventure.context import action_context, broadcast, get_dungeon_master
from adventure.utils.search import find_actor_in_room
def action_cast(spell: str, target: str) -> str:
@ -12,26 +12,25 @@ def action_cast(spell: str, target: str) -> str:
spell: The name of the spell to cast.
target: The target of the spell.
"""
_, action_room, action_actor = get_current_context()
with action_context() as (action_room, action_actor):
target_actor = find_actor_in_room(action_room, target)
dungeon_master = get_dungeon_master()
target_actor = find_actor_in_room(action_room, target)
dungeon_master = get_dungeon_master()
# Check for spell availability and mana costs
if spell not in action_actor.attributes["spells"]:
return f"You do not know the spell '{spell}'."
if action_actor.attributes["mana"] < action_actor.attributes["spells"][spell]:
return "You do not have enough mana to cast this spell."
# Check for spell availability and mana costs
if spell not in action_actor.attributes["spells"]:
return f"You do not know the spell '{spell}'."
if action_actor.attributes["mana"] < action_actor.attributes["spells"][spell]:
return "You do not have enough mana to cast this spell."
action_actor.attributes["mana"] -= action_actor.attributes["spells"][spell]
# Get flavor text from the dungeon master
flavor_text = dungeon_master(f"Describe the effects of {spell} on {target}.")
broadcast(f"{action_actor.name} casts {spell} on {target}. {flavor_text}")
action_actor.attributes["mana"] -= action_actor.attributes["spells"][spell]
# Get flavor text from the dungeon master
flavor_text = dungeon_master(f"Describe the effects of {spell} on {target}.")
broadcast(f"{action_actor.name} casts {spell} on {target}. {flavor_text}")
# Apply effects based on the spell
if spell == "heal" and target_actor:
heal_amount = randint(10, 30)
target_actor.attributes["health"] += heal_amount
return f"{target} is healed for {heal_amount} points."
# Apply effects based on the spell
if spell == "heal" and target_actor:
heal_amount = randint(10, 30)
target_actor.attributes["health"] += heal_amount
return f"{target} is healed for {heal_amount} points."
return f"{spell} was successfully cast on {target}."
return f"{spell} was successfully cast on {target}."

View File

@ -1,7 +1,7 @@
from random import randint
from adventure.context import broadcast, get_current_context, get_dungeon_master
from adventure.search import find_item_in_room
from adventure.context import action_context, broadcast, get_dungeon_master
from adventure.utils.search import find_item_in_room
def action_climb(target: str) -> str:
@ -11,27 +11,28 @@ def action_climb(target: str) -> str:
Args:
target: The object or feature to climb.
"""
_, action_room, action_actor = get_current_context()
with action_context() as (action_room, action_actor):
dungeon_master = get_dungeon_master()
# Assume 'climbable' is an attribute that marks climbable targets
climbable_feature = find_item_in_room(action_room, target)
dungeon_master = get_dungeon_master()
# Assume 'climbable' is an attribute that marks climbable targets
climbable_feature = find_item_in_room(action_room, target)
if climbable_feature and climbable_feature.attributes.get("climbable", False):
climb_difficulty = int(climbable_feature.attributes.get("difficulty", 5))
climb_roll = randint(1, 20)
if climbable_feature and climbable_feature.attributes.get("climbable", False):
climb_difficulty = int(climbable_feature.attributes.get("difficulty", 5))
climb_roll = randint(1, 20)
# Get flavor text for the climb attempt
flavor_text = dungeon_master(
f"Describe {action_actor.name}'s attempt to climb {target}."
)
if climb_roll > climb_difficulty:
broadcast(
f"{action_actor.name} successfully climbs the {target}. {flavor_text}"
# Get flavor text for the climb attempt
flavor_text = dungeon_master(
f"Describe {action_actor.name}'s attempt to climb {target}."
)
return f"You successfully climb the {target}."
if climb_roll > climb_difficulty:
broadcast(
f"{action_actor.name} successfully climbs the {target}. {flavor_text}"
)
return f"You successfully climb the {target}."
else:
broadcast(
f"{action_actor.name} fails to climb the {target}. {flavor_text}"
)
return f"You fail to climb the {target}."
else:
broadcast(f"{action_actor.name} fails to climb the {target}. {flavor_text}")
return f"You fail to climb the {target}."
else:
return f"The {target} is not climbable."
return f"The {target} is not climbable."

View File

@ -36,9 +36,9 @@ from adventure.player import (
remove_player,
set_player,
)
from adventure.render_comfy import render_entity, render_event
from adventure.search import find_actor, find_item, find_room
from adventure.render.comfy import render_entity, render_event
from adventure.state import snapshot_world, world_json
from adventure.utils.search import find_actor, find_item, find_room
logger = getLogger(__name__)
@ -233,7 +233,7 @@ def render_input(data):
elif "item" in data:
item_name = data["item"]
item = find_item(
world, item_name, include_actor_inventory=True, include_room_inventory=True
world, item_name, include_actor_inventory=True, include_item_inventory=True
)
if item:
render_entity(item)

View File

@ -1,10 +1,10 @@
from adventure.context import (
action_context,
broadcast,
get_agent_for_actor,
get_current_context,
get_dungeon_master,
)
from adventure.search import find_actor_in_room, find_item_in_room
from adventure.utils.search import find_actor_in_room, find_item_in_room
from adventure.utils.world import describe_entity
@ -16,46 +16,46 @@ def action_attack(target: str) -> str:
target: The name of the character or item to attack.
"""
_, action_room, action_actor = get_current_context()
with action_context() as (action_room, action_actor):
# make sure the target is in the room
target_actor = find_actor_in_room(action_room, target)
target_item = find_item_in_room(action_room, target)
# make sure the target is in the room
target_actor = find_actor_in_room(action_room, target)
target_item = find_item_in_room(action_room, target)
dungeon_master = get_dungeon_master()
if target_actor:
target_agent = get_agent_for_actor(target_actor)
if not target_agent:
raise ValueError(f"no agent found for actor {target_actor.name}")
dungeon_master = get_dungeon_master()
if target_actor:
target_agent = get_agent_for_actor(target_actor)
if not target_agent:
raise ValueError(f"no agent found for actor {target_actor.name}")
reaction = target_agent(
f"{action_actor.name} is attacking you in the {action_room.name}. How do you react?"
"Respond with 'fighting', 'fleeing', or 'surrendering'."
)
reaction = target_agent(
f"{action_actor.name} is attacking you in the {action_room.name}. How do you react?"
"Respond with 'fighting', 'fleeing', or 'surrendering'."
)
outcome = dungeon_master(
f"{action_actor.name} attacks {target} in the {action_room.name}. {describe_entity(action_room)}."
f"{describe_entity(action_actor)}. {describe_entity(target_actor)}."
f"{target} reacts by {reaction}. What is the outcome of the attack? Describe the result in detail."
)
outcome = dungeon_master(
f"{action_actor.name} attacks {target} in the {action_room.name}. {describe_entity(action_room)}."
f"{describe_entity(action_actor)}. {describe_entity(target_actor)}."
f"{target} reacts by {reaction}. What is the outcome of the attack? Describe the result in detail."
)
description = (
f"{action_actor.name} attacks the {target} in the {action_room.name}."
f"{target} reacts by {reaction}. {outcome}"
)
broadcast(description)
return description
description = (
f"{action_actor.name} attacks the {target} in the {action_room.name}."
f"{target} reacts by {reaction}. {outcome}"
)
broadcast(description)
return description
elif target_item:
outcome = dungeon_master(
f"{action_actor.name} attacks {target} in the {action_room.name}. {describe_entity(action_room)}."
f"{describe_entity(action_actor)}. {describe_entity(target_item)}."
f"What is the outcome of the attack? Describe the result in detail."
)
if target_item:
outcome = dungeon_master(
f"{action_actor.name} attacks {target} in the {action_room.name}. {describe_entity(action_room)}."
f"{describe_entity(action_actor)}. {describe_entity(target_item)}."
f"What is the outcome of the attack? Describe the result in detail."
)
description = f"{action_actor.name} attacks the {target} in the {action_room.name}. {outcome}"
broadcast(description)
return description
description = f"{action_actor.name} attacks the {target} in the {action_room.name}. {outcome}"
broadcast(description)
return description
else:
return f"{target} is not in the {action_room.name}."
@ -68,22 +68,21 @@ def action_cast(target: str, spell: str) -> str:
spell: The name of the spell to cast.
"""
_, action_room, action_actor = get_current_context()
with action_context() as (action_room, action_actor):
# make sure the target is in the room
target_actor = find_actor_in_room(action_room, target)
target_item = find_item_in_room(action_room, target)
# make sure the target is in the room
target_actor = find_actor_in_room(action_room, target)
target_item = find_item_in_room(action_room, target)
if not target_actor and not target_item:
return f"{target} is not in the {action_room.name}."
if not target_actor and not target_item:
return f"{target} is not in the {action_room.name}."
dungeon_master = get_dungeon_master()
outcome = dungeon_master(
f"{action_actor.name} casts {spell} on {target} in the {action_room.name}. {describe_entity(action_room)}."
f"{describe_entity(action_actor)}. {describe_entity(target_actor) if target_actor else describe_entity(target_item)}."
f"What is the outcome of the spell? Describe the result in detail."
)
dungeon_master = get_dungeon_master()
outcome = dungeon_master(
f"{action_actor.name} casts {spell} on {target} in the {action_room.name}. {describe_entity(action_room)}."
f"{describe_entity(action_actor)}. {describe_entity(target_actor) if target_actor else describe_entity(target_item)}."
f"What is the outcome of the spell? Describe the result in detail."
)
description = f"{action_actor.name} casts {spell} on the {target} in the {action_room.name}. {outcome}"
broadcast(description)
return description
description = f"{action_actor.name} casts {spell} on the {target} in the {action_room.name}. {outcome}"
broadcast(description)
return description

View File

@ -1,5 +1,5 @@
from adventure.context import get_current_context
from adventure.search import find_item_in_actor
from adventure.context import action_context
from adventure.utils.search import find_item_in_actor
def action_cook(item: str) -> str:
@ -9,25 +9,24 @@ def action_cook(item: str) -> str:
Args:
item: The name of the item to cook.
"""
_, _, action_actor = get_current_context()
with action_context() as (_, action_actor):
target_item = find_item_in_actor(action_actor, item)
if target_item is None:
return "You don't have the item to cook."
target_item = find_item_in_actor(action_actor, item)
if target_item is None:
return "You don't have the item to cook."
# Check if the item is edible
edible = target_item.attributes.get("edible", False)
if not edible:
return "You can't cook that."
# Check if the item is edible
edible = target_item.attributes.get("edible", False)
if not edible:
return "You can't cook that."
# Check if the item is raw
cooked = target_item.attributes.get("cooked", False)
if cooked:
return "That item is already cooked."
# Check if the item is raw
cooked = target_item.attributes.get("cooked", False)
if cooked:
return "That item is already cooked."
# Cook the item
target_item.attributes["cooked"] = True
return f"You cook the {item}."
# Cook the item
target_item.attributes["cooked"] = True
return f"You cook the {item}."
def action_eat(item: str) -> str:
@ -37,33 +36,32 @@ def action_eat(item: str) -> str:
Args:
item: The name of the item to eat.
"""
_, _, action_actor = get_current_context()
with action_context() as (_, action_actor):
target_item = find_item_in_actor(action_actor, item)
if target_item is None:
return "You don't have the item to eat."
target_item = find_item_in_actor(action_actor, item)
if target_item is None:
return "You don't have the item to eat."
# Check if the item is edible
edible = target_item.attributes.get("edible", False)
if not edible:
return "You can't eat that."
# Check if the item is edible
edible = target_item.attributes.get("edible", False)
if not edible:
return "You can't eat that."
# Check if the item is cooked
cooked = target_item.attributes.get("cooked", False)
if not cooked:
return "You can't eat that raw."
# Check if the item is cooked
cooked = target_item.attributes.get("cooked", False)
if not cooked:
return "You can't eat that raw."
# Check if the item is rotten
spoiled = target_item.attributes.get("spoiled", False)
if spoiled:
return "You can't eat that item, it is rotten."
# Check if the item is rotten
spoiled = target_item.attributes.get("spoiled", False)
if spoiled:
return "You can't eat that item, it is rotten."
# Check if the actor is hungry
hunger = action_actor.attributes.get("hunger", None)
if hunger != "hungry":
return "You're not hungry."
# Check if the actor is hungry
hunger = action_actor.attributes.get("hunger", None)
if hunger != "hungry":
return "You're not hungry."
# Eat the item
action_actor.items.remove(target_item)
action_actor.attributes["hunger"] = "full"
return f"You eat the {item}."
# Eat the item
action_actor.items.remove(target_item)
action_actor.attributes["hunger"] = "full"
return f"You eat the {item}."

View File

@ -1,4 +1,4 @@
from adventure.context import get_current_context, get_dungeon_master
from adventure.context import action_context, get_dungeon_master
from adventure.utils.world import describe_entity
@ -7,15 +7,15 @@ def action_wash(unused: bool) -> str:
Wash yourself.
"""
_, action_room, action_actor = get_current_context()
hygiene = action_actor.attributes.get("hygiene", "clean")
with action_context() as (action_room, action_actor):
hygiene = action_actor.attributes.get("hygiene", "clean")
dungeon_master = get_dungeon_master()
outcome = dungeon_master(
f"{action_actor.name} washes themselves in the {action_room.name}. {describe_entity(action_room)}. {describe_entity(action_actor)}"
f"{action_actor.name} was {hygiene} to start with. How clean are they after washing? Respond with 'clean' or 'dirty'."
"If the room has a shower or running water, they should be cleaner. If the room is dirty, they should end up dirtier."
)
dungeon_master = get_dungeon_master()
outcome = dungeon_master(
f"{action_actor.name} washes themselves in the {action_room.name}. {describe_entity(action_room)}. {describe_entity(action_actor)}"
f"{action_actor.name} was {hygiene} to start with. How clean are they after washing? Respond with 'clean' or 'dirty'."
"If the room has a shower or running water, they should be cleaner. If the room is dirty, they should end up dirtier."
)
action_actor.attributes["clean"] = outcome.strip().lower()
return f"You wash yourself in the {action_room.name} and feel {outcome}"
action_actor.attributes["clean"] = outcome.strip().lower()
return f"You wash yourself in the {action_room.name} and feel {outcome}"

View File

@ -1,4 +1,4 @@
from adventure.context import get_current_context, get_dungeon_master
from adventure.context import action_context, get_dungeon_master
from adventure.utils.world import describe_entity
@ -7,13 +7,12 @@ def action_sleep(unused: bool) -> str:
Sleep until you are rested.
"""
_, action_room, action_actor = get_current_context()
with action_context() as (action_room, action_actor):
dungeon_master = get_dungeon_master()
outcome = dungeon_master(
f"{action_actor.name} sleeps in the {action_room.name}. {describe_entity(action_room)}. {describe_entity(action_actor)}"
"How rested are they? Respond with 'rested' or 'tired'."
)
dungeon_master = get_dungeon_master()
outcome = dungeon_master(
f"{action_actor.name} sleeps in the {action_room.name}. {describe_entity(action_room)}. {describe_entity(action_actor)}"
"How rested are they? Respond with 'rested' or 'tired'."
)
action_actor.attributes["rested"] = outcome
return f"You sleep in the {action_room.name} and wake up feeling {outcome}"
action_actor.attributes["rested"] = outcome
return f"You sleep in the {action_room.name} and wake up feeling {outcome}"

View File

@ -146,7 +146,8 @@ def simulate_world(
)
logger.debug(f"{actor.name} step result: {result}")
agent.memory.append(result)
if agent.memory:
agent.memory.append(result)
result_event = ResultEvent(result=result, room=room, actor=actor)
broadcast(result_event)

View File

@ -54,7 +54,7 @@ def snapshot_world(world: World, step: int):
json_memory = {}
for actor, agent in get_all_actor_agents():
json_memory[actor.name] = list(agent.memory)
json_memory[actor.name] = list(agent.memory or [])
return {
"world": json_world,