# Text cleaning and length-limited splitting utilities.
| import re | |
| import string | |
# Mapping of CJK/typographic punctuation to plain ASCII equivalents.
# Keys mapping to "" are deleted outright; others are normalized to "," or "'".
SYMBOLS_MAPPING = {
    "\n": "",
    "\t": "",
    "…": ",",
    "“": "'",
    "”": "'",
    "‘": "'",
    "’": "'",
    "【": "",
    "】": "",
    "[": "",
    "]": "",
    "(": "",
    ")": "",
    "(": "",
    ")": "",
    "・": "",
    "·": "",
    "「": "'",
    "」": "'",
    "《": "'",
    "》": "'",
    "—": "",
    "~": ",",
    "~": ",",
    ":": ",",
    ";": ",",
    ";": ",",
    ":": ",",
    '"': "",
    "!": ",",
    "————": "",
    "——": "",
    # NOTE: a duplicate "—": "" entry was removed (dict literals keep only the
    # last occurrence anyway, and both occurrences mapped to the same value).
    "……": ",",
    "*": "",
}

# Alternatives are sorted longest-first: regex alternation is first-match, so
# without the sort a multi-char key like "……" could never win over its
# single-char prefix "…" and would be replaced piecewise instead of as a unit.
REPLACE_SYMBOL_REGEX = re.compile(
    "|".join(
        re.escape(symbol)
        for symbol in sorted(SYMBOLS_MAPPING, key=len, reverse=True)
    )
)
# Unicode ranges covering the common emoji blocks we want to strip.
_EMOJI_RANGES = (
    "\U0001f600-\U0001f64f",  # emoticons
    "\U0001f300-\U0001f5ff",  # symbols & pictographs
    "\U0001f680-\U0001f6ff",  # transport & map symbols
    "\U0001f1e0-\U0001f1ff",  # flags (iOS)
)

# Matches one or more consecutive emoji codepoints.
EMOJI_REGEX = re.compile("[" + "".join(_EMOJI_RANGES) + "]+", flags=re.UNICODE)
def clean_text(text):
    """Normalize raw text: strip edges, map CJK punctuation to ASCII, drop emojis."""
    cleaned = text.strip().replace("\xa0", "")
    # One regex pass replaces every mapped symbol with its ASCII counterpart.
    cleaned = REPLACE_SYMBOL_REGEX.sub(lambda m: SYMBOLS_MAPPING[m.group()], cleaned)
    # Remove emoji codepoints entirely.
    cleaned = EMOJI_REGEX.sub("", cleaned)
    # Collapse runs of periods/commas ("..." or ",,") down to their first char.
    return re.sub(r"[.,]{2,}", lambda m: m.group()[0], cleaned)
def utf_8_len(text):
    """Return the length of *text* in UTF-8 bytes (not characters)."""
    encoded = text.encode("utf-8")
    return len(encoded)
def break_text(texts, length, splits: set):
    """Yield pieces of each text, cut immediately after any character in *splits*.

    Texts already within *length* UTF-8 bytes pass through untouched; the
    split character is kept at the end of each yielded piece.
    """
    for text in texts:
        if len(text.encode("utf-8")) <= length:
            yield text
            continue
        buffer = ""
        for ch in text:
            buffer += ch
            if ch in splits:
                yield buffer
                buffer = ""
        # Flush whatever trails the final split character.
        if buffer:
            yield buffer
def break_text_by_length(texts, length):
    """Hard-cut any text longer than *length* UTF-8 bytes into byte-budgeted pieces."""
    for text in texts:
        if len(text.encode("utf-8")) <= length:
            yield text
            continue
        buffer = ""
        for ch in text:
            buffer += ch
            # Cut as soon as the running piece reaches the byte budget.
            if len(buffer.encode("utf-8")) >= length:
                yield buffer
                buffer = ""
        # Flush the final short piece, if any.
        if buffer:
            yield buffer
def add_cleaned(curr, segments):
    """Strip *curr* and append it to *segments* unless it is empty or contains
    only whitespace and ASCII punctuation."""
    stripped = curr.strip()
    has_content = any(
        not (ch.isspace() or ch in string.punctuation) for ch in stripped
    )
    if stripped and has_content:
        segments.append(stripped)
def protect_float(text):
    """Encode decimals like ``3.14`` as ``<3_f_14>`` so sentence splitting keeps them whole."""
    return re.sub(r"(\d+)\.(\d+)", lambda m: f"<{m.group(1)}_f_{m.group(2)}>", text)
def unprotect_float(text):
    """Decode ``<3_f_14>`` back into ``3.14`` (inverse of protect_float)."""
    return re.sub(r"<(\d+)_f_(\d+)>", lambda m: f"{m.group(1)}.{m.group(2)}", text)
