1
0
Fork 0
taleweave-ai/taleweave/simulate.py

335 lines
11 KiB
Python
Raw Normal View History

from functools import partial
from itertools import count
from json import loads
2024-05-09 02:11:16 +00:00
from logging import getLogger
from math import inf
2024-05-18 21:58:11 +00:00
from typing import Callable, Sequence
2024-05-09 02:11:16 +00:00
from packit.agent import Agent
from packit.conditions import condition_or, condition_threshold
from packit.loops import loop_retry
2024-05-09 02:11:16 +00:00
from packit.results import multi_function_or_str_result
from packit.toolbox import Toolbox
from packit.utils import could_be_json
from taleweave.actions.base import (
2024-05-09 02:11:16 +00:00
action_ask,
action_give,
action_look,
action_move,
action_take,
action_tell,
)
from taleweave.actions.planning import (
check_calendar,
erase_notes,
get_recent_notes,
read_notes,
replace_note,
schedule_event,
summarize_notes,
take_note,
)
from taleweave.context import (
2024-05-18 21:58:11 +00:00
broadcast,
get_character_agent_for_name,
get_character_for_agent,
2024-05-27 12:54:36 +00:00
get_current_turn,
2024-05-09 02:11:16 +00:00
get_current_world,
set_current_character,
2024-05-09 02:11:16 +00:00
set_current_room,
2024-05-27 12:54:36 +00:00
set_current_turn,
2024-05-09 02:11:16 +00:00
set_current_world,
set_game_systems,
2024-05-09 02:11:16 +00:00
)
from taleweave.game_system import GameSystem
from taleweave.models.config import DEFAULT_CONFIG
from taleweave.models.entity import Character, Room, World
from taleweave.models.event import ActionEvent, ReplyEvent, ResultEvent
from taleweave.utils.conversation import make_keyword_condition, summarize_room
from taleweave.utils.effect import expire_effects
from taleweave.utils.planning import expire_events, get_upcoming_events
from taleweave.utils.search import find_room_with_character
from taleweave.utils.world import describe_entity, format_attributes
2024-05-09 02:11:16 +00:00
logger = getLogger(__name__)
2024-05-27 12:54:36 +00:00
turn_config = DEFAULT_CONFIG.world.turn
2024-05-09 02:11:16 +00:00
def world_result_parser(value, agent, **kwargs):
current_world = get_current_world()
if not current_world:
raise ValueError(
"The current world must be set before calling world_result_parser"
)
logger.debug(f"parsing action for {agent.name}: {value}")
current_character = get_character_for_agent(agent)
2024-05-09 02:11:16 +00:00
current_room = next(
(room for room in current_world.rooms if current_character in room.characters),
None,
2024-05-09 02:11:16 +00:00
)
set_current_room(current_room)
set_current_character(current_character)
2024-05-09 02:11:16 +00:00
return multi_function_or_str_result(value, agent=agent, **kwargs)
def prompt_character_action(
room, character, agent, action_names, action_toolbox, current_turn
) -> str:
# collect data for the prompt
notes_prompt, events_prompt = get_notes_events(character, current_turn)
room_characters = [character.name for character in room.characters]
room_items = [item.name for item in room.items]
room_directions = [portal.name for portal in room.portals]
character_attributes = format_attributes(character)
# character_effects = [effect.name for effect in character.active_effects]
character_items = [item.name for item in character.items]
# set up a result parser for the agent
def result_parser(value, agent, **kwargs):
if not room or not character:
raise ValueError("Room and character must be set before parsing results")
# trim suffixes that are used elsewhere
value = value.removesuffix("END").strip()
# fix unbalanced curly braces
if value.startswith("{") and not value.endswith("}"):
open_count = value.count("{")
close_count = value.count("}")
if open_count > close_count:
fixed_value = value + ("}" * (open_count - close_count))
try:
loads(fixed_value)
value = fixed_value
except Exception:
pass
if could_be_json(value):
event = ActionEvent.from_json(value, room, character)
else:
event = ReplyEvent.from_text(value, room, character)
broadcast(event)
return world_result_parser(value, agent, **kwargs)
# prompt and act
logger.info("starting turn for character: %s", character.name)
result = loop_retry(
agent,
(
"You are currently in the {room_name} room. {room_description}. {attributes}. "
"The room contains the following characters: {visible_characters}. "
"The room contains the following items: {visible_items}. "
"Your inventory contains the following items: {character_items}."
"You can take the following actions: {actions}. "
"You can move in the following directions: {directions}. "
"{notes_prompt} {events_prompt}"
"What will you do next? Reply with a JSON function call, calling one of the actions."
"You can only perform one action per turn. What is your next action?"
),
context={
"actions": action_names,
"character_items": character_items,
"attributes": character_attributes,
"directions": room_directions,
"room_name": room.name,
"room_description": describe_entity(room),
"visible_characters": room_characters,
"visible_items": room_items,
"notes_prompt": notes_prompt,
"events_prompt": events_prompt,
},
result_parser=result_parser,
toolbox=action_toolbox,
)
2024-05-27 12:54:36 +00:00
logger.debug(f"{character.name} action result: {result}")
if agent.memory:
# TODO: make sure this is not duplicating memories and wasting space
agent.memory.append(result)
return result
def get_notes_events(character: Character, current_turn: int):
recent_notes = get_recent_notes(character)
upcoming_events = get_upcoming_events(character, current_turn)
if len(recent_notes) > 0:
notes = "\n".join(recent_notes)
notes_prompt = f"Your recent notes are: {notes}\n"
else:
notes_prompt = "You have no recent notes.\n"
if len(upcoming_events) > 0:
2024-05-27 12:54:36 +00:00
current_turn = get_current_turn()
events = [
2024-05-27 12:54:36 +00:00
f"{event.name} in {event.turn - current_turn} turns"
for event in upcoming_events
]
events = "\n".join(events)
events_prompt = f"Upcoming events are: {events}\n"
else:
events_prompt = "You have no upcoming events.\n"
return notes_prompt, events_prompt
def prompt_character_think(
room: Room,
character: Character,
agent: Agent,
planner_toolbox: Toolbox,
current_turn: int,
max_steps: int | None = None,
) -> str:
2024-05-27 12:54:36 +00:00
max_steps = max_steps or turn_config.planning_steps
notes_prompt, events_prompt = get_notes_events(character, current_turn)
event_count = len(character.planner.calendar.events)
note_count = len(character.planner.notes)
logger.info("starting planning for character: %s", character.name)
_, condition_end, result_parser = make_keyword_condition("You are done planning.")
stop_condition = condition_or(
condition_end, partial(condition_threshold, max=max_steps)
)
i = 0
while not stop_condition(current=i):
result = loop_retry(
agent,
"You are about to start your turn. Plan your next action carefully. Take notes and schedule events to help keep track of your goals. "
"You can check your notes for important facts or check your calendar for upcoming events. You have {note_count} notes. "
"If you have plans with other characters, schedule them on your calendar. You have {event_count} events on your calendar. "
"{room_summary}"
"Think about your goals and any quests that you are working on, and plan your next action accordingly. "
"Try to keep your notes accurate and up-to-date. Replace or erase old notes when they are no longer accurate or useful. "
"Do not keeps notes about upcoming events, use your calendar for that. "
"You can perform up to 3 planning actions in a single turn. When you are done planning, reply with 'END'."
"{notes_prompt} {events_prompt}",
context={
"event_count": event_count,
"events_prompt": events_prompt,
"note_count": note_count,
"notes_prompt": notes_prompt,
"room_summary": summarize_room(room, character),
},
result_parser=result_parser,
stop_condition=stop_condition,
toolbox=planner_toolbox,
)
if agent.memory:
agent.memory.append(result)
i += 1
return result
2024-05-09 02:11:16 +00:00
def simulate_world(
world: World,
2024-05-27 12:54:36 +00:00
turns: float | int = inf,
2024-05-09 02:11:16 +00:00
actions: Sequence[Callable[..., str]] = [],
systems: Sequence[GameSystem] = [],
2024-05-09 02:11:16 +00:00
):
logger.info("simulating the world")
2024-05-09 02:11:16 +00:00
set_current_world(world)
set_game_systems(systems)
2024-05-09 02:11:16 +00:00
# build a toolbox for the actions
action_tools = Toolbox(
[
action_ask,
action_give,
action_look,
action_move,
action_take,
action_tell,
*actions,
]
)
action_names = action_tools.list_tools()
# build a toolbox for the planners
planner_toolbox = Toolbox(
[
check_calendar,
erase_notes,
read_notes,
replace_note,
schedule_event,
summarize_notes,
take_note,
]
)
# simulate each character
for i in count():
2024-05-27 12:54:36 +00:00
current_turn = get_current_turn()
logger.info(f"simulating turn {i} of {turns} (world turn {current_turn})")
for character_name in world.order:
character, agent = get_character_agent_for_name(character_name)
if not agent or not character:
logger.error(f"agent or character not found for name {character_name}")
2024-05-09 02:11:16 +00:00
continue
room = find_room_with_character(world, character)
2024-05-09 02:11:16 +00:00
if not room:
logger.error(f"character {character_name} is not in a room")
2024-05-09 02:11:16 +00:00
continue
# prep context
set_current_room(room)
set_current_character(character)
# decrement effects on the character and remove any that have expired
expire_effects(character)
2024-05-27 12:54:36 +00:00
expire_events(character, current_turn)
# give the character a chance to think and check their planner
if agent.memory and len(agent.memory) > 0:
try:
thoughts = prompt_character_think(
2024-05-27 12:54:36 +00:00
room, character, agent, planner_toolbox, current_turn
)
logger.debug(f"{character.name} thinks: {thoughts}")
except Exception:
logger.exception(
f"error during planning for character {character.name}"
)
2024-05-09 02:11:16 +00:00
2024-05-27 12:54:36 +00:00
try:
result = prompt_character_action(
room, character, agent, action_names, action_tools, current_turn
)
result_event = ResultEvent(
result=result, room=room, character=character
)
broadcast(result_event)
except Exception:
logger.exception(f"error during action for character {character.name}")
2024-05-09 02:11:16 +00:00
for system in systems:
if system.simulate:
2024-05-27 12:54:36 +00:00
system.simulate(world, current_turn)
2024-05-09 02:11:16 +00:00
2024-05-27 12:54:36 +00:00
set_current_turn(current_turn + 1)
if i >= turns:
logger.info("reached turn limit at world turn %s", current_turn + 1)
break