aursalan committed on
Commit
61cfbd5
·
1 Parent(s): 94c36ed

Added changes

Files changed (1)
  1. main.py +55 -59
main.py CHANGED
@@ -1,5 +1,5 @@
 import psycopg2
-from psycopg2.extras import execute_values
+from psycopg2.extras import execute_values, Json
 import pandas as pd
 from sentence_transformers import SentenceTransformer
 import os
@@ -10,9 +10,9 @@ from fastapi import FastAPI, BackgroundTasks, HTTPException
 from contextlib import asynccontextmanager
 from fastapi.responses import HTMLResponse
 import threading
+import json
 
 # --- Configuration ---
-# You can set this via environment variable, or keep the hardcoded string here.
 SUPABASE_CONNECTION_STRING = os.getenv("SUPABASE_CONNECTION_STRING")
 
 # --- Toggles & Tuning ---
@@ -22,15 +22,15 @@ DRY_RUN = False
 
 # --- Global State ---
 model = None
-execution_logs = deque(maxlen=50) # Stores the last 50 batch logs in RAM
-processing_lock = threading.Lock() # Lock to prevent overlapping pings
+execution_logs = deque(maxlen=50)
+processing_lock = threading.Lock()
 
-# --- Lifespan Manager (Loads Model on Startup) ---
+# --- Lifespan Manager ---
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     global model
     print("⏳ Loading Model...")
-    # Load model once when the API starts
+    # Using the Alibaba GTE ModernBERT as requested
     model = SentenceTransformer('Alibaba-NLP/gte-modernbert-base', trust_remote_code=True)
     print("✅ Model Loaded.")
     yield
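The lifespan hook above loads the encoder once per process. Since the database column is a fixed-width pgvector, it can be worth asserting at startup that the model's output width matches the schema. A minimal sketch of such a check (the 768 figure echoes the in-diff note further down; verify it against your own column definition):

    # Hypothetical startup check, placed right after the model loads.
    # get_sentence_embedding_dimension() is part of the sentence-transformers API.
    dim = model.get_sentence_embedding_dimension()
    assert dim == 768, f"Model emits {dim}-dim vectors; DB vector column expects 768"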
@@ -42,84 +42,89 @@ app = FastAPI(lifespan=lifespan)
 
 def fetch_and_lock_chunk(conn, chunk_size):
     """
-    Fetches the next batch of JOBS and LOCKS them
-    so other workers skip them (Concurrency Safe).
+    Fetches the next batch of JOBS from the new denormalized schema
+    and LOCKS them using FOR UPDATE SKIP LOCKED.
     """
     query = """
     WITH locked_jobs AS (
-        SELECT job_id, job_title, roles_and_responsibilities, qualification
+        SELECT
+            id,
+            title,
+            company_name,
+            location,
+            work_model,
+            employment_type,
+            roles_and_responsibilities,
+            qualification,
+            min_experience
         FROM jobs
         WHERE
-            -- Condition 1: Embedding is missing
-            job_embeddings IS NULL
+            -- Condition 1: Embedding is missing (New Job)
+            embeddings IS NULL
             OR
-            -- Condition 2: Job details were updated more recently than the vector
-            updated_at > job_embeddings_updated_at
+            -- Condition 2: Job created after the last embedding (Retry/Update Logic)
+            -- Note: Since there is no 'updated_at' column, we rely on created_at vs embeddings_created_at
+            (embeddings_created_at IS NOT NULL AND created_at > embeddings_created_at)
         LIMIT %s
-        FOR UPDATE SKIP LOCKED -- <--- Prevents conflicts
+        FOR UPDATE SKIP LOCKED
     )
-    SELECT
-        lj.job_id,
-        lj.job_title,
-        lj.roles_and_responsibilities,
-        lj.qualification,
-
-        -- 1. Skills (Subquery)
-        (SELECT json_agg(DISTINCT s.skill_name)
-         FROM job_skill_map jsm
-         JOIN skills s ON jsm.skill_id = s.skill_id
-         WHERE jsm.job_id = lj.job_id) AS skills
-
-    FROM locked_jobs lj;
+    SELECT * FROM locked_jobs;
     """
+    # pandas read_sql usually handles JSONB columns as standard Python objects (lists/dicts)
    return pd.read_sql_query(query, conn, params=(chunk_size,))
 
 def clean_and_format_text(row):
     """
-    Joins lists into a single string with Semantic Anchors (Tags).
+    Joins denormalized columns into a single semantic string for embedding.
     """
     # Configuration: Maps DB Column -> Semantic Tag
+    # (Column Name in DF, Label for Text)
     field_config = [
-        ('job_title', 'Job Title'),
+        ('title', 'Job Title'),
+        ('company_name', 'Company'),
+        ('location', 'Location'),
+        ('work_model', 'Work Model'),
+        ('min_experience', 'Minimum Experience (Years)'),
         ('roles_and_responsibilities', 'Responsibilities'),
-        ('qualification', 'Qualifications'),
-        ('skills', 'Required Skills')
+        ('qualification', 'Qualifications')
     ]
 
     text_parts = []
 
     for col_name, tag in field_config:
-        if col_name in row:
+        if col_name in row and row[col_name] is not None:
             data = row[col_name]
 
-            # Case A: List of strings (Skills)
-            if isinstance(data, list) and len(data) > 0:
-                clean_items = [str(item).strip() for item in data if item is not None and str(item).strip()]
+            # Case A: JSONB List (Roles, Qualifications)
+            if isinstance(data, list):
+                # Filter out empty strings or None values
+                clean_items = [str(item).strip() for item in data if item and str(item).strip()]
                 if clean_items:
                     text_parts.append(f"{tag}: " + ", ".join(clean_items))
 
-            # Case B: Single String (Title, Description)
-            elif isinstance(data, str) and data.strip():
-                # Clean up newlines in description to avoid messy formatting
-                clean_text = data.strip().replace('\r', '')
+            # Case B: Standard String/Int (Title, Company, Experience)
+            elif str(data).strip():
+                clean_text = str(data).strip().replace('\r', '')
                 text_parts.append(f"{tag}: {clean_text}")
 
+    # Combine all parts with newlines
     return "\n".join(text_parts)
 
 def update_db_batch(conn, updates):
     if DRY_RUN: return
 
+    # Update the 'embeddings' column and the 'embeddings_created_at' timestamp
     query = """
     UPDATE jobs AS j
-    SET job_embeddings = data.vector::vector,
-        job_embeddings_updated_at = NOW()
+    SET embeddings = data.vector::vector,
+        embeddings_created_at = NOW()
     FROM (VALUES %s) AS data (id, vector)
-    WHERE j.job_id = data.id
+    WHERE j.id = data.id::uuid
     """
     cursor = conn.cursor()
     try:
         execute_values(cursor, query, updates)
-        conn.commit() # Releases locks
+        conn.commit()
     except Exception as e:
         conn.rollback()
         raise e
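To make the new formatting concrete, here is what clean_and_format_text in this hunk produces for a hypothetical row (a plain dict standing in for the pandas row; all values invented, the tags come from field_config above):

    row = {
        'title': 'Backend Engineer',
        'company_name': 'Acme Corp',       # hypothetical company
        'location': 'Remote',
        'work_model': 'Hybrid',
        'min_experience': 3,               # ints fall into Case B via str(data)
        'roles_and_responsibilities': ['Build APIs', 'Review code'],  # JSONB list -> Case A
        'qualification': ['B.E./B.Tech or equivalent'],
    }
    print(clean_and_format_text(row))
    # Job Title: Backend Engineer
    # Company: Acme Corp
    # Location: Remote
    # Work Model: Hybrid
    # Minimum Experience (Years): 3
    # Responsibilities: Build APIs, Review code
    # Qualifications: B.E./B.Tech or equivalent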
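update_db_batch relies on psycopg2's execute_values to expand the (id, embedding) tuples into the VALUES %s placeholder, and on pgvector casts to turn each value into a vector. A minimal sketch of the payload shape (UUID and numbers invented); if your pgvector version rejects the array-to-vector cast, serialising each embedding as a '[...]' string literal before the ::vector cast is a common fallback:

    # Each tuple becomes one row of: FROM (VALUES %s) AS data (id, vector)
    updates = [
        ('4f9c1c1e-1234-4abc-8def-000000000000', [0.12, -0.03, 0.88]),  # toy 3-dim vector
    ]
    # Fallback: send the text form, which '::vector' parses directly.
    updates = [(job_id, str(vec)) for job_id, vec in updates]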
@@ -130,14 +135,14 @@ def run_worker_logic():
     """
     The core logic that runs one single batch processing for JOBS.
     """
-    log_buffer = [] # Local buffer to capture logs for this specific run
+    log_buffer = []
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 
    log_buffer.append(f"<b>BATCH RUN: {timestamp}</b>")
 
    conn = None
    try:
-        conn = psycopg2.connect(SUPABASE_CONNECTION_STRING, sslmode='require')
+        conn = psycopg2.connect(SUPABASE_CONNECTION_STRING)
 
        # 1. Fetch & Lock
        df = fetch_and_lock_chunk(conn, PROCESSING_CHUNK_SIZE)
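The row locks taken by fetch_and_lock_chunk are held on this connection until update_db_batch commits (or the rollback/close in the error paths), which is what makes overlapping pings safe. A toy illustration of the SKIP LOCKED behaviour with two connections (DSN is a placeholder connection string):

    import psycopg2

    conn_a = psycopg2.connect(DSN)  # DSN is hypothetical
    conn_b = psycopg2.connect(DSN)
    q = "SELECT id FROM jobs WHERE embeddings IS NULL LIMIT 5 FOR UPDATE SKIP LOCKED"
    with conn_a.cursor() as a, conn_b.cursor() as b:
        a.execute(q)  # worker A locks up to 5 rows
        b.execute(q)  # worker B skips A's locked rows instead of blocking
        print(a.fetchall(), b.fetchall())  # disjoint sets of ids
    conn_a.commit()  # releases A's locks
    conn_b.commit()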
@@ -145,7 +150,6 @@ def run_worker_logic():
         if df.empty:
             conn.rollback()
             log_buffer.append("💀 No pending jobs found.")
-            # Add to global logs and exit
             execution_logs.appendleft("<br>".join(log_buffer))
             return "No data"
 
@@ -154,14 +158,15 @@ def run_worker_logic():
         # 2. Clean Text
         df['full_text'] = df.apply(clean_and_format_text, axis=1)
 
-        # 3. Log Inputs (For the Root API view)
+        # 3. Log Inputs (for debugging/visibility)
         for index, row in df.iterrows():
             log_buffer.append(f"<div style='border:1px solid #ccc; margin:5px; padding:5px; background:#f9f9f9'>")
-            log_buffer.append(f"<strong>ID: {row['job_id']} ({row.get('job_title', 'Unknown')})</strong>")
-            log_buffer.append(f"<pre style='white-space: pre-wrap;'>{row['full_text']}</pre>")
+            log_buffer.append(f"<strong>ID: {row['id']} - {row.get('title', 'Unknown')}</strong>")
+            log_buffer.append(f"<pre style='white-space: pre-wrap; font-size: 0.8em;'>{row['full_text']}</pre>")
             log_buffer.append("</div>")
 
         # 4. Generate Embeddings
+        # Note: Ensure the model dimensions match your DB vector size (ModernBERT is typically 768)
         embeddings = model.encode(
             df['full_text'].tolist(),
             batch_size=EMBEDDING_BATCH_SIZE,
@@ -171,7 +176,7 @@ def run_worker_logic():
         )
 
         # 5. Update DB
-        updates = list(zip(df['job_id'].tolist(), embeddings.tolist()))
+        updates = list(zip(df['id'].tolist(), embeddings.tolist()))
 
         if not DRY_RUN:
             update_db_batch(conn, updates)
@@ -186,16 +191,12 @@ def run_worker_logic():
         print(f"Error: {e}")
     finally:
         if conn: conn.close()
-        # Push the local buffer to the global execution log
         execution_logs.appendleft("<br>".join(log_buffer))
 
 # --- API Endpoints ---
 
 @app.get("/", response_class=HTMLResponse)
 async def read_root():
-    """
-    Root endpoint: Displays the logs of recent processing batches.
-    """
     html_content = """
     <html>
     <head>
@@ -223,18 +224,13 @@ async def read_root():
 
 @app.get("/trigger-batch")
 async def trigger_processing(background_tasks: BackgroundTasks):
-    """
-    External Pinger: Hits this endpoint to trigger one batch of processing.
-    """
     if processing_lock.locked():
         return {"status": "busy", "message": "Worker is currently processing a previous batch."}
 
-    # We run the worker in a background task so the API response is fast
     background_tasks.add_task(wrapped_worker)
     return {"status": "started", "message": "Batch processing started in background."}
 
 def wrapped_worker():
-    """Thread-safe wrapper for the worker logic"""
     if processing_lock.acquire(blocking=False):
         try:
             run_worker_logic()
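For completeness, /trigger-batch is designed to be hit by an external scheduler; a minimal pinger sketch (the URL is hypothetical):

    import requests

    resp = requests.get("https://your-space.example/trigger-batch")  # hypothetical URL
    print(resp.json())  # {"status": "started", ...} or {"status": "busy", ...}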
 