def split_text(text, length):
    """Split *text* into cleaned segments of at most *length* UTF-8 bytes.

    Splitting is attempted in order of preference: sentence punctuation
    (with floats protected so "3.14" is not cut), then commas, then spaces,
    then a hard byte-length cut; pieces are finally re-merged greedily up to
    the byte budget.
    """
    pieces = [clean_text(text)]
    # Sentence-final punctuation first; floats are shielded during this pass.
    pieces = map(protect_float, pieces)
    pieces = break_text(pieces, length, {".", "!", "?", "。", "!", "?"})
    pieces = map(unprotect_float, pieces)
    # Fall back to commas, then spaces, then a raw byte cut as a last resort.
    pieces = break_text(pieces, length, {",", ","})
    pieces = break_text(pieces, length, {" "})
    pieces = list(break_text_by_length(pieces, length))

    # Greedily merge adjacent pieces while they still fit within *length*.
    segments = []
    current = ""
    for piece in pieces:
        if utf_8_len(current) + utf_8_len(piece) <= length:
            current += piece
        else:
            add_cleaned(current, segments)
            current = piece
    if current:
        add_cleaned(current, segments)
    return segments
def contains_chinese(text):
    """Return True if *text* contains at least one CJK unified ideograph (U+4E00–U+9FFF)."""
    return re.search(r"[\u4e00-\u9fff]", text) is not None
def count_words_english(text):
    """Count whitespace-delimited tokens in *text*."""
    tokens = text.split()
    return len(tokens)
def count_characters_chinese(text):
    """Count characters in *text* (every character counts, not only Chinese ones)."""
    return len(text)
def split_by_punctuation_english(text):
    """Split *text* into sentences, keeping the terminating ``.``, ``!`` or ``?``."""
    # With a capturing group, re.split alternates:
    # [chunk, punct, chunk, punct, ..., trailing chunk]
    parts = re.split(r"([.!?])", text)
    sentences = []
    for idx in range(0, len(parts) - 1, 2):
        chunk = parts[idx].strip()
        if not chunk:
            continue
        if idx + 1 < len(parts):
            chunk += parts[idx + 1]
        sentences.append(chunk)
    # A trailing chunk with no terminator becomes its own sentence.
    if len(parts) % 2 == 1 and parts[-1].strip():
        sentences.append(parts[-1].strip())
    return sentences
def split_by_punctuation_chinese(text):
    """Split *text* into sentences at Chinese terminators, keeping ``。``, ``!`` or ``?``."""
    # With a capturing group, re.split alternates:
    # [chunk, punct, chunk, punct, ..., trailing chunk]
    parts = re.split(r"([。!?])", text)
    sentences = []
    for idx in range(0, len(parts) - 1, 2):
        chunk = parts[idx].strip()
        if not chunk:
            continue
        if idx + 1 < len(parts):
            chunk += parts[idx + 1]
        sentences.append(chunk)
    # A trailing chunk with no terminator becomes its own sentence.
    if len(parts) % 2 == 1 and parts[-1].strip():
        sentences.append(parts[-1].strip())
    return sentences
def merge_sentences_english(sentences, max_words=80):
    """Greedily join consecutive sentences (space-separated) while the merged
    chunk stays within *max_words* whitespace-delimited words."""
    merged = []
    chunk = ""
    for sentence in sentences:
        if not chunk:
            chunk = sentence
            continue
        candidate = chunk + " " + sentence
        # Word count via simple whitespace tokenization.
        if len(candidate.split()) <= max_words:
            chunk = candidate
        else:
            merged.append(chunk)
            chunk = sentence
    if chunk:
        merged.append(chunk)
    return merged
def merge_sentences_chinese(sentences, max_chars=100):
    """Greedily join consecutive sentences (no separator) while the merged
    chunk stays within *max_chars* characters."""
    merged = []
    chunk = ""
    for sentence in sentences:
        if not chunk:
            chunk = sentence
            continue
        candidate = chunk + sentence
        if len(candidate) <= max_chars:
            chunk = candidate
        else:
            merged.append(chunk)
            chunk = sentence
    if chunk:
        merged.append(chunk)
    return merged
def process_text(text):
    """Split one utterance into chunks under the per-language length budget.

    A leading speaker tag such as ``[S2]`` is removed first.  Text containing
    Chinese characters is budgeted by character count (150), everything else
    by word count (80).  Returns a list of chunk strings.
    """
    chinese_limit = 150
    english_limit = 80
    # Drop a leading speaker marker like "[S1]" before measuring length.
    text = re.sub(r"^\[S\d+\]", "", text).strip()
    if contains_chinese(text):
        if count_characters_chinese(text) <= chinese_limit:
            return [text]
        return merge_sentences_chinese(split_by_punctuation_chinese(text), chinese_limit)
    if count_words_english(text) <= english_limit:
        return [text]
    return merge_sentences_english(split_by_punctuation_english(text), english_limit)
def process_text_list(text_list):
    """Split each tagged line into length-limited chunks, re-attaching its speaker tag.

    Every entry must begin with a 4-character speaker tag in
    ``{"[S1]", "[S2]", "[S3]", "[S4]"}``; the tag is prepended to every chunk
    produced from that entry.

    Raises:
        ValueError: if an entry does not start with a recognized speaker tag.
    """
    valid_speakers = {"[S1]", "[S2]", "[S3]", "[S4]"}
    new_text_list = []
    for text in text_list:
        speaker = text[:4]
        # `assert` is stripped under `python -O`, so validate explicitly.
        if speaker not in valid_speakers:
            raise ValueError(
                f"entry must start with a speaker tag [S1]-[S4], got: {text[:10]!r}"
            )
        for chunk in process_text(text=text):
            new_text_list.append(speaker + chunk)
    return new_text_list