Spaces:
Build error
Build error
| import duckdb | |
| import yaml | |
| import time | |
| import logging | |
# Send INFO-and-above records to the root logger using a timestamped layout.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# One DuckDB connection shared by every step below. No database path is
# given, so DuckDB uses an in-memory database.
con = duckdb.connect()
def execute_with_timing(query, description, connection=None):
    """Execute a DuckDB query and log how long it took.

    Args:
        query: SQL text to execute.
        description: Human-readable label inserted into the log message.
        connection: Object whose ``execute`` method runs the query. When
            omitted, the module-level ``con`` connection is used.

    Returns:
        Whatever ``connection.execute(query)`` returns, so callers that need
        rows can fetch them instead of re-implementing the timing themselves.
    """
    target = con if connection is None else connection
    # perf_counter is monotonic and high resolution, suited to interval timing.
    started = time.perf_counter()
    result = target.execute(query)
    elapsed = time.perf_counter() - started
    # Lazy %-style args: formatting only happens if INFO records are emitted.
    logging.info("Completed %s in %.6f seconds.", description, elapsed)
    return result
# Wall-clock start for the whole pipeline; reported at the very end.
total_start_time = time.perf_counter()

# Materialise parents.parquet as a DuckDB table so every later step reads the
# table rather than re-scanning the parquet file.
load_parents_query = """
CREATE TABLE parents_in_memory AS
SELECT * FROM parquet_scan('public/parents.parquet')
"""
execute_with_timing(
    query=load_parents_query,
    description="Loaded parents.parquet into RAM",
)
# Step 1: give every model row a dense integer surrogate key (tmp_id, 1..n)
# so the later joins and the recursive step operate on integers.
# NOTE(review): ROW_NUMBER() OVER () numbers rows in arbitrary order and the
# mapping is only one-to-one if `id` is unique in parents.parquet; duplicate
# ids would fan out the joins in later steps. Confirm upstream.
unique_id_query = """
CREATE TABLE unique_ids AS
SELECT
id,
ROW_NUMBER() OVER () AS tmp_id
FROM parents_in_memory
"""
execute_with_timing(
    query=unique_id_query,
    description="Step 1: Created unique_ids table",
)
# Step 2: explode each model's base_models list so there is one row per
# (child tmp_id, base model id) pair. Rows whose base_models is NULL are
# skipped by the WHERE clause.
unnest_query = """
CREATE TABLE unnested_models AS
SELECT
u.tmp_id AS child_tmp_id, -- Numerical ID for the child model
UNNEST(p.base_models) AS base_model
FROM parents_in_memory p
JOIN unique_ids u ON p.id = u.id
WHERE p.base_models IS NOT NULL -- Filter out models without base models
"""
execute_with_timing(
    query=unnest_query,
    description="Step 2: Created unnested_models table",
)
# Step 3: translate each base model id into its tmp_id, giving integer
# (child, direct parent) edges. The inner JOIN drops base models that have
# no row of their own in unique_ids.
parent_level_query = """
CREATE TABLE parent_level AS
SELECT
u.child_tmp_id, -- Numerical ID for the child model
b.tmp_id AS base_tmp_id -- Numerical ID for the base model (parent)
FROM unnested_models u
JOIN unique_ids b ON u.base_model = b.id
"""
execute_with_timing(
    query=parent_level_query,
    description="Step 3: Created parent_level table",
)
# Step 4: Recursive CTE that walks parent_level edges to collect, for every
# ancestor, all of its descendants (up to 20 levels deep), then maps the
# numerical IDs back to the original model ids.
#
# UNION (instead of UNION ALL) collapses identical (ancestor, child, depth)
# rows. Diamond-shaped inheritance would otherwise re-emit the same row once
# per distinct path, inflating the working set. The final
# LIST(DISTINCT ...) result is unchanged. The depth guard also bounds the
# recursion if the graph contains cycles.
ancestor_children_query = """
CREATE TABLE ancestor_children AS
WITH RECURSIVE ancestor_children_cte AS (
SELECT
base_tmp_id AS ancestor_tmp_id, -- Start with direct parent as ancestor
child_tmp_id AS child_tmp_id, -- Direct child
1 AS depth -- Initialize depth counter
FROM parent_level
UNION
SELECT
ac.ancestor_tmp_id, -- Propagate ancestor
pl.child_tmp_id, -- Find new child in the chain
ac.depth + 1 -- Increment depth counter
FROM parent_level pl
JOIN ancestor_children_cte ac ON pl.base_tmp_id = ac.child_tmp_id
WHERE ac.depth < 20 -- Limit recursion to 20 levels
)
SELECT
a.id AS ancestor,
LIST(DISTINCT c.id) AS all_children
FROM ancestor_children_cte ac
JOIN unique_ids a ON ac.ancestor_tmp_id = a.tmp_id
JOIN unique_ids c ON c.tmp_id = ac.child_tmp_id
GROUP BY ancestor
"""
execute_with_timing(ancestor_children_query, "Step 4: Created ancestor_children table with string IDs")
# Map every model to the list of models that name it directly in base_models.
# DuckDB's LIST() aggregate keeps NULL inputs, so after the LEFT JOINs a model
# with no children would get [NULL] (length 1). list_distinct() removes both
# duplicates and NULLs, so leaf models get an empty list instead.
direct_children_mapping_query = """
CREATE TABLE direct_children_mapping AS
SELECT
p.id AS parent,
list_distinct(LIST(u.id)) AS direct_children
FROM parents_in_memory p
LEFT JOIN unnested_models um ON p.id = um.base_model
LEFT JOIN unique_ids u ON um.child_tmp_id = u.tmp_id
GROUP BY p.id
"""
execute_with_timing(direct_children_mapping_query, "Created direct_children_mapping table")
# Export every ancestor with its direct and transitive children (plus both
# counts) to Parquet, largest families first. direct_children comes from
# direct_children_mapping via a LEFT JOIN.
parquet_write_started = time.perf_counter()
final_output_query = """
COPY (
SELECT
ac.ancestor as ancestor,
dcm.direct_children as direct_children,
ac.all_children as all_children,
CAST(array_length(ac.all_children) AS INTEGER) as all_children_count,
CAST(array_length(dcm.direct_children) AS INTEGER) as direct_children_count
FROM ancestor_children ac
LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
ORDER BY all_children_count DESC
) TO 'public/ancestor_children.parquet' (FORMAT 'parquet')
"""
con.execute(final_output_query)
parquet_write_seconds = time.perf_counter() - parquet_write_started
logging.info(f"Written ancestor_children to parquet file in {parquet_write_seconds:.6f} seconds.")
# Write up to 10 ancestors that have children to a YAML file for inspection.
# NOTE: LIMIT without ORDER BY returns an arbitrary subset, not a random one.
# Each row is dumped as a mapping keyed by column name (in query column order)
# so the example file is self-describing rather than a list of bare positional
# lists.
start_time = time.perf_counter()
sample_query = """
SELECT ac.ancestor, dcm.direct_children, ac.all_children
FROM ancestor_children ac
LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
WHERE array_length(ac.all_children) > 0
LIMIT 10
"""
sample_result = con.execute(sample_query)
sample_columns = [column[0] for column in sample_result.description]
sample_data = [dict(zip(sample_columns, row)) for row in sample_result.fetchall()]
with open("public/ancestor_children.example.yaml", "w", encoding="utf-8") as f:
    yaml.safe_dump(sample_data, f, default_flow_style=False, sort_keys=False)
