| |
|
| |
|
| |
|
| | import json
|
| | import os
|
| | import itertools
|
| | from pathlib import Path
|
| | from datasets import load_dataset
|
| | from transformers import AutoTokenizer
|
| | import langdetect
|
| | from tqdm import tqdm
|
| | import argparse
|
| |
|
| |
|
class DataPreprocessor:
    """Pipeline for preparing language-model pretraining data.

    Downloads a web-text sample (RefinedWeb, with a Matrix fallback),
    applies heuristic quality filtering, then tokenizes and packs the
    surviving documents into fixed-length token-id sequences.

    Output layout under ``output_dir``:
        shards/     raw downloaded JSONL ({"text": ...} per line)
        processed/  quality-filtered JSONL
        tokens/     packed sequences, one space-separated id row per line
        tokenizer/  saved tokenizer files
    """

    def __init__(self, output_dir="data", seq_length=1024):
        """
        Args:
            output_dir: Root directory for all produced artifacts.
            seq_length: Number of token ids per packed training sequence.
        """
        self.output_dir = Path(output_dir)
        self.seq_length = seq_length
        self.setup_directories()

    def setup_directories(self):
        """Create necessary directories"""
        for d in ("shards", "processed", "tokens"):
            (self.output_dir / d).mkdir(parents=True, exist_ok=True)

    def download_refinedweb_sample(self, num_docs=100000):
        """Download a sample from RefinedWeb dataset"""
        print(f"Downloading {num_docs} documents from RefinedWeb...")

        raw_path = self.output_dir / "shards" / "refinedweb_sample_raw.jsonl"

        try:
            # Streaming mode avoids materializing the (very large) dataset.
            ds = load_dataset("tiiuae/falcon-refinedweb", split="train", streaming=True)

            downloaded = 0
            with open(raw_path, "w", encoding="utf-8") as f:
                for row in tqdm(itertools.islice(ds, num_docs), total=num_docs):
                    text = row.get("content", "").strip()
                    # Keep only documents with a minimum amount of content.
                    if text and len(text) > 100:
                        f.write(json.dumps({"text": text}, ensure_ascii=False) + "\n")
                        downloaded += 1

                    if downloaded >= num_docs:
                        break

            print(f"Raw RefinedWeb data saved to: {raw_path}")
            print(f"Downloaded {downloaded} high-quality documents")
            return raw_path

        except Exception as e:
            # Deliberate broad catch: any download/dataset failure falls back
            # to the Matrix dataset rather than aborting the pipeline.
            print(f"Error downloading RefinedWeb: {e}")
            print("Falling back to Matrix dataset...")
            return self.download_matrix_sample_fallback(num_docs)

    def download_matrix_sample_fallback(self, num_docs=10000):
        """Download a sample from MAP-NEO Matrix dataset"""
        print(f"Downloading {num_docs} documents from Matrix dataset...")

        raw_path = self.output_dir / "shards" / "matrix_sample_raw.jsonl"

        ds = load_dataset("m-a-p/Matrix", split="train", streaming=True)

        with open(raw_path, "w", encoding="utf-8") as f:
            # (removed an unused enumerate() index here)
            for row in tqdm(itertools.islice(ds, num_docs), total=num_docs):
                # The text field name differs between dataset versions.
                text = row.get("text") or row.get("content") or ""
                if text.strip():
                    f.write(json.dumps({"text": text}, ensure_ascii=False) + "\n")

        print(f"Raw data saved to: {raw_path}")
        return raw_path

    def filter_refinedweb_quality(self, input_path):
        """Enhanced quality filtering for RefinedWeb data"""
        print("Applying enhanced quality filtering for RefinedWeb...")

        input_path = Path(input_path)
        output_path = self.output_dir / "processed" / "refinedweb_filtered.jsonl"

        filtered_count = 0
        total_count = 0

        with open(input_path, "r", encoding="utf-8") as infile, \
                open(output_path, "w", encoding="utf-8") as outfile:

            for line in tqdm(infile, desc="Quality filtering"):
                total_count += 1
                try:
                    obj = json.loads(line)
                    text = obj["text"]
                except (json.JSONDecodeError, KeyError):
                    # Skip malformed lines; was a blanket `except Exception`
                    # that also hid real I/O errors.
                    continue

                if self.is_high_quality_web_text(text):
                    outfile.write(json.dumps(obj, ensure_ascii=False) + "\n")
                    filtered_count += 1

        print(f"Filtered {filtered_count}/{total_count} documents for quality")
        print(f"Filtered data saved to: {output_path}")
        return output_path

    def is_high_quality_web_text(self, text):
        """Check if web text meets quality standards"""
        # Length bounds: very short docs carry little signal; very long
        # ones are often concatenated boilerplate.
        if len(text) < 200 or len(text) > 10000:
            return False

        # English only. langdetect raises on text it cannot classify;
        # was a bare `except:` that also trapped KeyboardInterrupt.
        try:
            if langdetect.detect(text) != "en":
                return False
        except Exception:
            return False

        # Require a minimum word count.
        words = text.split()
        if len(words) < 50:
            return False

        # Reject documents with repeated spam/boilerplate markers.
        spam_indicators = ['click here', 'buy now', 'free download', '###', '***']
        text_lower = text.lower()
        spam_count = sum(1 for indicator in spam_indicators if indicator in text_lower)
        if spam_count > 2:
            return False

        # Require some sentence structure (crude period-split heuristic).
        sentences = text.split('.')
        if len(sentences) < 3:
            return False

        return True

    def tokenize_and_pack(self, input_path, tokenizer_name="gpt2"):
        """Tokenize documents and pack into fixed-length sequences"""
        print(f"Tokenizing with {tokenizer_name} and packing to {self.seq_length} tokens...")

        tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
        if tokenizer.pad_token is None:
            # GPT-2-style tokenizers ship without a pad token; reuse EOS.
            tokenizer.pad_token = tokenizer.eos_token

        input_path = Path(input_path)
        output_path = self.output_dir / "tokens" / f"packed_{self.seq_length}.txt"

        buffer = []
        sequences_written = 0
        total_tokens = 0

        with open(input_path, "r", encoding="utf-8") as infile, \
                open(output_path, "w", encoding="utf-8") as outfile:

            for line in tqdm(infile, desc="Processing documents"):
                try:
                    text = json.loads(line)["text"]
                except (json.JSONDecodeError, KeyError):
                    # Skip malformed lines; was a blanket `except Exception`
                    # that silently swallowed every error.
                    continue

                tokens = tokenizer.encode(text, add_special_tokens=False)

                # EOS token acts as a document separator inside the stream.
                buffer.extend(tokens + [tokenizer.eos_token_id])
                total_tokens += len(tokens) + 1

                while len(buffer) >= self.seq_length:
                    sequence = buffer[:self.seq_length]
                    # In-place delete instead of rebinding a new slice copy:
                    # avoids rebuilding the whole list twice per sequence.
                    del buffer[:self.seq_length]

                    outfile.write(" ".join(map(str, sequence)) + "\n")
                    sequences_written += 1

        print(f"Created {sequences_written} sequences of {self.seq_length} tokens each")
        print(f"Total tokens processed: {total_tokens:,}")
        print(f"Packed data saved to: {output_path}")

        # Persist the tokenizer next to the data so training can reload it.
        tokenizer_path = self.output_dir / "tokenizer"
        tokenizer.save_pretrained(tokenizer_path)
        print(f"Tokenizer saved to: {tokenizer_path}")

        return output_path, tokenizer_path
