Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| import importlib | |
| import os | |
| import pathlib | |
| import queue | |
| import random | |
| import threading | |
| import time | |
| from filelock import FileLock | |
| import pygame | |
| from agent.game_agent import game_agent | |
| from provider.OpenAIProvider import OpenAIProvider | |
| from utils.game_utils import seed_everything | |
| from utils.calculate_log import calculate_statistics, extract_scores | |
| from utils.config import Config | |
| from utils.encoding_utils import encode_data_to_base64_path | |
| from utils.file_utils import assemble_project_path, get_all_files, img_to_gif, run_path_construct | |
| from utils.json_utils import parse_semi_formatted_text | |
| from utils.lmm_utils import assemble_prompt | |
| from utils.planner_utils import _extract_keys_from_template | |
| import logging | |
| import pickle | |
# Module-level Config instance. PlaygroundPipelineRunner.__init__ loads the
# env and level config files into it and then reads env_name, game_module
# and game_class back from it.
config = Config()
class PlaygroundPipelineRunner():
    """Run one playground game session and feed actions into it.

    The runner loads the env/level configuration for the chosen game,
    instantiates the game class named by that configuration, and hands
    ``input_listener`` to the game's ``run`` method. The listener publishes
    the latest observation to a pickle file and waits for the next action,
    which comes either from the agent (when ``params`` was supplied) or from
    a plain-text action file written by another party.
    """

    # Game display name -> short name used in the env-config file name.
    _ENV_NAMES = {
        "RaceGame": "race",
        "SuperMario": "mario",
        "FlappyBird": "flappybird",
        "TempestRun": "tempestrun",
        "PongGame": "pong",
    }

    # Game display name -> directory name under config/level_config.
    _LEVEL_DIRS = {
        "RaceGame": "racegame",
        "SuperMario": "supermariogame",
        "FlappyBird": "flappybirdgame",
        "TempestRun": "tempestrungame",
        "PongGame": "ponggame",
    }

    # Seconds to sleep between polls of the signal/action files, so that an
    # idle listener does not spin a CPU core on filesystem checks.
    _POLL_INTERVAL = 0.02

    def __init__(self, game, level, history_steps, action_file, game_id, stop_event, params=None):
        """Load the configuration for *game* and prepare the session state.

        Args:
            game: Display name of the game; must be a key of ``_ENV_NAMES``.
            level: Level identifier interpolated into the level-config path.
            history_steps: Currently unused; the env config is fixed to the
                ``3steps`` variant (see the TODO below).
            action_file: Path of the text file polled for manual actions.
            game_id: Session identifier; names the output directory and the
                pickle file.
            stop_event: Object exposing ``is_set()``; once set, the listener
                loop exits.
            params: Optional provider parameters. When given, an
                ``OpenAIProvider`` is created and the agent picks actions;
                otherwise actions are read from ``action_file``.

        Raises:
            KeyError: If *game* is not one of the known game names.
        """
        print("PlaygroundPipelineRunner init.")
        # TODO
        self.output_dir = os.path.join(".", "runs", game_id)
        # exist_ok avoids the check-then-create race between sessions.
        os.makedirs(self.output_dir, exist_ok=True)

        # TODO history steps
        gameEnvConfig = f"config/env_config/env_config_{self._ENV_NAMES[game]}_reasoning_3steps.json"
        levelConfig = f"config/level_config/{self._LEVEL_DIRS[game]}/level{level}.json"
        config.load_env_config(gameEnvConfig)
        config.load_level_config(levelConfig)

        self.env_name = config.env_name
        self.game_module = config.game_module
        self.game_class = config.game_class

        # A freshly created Event is already cleared.
        self.new_input_event = threading.Event()

        self.history_images = []
        self.current_image = None
        self.history_actions = []

        self.game_id = game_id
        self.action_file = action_file
        self.stop_event = stop_event

        self.save_file = f"{self.output_dir}/game_{self.game_id}.pkl"
        self.step_signal_file = f"{self.output_dir}/step_signal.txt"

        if params is not None:
            print(f"Pipeline runner loaded params: {params}")
            self.provider = OpenAIProvider(params)
            self.agent = game_agent(self.provider)
        else:
            self.provider = None
            self.agent = game_agent()

    def _save_info(self, info):
        """Pickle *info* to ``self.save_file`` while holding its file lock."""
        with FileLock(self.save_file + ".lock"):
            # Close the handle explicitly so the data is flushed to disk
            # before the lock is released.
            with open(self.save_file, "wb") as f:
                pickle.dump(info, f)

    def _publish_snapshot(self):
        """Refresh the agent from the game and write the observation to disk."""
        game_info = self.game.get_game_info()
        self.agent.update_game_info(game_info)

        # The three entries preceding the newest one form the history.
        self.history_images = [
            x['image'] for x in self.agent.history[-4:-1]
        ]
        self.current_image = self.agent.history[-1]['image']
        self.history_actions = [
            x['history_action'] for x in self.agent.history[-4:-1]
        ]

        info = {
            "history_images": self.history_images,
            "current_image": self.current_image,
            "history_actions": self.history_actions
        }
        self.last_info = info
        print("In runner:", info["history_actions"])
        self._save_info(info)

    def input_listener(self, event):
        """Poll for the next action until the game ends or a stop is requested.

        The loop runs until *event* or ``self.stop_event`` is set, or
        ``self.game.over`` becomes true. While the game's
        ``new_action_event`` is clear it publishes a fresh snapshot when one
        is due, obtains an action, stores it in ``self.game.current_action``
        and sets ``new_action_event``. On exit ``self.game.over`` is forced
        to ``True``.

        Args:
            event: Object exposing ``is_set()``; once set, the loop exits.
        """
        count = 0
        # True whenever a new snapshot must be published before waiting for
        # the next action.
        flag = True

        while not event.is_set() and not self.game.over and not self.stop_event.is_set():
            if self.game.new_action_event.is_set():
                # The previously delivered action has not been cleared yet.
                count += 1
                if count % 5 == 0:
                    print("Input listener waiting for event.")
                time.sleep(0.02)
                continue

            if flag:
                self._publish_snapshot()
                flag = False

            if self.provider:
                # Agent mode: act only when a step signal has been written.
                if not os.path.exists(self.step_signal_file):
                    time.sleep(self._POLL_INTERVAL)
                    continue
                with open(self.step_signal_file, "r") as f:
                    step_signal = f.read()

                if step_signal == "step":
                    # Consume the signal so one request yields one step.
                    with open(self.step_signal_file, "w") as f:
                        f.write("")
                    success, msg = self.agent.execute_action()
                    if success:
                        self.game.current_action = msg
                        flag = True
                    else:
                        # Report the failure through the shared pickle file.
                        info = self.last_info
                        info["Error"] = msg
                        self._save_info(info)
                        time.sleep(0.5)
                        continue
                elif step_signal == "continuous":
                    # TODO
                    pass
                else:
                    time.sleep(self._POLL_INTERVAL)
                    continue
            else:
                # Manual mode: the action text is read from the action file.
                if not os.path.exists(self.action_file):
                    time.sleep(self._POLL_INTERVAL)
                    continue
                with open(self.action_file, "r") as f:
                    self.game.current_action = f.read()
                if self.game.current_action == "":
                    time.sleep(self._POLL_INTERVAL)
                    continue
                # Empty the file so the same action is not replayed.
                with open(self.action_file, "w") as f:
                    f.write("")
                flag = True

            print("Input listener get action: ", self.game.current_action)
            self.game.new_action_event.set()

        print("Input listener exit 0.")
        print("self.game.over: ", self.game.over)
        self.game.over = True

    def run(self):
        """Import the configured game class, instantiate it and run it."""
        print(f"{self.env_name} Playground Pipeline Running.")
        game_module = importlib.import_module(self.game_module)
        game_class = getattr(game_module, self.game_class)
        self.game = game_class(self.output_dir)
        self.game.run(self.input_listener)

    def pipeline_shutdown(self):
        """Drop the reference to the agent."""
        self.agent = None