import json
import re
from typing import Any, Dict, List, Optional, Tuple
from collections import OrderedDict
from collections.abc import Mapping, Iterable
from datetime import datetime

# import torch
# from cradle import constants

from utils.string_utils import contains_punctuation, is_numbered_bullet_list_item


def load_json(file_path):
    with open(file_path, mode='r', encoding='utf8') as fp:
        json_dict = json.load(fp)
    return json_dict
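
# Illustrative usage of load_json (hypothetical path, not part of the original module):
# config = load_json("./res/sample_config.json")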


# def serialize_data(item):
#     """Recursively convert non-serializable items in the dictionary."""
#     if isinstance(item, (str, int, float, bool)):
#         return item
#     elif isinstance(item, torch.Tensor):
#         # Check if the tensor is 0-d (a scalar)
#         if item.dim() == 0:
#             # Convert scalar tensor to a Python number
#             return item.item()
#         else:
#             # Check if tensor is on a GPU, move to CPU first
#             if item.is_cuda:
#                 item = item.cpu()
#             # Convert tensor to a list
#             return item.numpy().tolist()
#     elif isinstance(item, datetime):
#         return item.isoformat()
#     if isinstance(item, Mapping):
#         return {key: serialize_data(value) for key, value in item.items()}
#     elif isinstance(item, Iterable):
#         return [serialize_data(element) for element in item]
#     elif isinstance(item, JsonFrameStructure):  # Assuming JSONStructure needs to be handled
#         return item.to_dict()  # Assuming JSONStructure objects have a to_dict method or similar
#     return item


# def save_json(file_path, json_dict, indent=-1):
#     processed_data = serialize_data(json_dict)
#     with open(file_path, mode='w', encoding='utf8') as fp:
#         if indent == -1:
#             json.dump(processed_data, fp, ensure_ascii=False)
#         else:
#             json.dump(processed_data, fp, ensure_ascii=False, indent=indent)


def check_json(json_string):
    try:
        json.loads(json_string)
    except Exception:
        return False
    return True


def refine_json(json_string):
    patterns = [
        r"^`+json(.*?)`+",  # ```json content```, ```json content``, ...
        r"^json(.*)",       # json content
        r"^json(.*?)\."     # json content.
    ]
    for pattern in patterns:
        match = re.search(pattern, json_string, re.DOTALL)
        if match:
            json_string = match.group(1)
            if check_json(json_string):
                return json_string
    return json_string


def parse_semi_formatted_json(json_string):
    obj = None
    try:
        response = refine_json(json_string)
        obj = json.loads(response)
    except Exception as e:
        raise ValueError(f"Error in processing json: {e}. Object was: {json_string}.") from e
    return obj
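
# Illustrative usage of parse_semi_formatted_json (hypothetical model output, not part of the original module):
# raw_response = '```json\n{"decision": "move_forward"}\n```'
# parse_semi_formatted_json(raw_response)  # -> {'decision': 'move_forward'}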


def _is_line_key_candidate(line: str) -> Tuple[bool, Optional[str]]:
    result = False
    likely_key = None
    if line.endswith(':'):
        # Cannot have other previous punctuation, except if it's a numbered bullet list item
        num_idx = is_numbered_bullet_list_item(line)
        post_num_idx = 0
        if num_idx > -1:
            post_num_idx = num_idx
        likely_key = line[post_num_idx:-1].strip()
        result = not contains_punctuation(likely_key)
    return result, likely_key
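
# Illustrative behaviour of _is_line_key_candidate (hypothetical inputs, assuming contains_punctuation
# from utils.string_utils flags ':' as punctuation; not part of the original module):
# _is_line_key_candidate("Decision:")           # -> (True, "Decision")
# _is_line_key_candidate("move forward")        # -> (False, None)   does not end with ':'
# _is_line_key_candidate("Note: see details:")  # -> (False, "Note: see details")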


# Parses the semi-formatted text from the model response
def parse_semi_formatted_text(text):
    lines = text.split('\n')
    lines = [line.rstrip() for line in lines if line.rstrip()]

    result_dict = {}
    current_key = None
    current_value = []
    parsed_data = []
    in_code_flag = False

    for line in lines:
        line = line.replace("**", "").replace("###", "").replace("##", "")  # Remove unnecessary Markdown formatting

        is_key, key_candidate = _is_line_key_candidate(line)

        # Check if the line indicates a new key
        if is_key and not in_code_flag:
            # If there's a previous key, process its values
            if current_key and current_key == 'action_guidance':
                result_dict[current_key] = parsed_data
            elif current_key:
                result_dict[current_key] = '\n'.join(current_value).strip()

            try:
                current_key = key_candidate.replace(" ", "_").lower()
            except Exception as e:
                # logger.error(f"Response is not in the correct format: {e}\nReceived text was: {text}")
                raise

            current_value = []
            parsed_data = []
        else:
            if current_key == 'action_guidance':
                in_code_flag = True
                if line.strip() == '```':
                    if current_value:  # Process previous code block and description
                        entry = {"code": '\n'.join(current_value[1:])}
                        parsed_data.append(entry)
                        current_value = []
                    in_code_flag = False
                else:
                    current_value.append(line)
                    if line.strip().lower() == 'null':
                        in_code_flag = False
            else:
                in_code_flag = False
                line = line.strip()
                current_value.append(line)

    # Process the last key
    if current_key == 'action_guidance':
        if current_value:  # Process the last code block and description
            entry = {"code": '\n'.join(current_value[:-1]).strip()}
            parsed_data.append(entry)
        result_dict[current_key] = parsed_data
    else:
        result_dict[current_key] = '\n'.join(current_value).strip()

    if "success" in result_dict:
        result_dict["success"] = result_dict["success"].lower() == "true"

    return result_dict
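
# Illustrative usage of parse_semi_formatted_text (hypothetical response text, not part of the original module):
# text = "Decision:\nmove_forward\nSuccess:\nTrue"
# parse_semi_formatted_text(text)  # -> {'decision': 'move_forward', 'success': True}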


class JsonFrameStructure:

    def __init__(self):
        self.data_structure: Dict[int, Dict[str, List[Dict[str, Any]]]] = {}
        self.end_index: int = -1

    def add_instance(self, timestamp: str, instance: Dict[str, Any]) -> None:
        # Check if the timestamp already exists across all indices
        exists = False
        for index_data in self.data_structure.values():
            if timestamp in index_data:
                # Timestamp already exists, append the instance to the existing timestamp
                index_data[timestamp].append(instance)
                exists = True
                break

        if not exists:
            # Timestamp is new, create a new entry and increment the end_index
            self.end_index += 1
            self.data_structure.setdefault(self.end_index, {}).setdefault(timestamp, []).append(instance)

    def sort_index_by_timestamp(self) -> None:
        extracted_data = [(key, value) for entry in self.data_structure.values() for key, value in entry.items()]
        sorted_data = sorted(extracted_data, key=lambda x: x[0])

        # Reconstructing the JSON structure with sorted data
        self.data_structure = OrderedDict({index: {key: value} for index, (key, value) in enumerate(sorted_data)})

    def search_type_across_all_indices(self, search_type: str) -> List[Dict[str, Any]]:
        results = []

        # Iterate over the indices in ascending order
        for index, index_data in sorted(self.data_structure.items()):
            for object_id, instances in index_data.items():
                for instance in instances:
                    for key, values in instance.items():
                        if key == search_type and values != "" and values != []:
                            results.append({"index": index, "object_id": object_id, "values": values})

        return results

    def to_dict(self):
        return {
            "data_structure": self.data_structure,
            "end_index": self.end_index
        }

    @classmethod
    def from_dict(cls, data_dict):
        instance = cls()
        instance.data_structure = data_dict.get("data_structure", {})
        instance.end_index = data_dict.get("end_index", -1)
        return instance
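
# Illustrative usage of JsonFrameStructure (hypothetical timestamps and fields, not part of the original module):
# frames = JsonFrameStructure()
# frames.add_instance("1700000000.0", {"description": "wooden door", "reasoning": ""})
# frames.add_instance("1700000001.0", {"description": "open field", "reasoning": "head north"})
# frames.sort_index_by_timestamp()
# frames.search_type_across_all_indices("description")
# # -> [{'index': 0, 'object_id': '1700000000.0', 'values': 'wooden door'},
# #     {'index': 1, 'object_id': '1700000001.0', 'values': 'open field'}]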