|
| |
|
| |
|
def main():
    """CLI entry point: download, filter, then tokenize-and-pack a sample."""
    parser = argparse.ArgumentParser(description="Preprocess MAP-NEO training data")

    # (flag, type, default, help) — registered in one pass below.
    cli_options = (
        ("--num_docs", int, 10000, "Number of documents to download"),
        ("--seq_length", int, 1024, "Sequence length for packing"),
        ("--tokenizer", str, "gpt2", "Tokenizer to use"),
        ("--output_dir", str, "data", "Output directory"),
    )
    for flag, value_type, default, help_text in cli_options:
        parser.add_argument(flag, type=value_type, default=default, help=help_text)

    args = parser.parse_args()

    preprocessor = DataPreprocessor(args.output_dir, args.seq_length)

    print("Starting MAP-NEO data preprocessing pipeline...")

    # Stage 1: fetch raw documents.
    raw_path = preprocessor.download_refinedweb_sample(args.num_docs)

    # Stage 2: heuristic quality filtering.
    filtered_path = preprocessor.filter_refinedweb_quality(raw_path)

    # Stage 3: tokenize and pack into fixed-length sequences.
    packed_path, tokenizer_path = preprocessor.tokenize_and_pack(
        filtered_path, args.tokenizer
    )

    banner = "=" * 50
    print("\n" + banner)
    print("Data preprocessing complete!")
    print(f"Packed sequences: {packed_path}")
    print(f"Tokenizer: {tokenizer_path}")
    print(banner)


if __name__ == "__main__":
    main()