import os
import pickle
import re
from typing import List, Optional, Tuple

from huggingface_hub import hf_hub_download
from tokenizers import Tokenizer
from transformers import PreTrainedTokenizer

try:
    # Required by the trie artifact; fail early with a clear install hint.
    import ahocorasick  # noqa: F401
except ImportError:
    raise ImportError(
        "This tokenizer requires the 'pyahocorasick' library. "
        "Please install it using: pip install pyahocorasick"
    )


class MiLLiTokenizer(PreTrainedTokenizer):
    """miLLi 1.0: Azerbaijani Hybrid Tokenizer with Phonological Restoration"""

    model_input_names = ["input_ids", "attention_mask"]

    def __init__(
        self,
        repo_id="elshadrahimov/miLLi-1.0",
        trie_file="milli_trie_v1.pkl",
        bpe_file="milli_bpe_v1.json",
        unk_token="<UNK>",
        pad_token="<PAD>",
        eos_token="<EOS>",
        **kwargs
    ):
        # Prefer artifacts shipped next to this file; fall back to the Hub.
        local_dir = os.path.dirname(__file__)
        local_trie_path = os.path.join(local_dir, trie_file)
        local_bpe_path = os.path.join(local_dir, bpe_file)

        if os.path.exists(local_trie_path):
            self.trie_path = local_trie_path
        else:
            try:
                self.trie_path = hf_hub_download(repo_id=repo_id, filename=trie_file)
            except Exception as e:
                raise FileNotFoundError(f"Could not download Trie file from Hub: {e}") from e

        if os.path.exists(local_bpe_path):
            self.bpe_path = local_bpe_path
        else:
            try:
                self.bpe_path = hf_hub_download(repo_id=repo_id, filename=bpe_file)
            except Exception as e:
                raise FileNotFoundError(f"Could not download BPE file from Hub: {e}") from e

        with open(self.trie_path, "rb") as f:
            self.trie = pickle.load(f)

        self.bpe_backend = Tokenizer.from_file(self.bpe_path)

        # Load both backends before calling super().__init__(); the base class
        # may query the vocabulary while registering special tokens.
        super().__init__(
            unk_token=unk_token,
            pad_token=pad_token,
            eos_token=eos_token,
            **kwargs
        )

    @property
    def vocab_size(self) -> int:
        return self.bpe_backend.get_vocab_size()

    def get_vocab(self):
        return self.bpe_backend.get_vocab()

    def _normalize_phonology(self, sub_word: str) -> Optional[str]:
        """Map a surface form back to a dictionary root, undoing common
        Azerbaijani sound changes. Returns None if no root is found."""
        # Exact match: the candidate is already a known root.
        if "_" + sub_word in self.trie:
            return sub_word

        # Undo word-final consonant softening (q -> ğ, k -> y), e.g. the
        # surface stem "papağ" restores to the root "papaq".
        replacements = {'ğ': 'q', 'y': 'k'}
        if sub_word and sub_word[-1] in replacements:
            restored = sub_word[:-1] + replacements[sub_word[-1]]
            if "_" + restored in self.trie:
                return restored

        # Undo high-vowel elision by reinserting a vowel before the final
        # consonant, e.g. the surface stem "ağz" restores to "ağız".
        vowels = ['ı', 'i', 'u', 'ü']
        if len(sub_word) >= 2:
            for v in vowels:
                restored = sub_word[:-1] + v + sub_word[-1]
                if "_" + restored in self.trie:
                    return restored
        return None

    def _milli_pre_tokenize(self, text: str) -> List[str]:
        raw_tokens = re.findall(r'\w+|[^\w\s]', text, re.UNICODE)
        processed_parts = []
        for token in raw_tokens:
            # Punctuation passes through untouched.
            if not re.match(r'\w+', token):
                processed_parts.append(token)
                continue

            # Record capitalization as a marker token, then lowercase.
            is_upper = token[0].isupper()
            word = token.lower()
            if is_upper:
                processed_parts.append("<UPPER>")

            # Longest-match root search: try progressively shorter prefixes
            # until one normalizes to a known root.
            found_root, best_root_len = None, 0
            for i in range(len(word), 0, -1):
                root = self._normalize_phonology(word[:i])
                if root:
                    found_root = root
                    best_root_len = i
                    break

            if found_root:
                processed_parts.append("_" + found_root)
                remainder = word[best_root_len:]
                if remainder:
                    processed_parts.append("##" + remainder)
            else:
                processed_parts.append("_" + word)
        return processed_parts
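
    # Illustrative trace under a hypothetical trie: if "_kitab" is a known
    # root, _milli_pre_tokenize("Kitablar") yields ["<UPPER>", "_kitab", "##lar"].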

    def _tokenize(self, text: str) -> List[str]:
        pre_parts = self._milli_pre_tokenize(text)
        final_tokens = []
        for part in pre_parts:
            # Marker tokens and punctuation bypass the BPE backend.
            clean_part = part.replace('_', '').replace('##', '')
            if not re.match(r'\w+', clean_part):
                final_tokens.append(part)
                continue
            encoding = self.bpe_backend.encode(part)
            final_tokens.extend(encoding.tokens)
        return final_tokens

    def _convert_token_to_id(self, token: str) -> int:
        # token_to_id() returns None for unknown tokens; check explicitly so
        # a legitimate id of 0 is not mistaken for a miss and sent to <UNK>.
        token_id = self.bpe_backend.token_to_id(token)
        if token_id is None:
            return self.bpe_backend.token_to_id(self.unk_token)
        return token_id

    def _convert_id_to_token(self, index: int) -> str:
        return self.bpe_backend.id_to_token(index)

    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]:
        if not os.path.isdir(save_directory):
            os.makedirs(save_directory)
        vocab_file = os.path.join(save_directory, (filename_prefix or "") + "milli_bpe_v1.json")
        self.bpe_backend.save(vocab_file)
        return (vocab_file,)
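

# A minimal usage sketch, assuming the artifact files are present locally or
# the Hub repo above is reachable; the sample sentence is purely illustrative.
if __name__ == "__main__":
    tokenizer = MiLLiTokenizer()
    sample = "Kitablar masanın üstündədir."
    tokens = tokenizer.tokenize(sample)
    print(tokens)
    print(tokenizer.convert_tokens_to_ids(tokens))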