end_time = time.perf_counter()
logging.info(f"Written sample data to YAML file in {end_time - start_time:.6f} seconds.")
# Log up to 10 models that have no children (direct or indirect).
# ancestor_children is built with inner joins from parent_level edges, so
# every row in it has at least one child. Filtering it for an empty
# all_children list can therefore never match. Instead, select models that
# never appear as an ancestor at all.
# NOTE: LIMIT without ORDER BY returns an arbitrary subset, not a random one.
start_time = time.perf_counter()
no_children_query = """
SELECT p.id
FROM parents_in_memory p
LEFT JOIN ancestor_children ac ON p.id = ac.ancestor
WHERE ac.ancestor IS NULL
LIMIT 10
"""
no_children_data = con.execute(no_children_query).fetchall()
end_time = time.perf_counter()
logging.info(f"Fetched sample data of models with no children in {end_time - start_time:.6f} seconds.")
logging.info("Examples of models with no children (direct or indirect):")
for (model_id,) in no_children_data:
    logging.info(model_id)
# Report the ten ancestors with the largest descendant sets, alongside the
# size of their direct-children list.
top_ancestors_started = time.perf_counter()
top_ancestors_query = """
SELECT
ac.ancestor,
array_length(ac.all_children) AS num_all_children,
array_length(dcm.direct_children) AS num_direct_children
FROM ancestor_children ac
LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
ORDER BY num_all_children DESC
LIMIT 10
"""
top_ancestors = con.execute(top_ancestors_query).fetchall()
top_ancestors_elapsed = time.perf_counter() - top_ancestors_started
logging.info(f"Listed top 10 ancestors with the most children in {top_ancestors_elapsed:.6f} seconds.")
logging.info("Top 10 ancestors with the most children and their number of direct children:")
for ranked_row in top_ancestors:
    logging.info(ranked_row)
# Log the total wall-clock time for the whole pipeline. Route it through
# logging (like every other message in this script) so it gets the same
# timestamp/level prefix and destination.
total_execution_time = time.perf_counter() - total_start_time
logging.info(f"Total processing time: {total_execution_time:.6f} seconds")