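# Spaces: Sleeping
# (The line above appears to be web-page header text captured with the file; it is not part of the program.)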
from abc import ABC, abstractmethod
import logging
import random
import threading
import time
import pygame
from utils.config import Config
from utils.game_utils import capture

# Module-level configuration instance. PygameBase reads `level_config` from it
# in `__init__` and `FPS` from it in `run`.
config = Config()
class PygameBase(ABC):
    """Base class for pygame games that are driven by an input thread or a human.

    The class owns the main loop (``run``), the frame capture bookkeeping and
    the "history" of the last action.  It calls or reads the following names
    without defining them, so they must be provided elsewhere (presumably by
    the concrete game subclass and the input thread -- confirm against callers):
    ``set_level_config``, ``screen``, ``valid_actions`` and ``current_action``.
    """

    def __init__(self, output_dir, name):
        """Initialize pygame and the bookkeeping state shared by all games.

        :param output_dir: Directory passed to ``capture`` when frames are saved.
        :param name: Caption for the pygame window.
        """
        self.output_dir = output_dir
        self.last_frame_base64 = None
        self.last_frame_path = None
        self.clock = pygame.time.Clock()

        # Load the level config, should be in Dict format
        self.level_config = config.level_config
        self.set_level_config(self.level_config)

        pygame.init()

        self.over = False
        self.score = 0

        # Signals that a new action is available.  It starts out set and is
        # cleared by ``run`` before the main loop begins.
        self.new_action_event = threading.Event()
        self.new_action_event.set()

        # State of the previous step (see ``get_game_info``).
        self.history_frame_base64 = None
        self.history_action = None
        self.history_action_info = None
        self.history_valid_action = None

        # When not None, this action is replayed during the sampling frames
        # instead of ``history_valid_action`` (see ``run``).
        self.action_in_sample_frames = None
        self.invalid_action_count = 0
        self.game_frames = []
        self.sample_frames = 3

        pygame.display.set_caption(name)

    def step(self, action, dt=None):
        """
        Perform one step in the game based on the action taken.

        The base implementation does nothing and returns ``None``; ``run``
        unpacks the result as ``(done, info)``, so a concrete game has to
        override this method.

        :param action: Action to be performed (could be direction, button press, etc.)
        :param dt: Time step handed in by the caller (``run`` passes 0.01, ``init_run`` passes 0).
        :return: (done: bool, info: Any) Whether the game is over and any additional info, such as score.
        """
        pass

    def human_mode_action(self, event):
        """
        Get the action from human player.

        The base implementation returns ``None``, which ``run`` treats as
        "no action for this event".

        :param event: pygame event
        :return: action: str
        """
        pass

    def get_score(self):
        """
        Retrieve the current score.

        Returns:
            int: The current score.
        """
        return self.score

    def get_game_info(self):
        """Return a dict describing the previous step and the latest captured frame."""
        # TODO: history should be managed by agent, not game
        info = {
            # Last step info
            "history_action": self.history_action,
            "history_frame_base64": self.history_frame_base64,
            "history_action_info": self.history_action_info,
            # Current step info
            "last_frame_base64": self.last_frame_base64,
            "last_frame_path": self.last_frame_path,
        }
        return info

    def _capture_frame(self):
        """Capture the current screen and remember it as the latest frame."""
        filepath, encoded_string = capture(self.screen, self.output_dir)
        self.last_frame_path = filepath
        self.last_frame_base64 = encoded_string
        self.game_frames.append(filepath)

    def _record_step(self, action, info, mark_valid=True):
        """Shift the latest frame into history, store action/info, then capture a new frame.

        :param action: Action to store in ``history_action``.
        :param info: Value to store in ``history_action_info``.
        :param mark_valid: When true, also store ``action`` in
            ``history_valid_action`` (the action replayed during sampling frames).
        """
        # The history frame must be taken *before* the new capture overwrites
        # ``last_frame_base64``.
        self.history_frame_base64 = self.last_frame_base64
        self.history_action = action
        self.history_action_info = info
        if mark_valid:
            self.history_valid_action = action
        self._capture_frame()

    def init_run(self):
        """Step the game once with the "None" action and seed frame/history state."""
        self.step("None", 0)
        self._capture_frame()
        # At start-up the history frame is the same as the first captured frame.
        self.history_frame_base64 = self.last_frame_base64
        self.history_action = "None"
        self.history_action_info = "Game Initialization."
        self.history_valid_action = "None"

    def run(self, input_thread_method, human_mode=False):
        """Run the main game loop until ``self.over`` becomes true.

        Actions come from two places: pygame events translated by
        ``human_mode_action``, and ``self.current_action`` which is consumed
        whenever ``self.new_action_event`` is set (presumably by the input
        thread -- confirm against ``input_thread_method``).

        After an action has been stepped, the loop enters a sampling period:
        on each following iteration it replays ``action_in_sample_frames``
        (or ``history_valid_action`` when that is None) until the frame counter
        reaches ``sample_frames``; only then is a new action accepted.

        :param input_thread_method: Callable run in a daemon thread (only when
            ``human_mode`` is false).  It receives one ``threading.Event``,
            which this loop sets to ask the thread to exit.
        :param human_mode: When true, no input thread is started.
        """
        print("--------------------------------RUNNING---------------------------------------------")

        def make_input_thread():
            """Create (but do not start) a fresh stop-event/input-thread pair."""
            stop_event = threading.Event()
            worker = threading.Thread(target=input_thread_method, args=(stop_event,), daemon=True)
            return stop_event, worker

        thread_event = threading.Event()
        input_thread = None  # stays None in human mode
        if not human_mode:
            thread_event, input_thread = make_input_thread()
            input_thread.start()

        self.init_run()

        waiting_time = 0
        invalid_action_count_temp = 0
        self.new_action_event.clear()  # Clear the event to get ready for the next action

        frame = 0
        frame_flag = True  # True: ready to accept a new action; False: in the sampling period
        print("self.sample_frames", self.sample_frames)

        while not self.over:
            dt = 0.01
            self.clock.tick(config.FPS)

            if not frame_flag:
                frame += 1
                if frame != self.sample_frames:
                    # Sampling period: replay the previous action for this frame.
                    step_action = self.history_valid_action
                    if self.action_in_sample_frames is not None:
                        step_action = self.action_in_sample_frames
                    done, info = self.step(step_action, dt)
                    logging.info(info)
                    print(info)
                    print(f"frames {frame} step {step_action}")
                    if done:
                        self.over = True
                        print(info)
                        break
                    self.history_action_info = info
                    self._capture_frame()
                    continue

                # Sampling period finished: accept new actions again.
                print(f"frames {frame} == {self.sample_frames}")
                frame = 0
                if not human_mode:
                    self.new_action_event.clear()  # Clear the event to get ready for the next action
                frame_flag = True

            if input_thread is not None and not input_thread.is_alive():
                print("Restarting input thread...")
                thread_event, input_thread = make_input_thread()
                time.sleep(5)
                input_thread.start()

            # Human input: translate pygame events into actions.
            for event in pygame.event.get():
                action = self.human_mode_action(event)
                if action is None or action not in self.valid_actions:
                    continue
                print("pygame base Action: ", action)
                done, info = self.step(action, dt)
                frame_flag = False
                self._record_step(action, info)
                if done:
                    # The loop condition is ``self.over``; the previous code
                    # assigned an unused local here, so the game never ended
                    # on this path.
                    self.over = True
                    break
                print(self)
                waiting_time = 0

            # Fetch the action delivered through ``current_action``.
            if self.new_action_event.is_set():
                waiting_time = 0
                action = self.current_action
                self.current_action = None

                if action in self.valid_actions:
                    # Valid action
                    done, info = self.step(action, dt)
                    logging.info(info)
                    logging.info("---------------------------NEXT STEP--------------------------")
                    print(info)
                    print("---------------------------NEXT STEP--------------------------")
                    self._record_step(action, info)
                    invalid_action_count_temp = 0
                    frame_flag = False
                    if done:
                        self.over = True
                else:
                    info = "Invalid action, which should be one of " + str(self.valid_actions)
                    print("action:", action,", info:", info)
                    logging.error(info)
                    invalid_action_count_temp += 1
                    self.invalid_action_count += 1

                    if invalid_action_count_temp == 3:
                        print("Invalid action for 3 times, take a random action.")
                        logging.error("Invalid action for 3 times, take a random action.")
                        action = random.choices(self.valid_actions, k=1)[0]
                        invalid_action_count_temp = 0
                        done, info = self.step(action, dt)
                        self._record_step(action, info)
                        frame_flag = False
                        if done:
                            self.over = True
                    else:
                        # No step was taken: record the rejected action and its
                        # error text, but keep the last valid action unchanged.
                        self._record_step(action, info, mark_valid=False)

                self.new_action_event.clear()  # Clear the event to get ready for the next action
            else:
                waiting_time += 1
                if waiting_time % 120 == 0:
                    print(time.strftime('%Y-%m-%d %H:%M:%S'), "waiting for action.")
                # Only restart when an input thread exists; in human mode there
                # is none, and joining it used to raise UnboundLocalError.
                if waiting_time > 2000 * 30 and input_thread is not None:
                    print(time.strftime('%Y-%m-%d %H:%M:%S'), "Waiting for action for 200s, restart.")
                    # Ask the current input thread to stop, then replace it.
                    thread_event.set()
                    input_thread.join()
                    waiting_time = 0
                    thread_event, input_thread = make_input_thread()
                    input_thread.start()

        if input_thread is not None and input_thread.is_alive():
            thread_event.set()  # Tell the input thread to exit
            input_thread.join()  # Wait for the input thread to exit