""" src/utils/profile_scrapers.py Profile-based social media scrapers for Intelligence Agent Competitive Intelligence & Profile Monitoring Tools """ import json import os import time import random import re import logging from typing import Optional, List from datetime import datetime from urllib.parse import quote_plus from langchain_core.tools import tool try: from playwright.sync_api import sync_playwright PLAYWRIGHT_AVAILABLE = True except ImportError: PLAYWRIGHT_AVAILABLE = False from src.utils.utils import ( ensure_playwright, load_playwright_storage_state_path, clean_twitter_text, extract_twitter_timestamp, clean_fb_text, extract_media_id_instagram, fetch_caption_via_private_api, ) logger = logging.getLogger("Roger.utils.profile_scrapers") logger.setLevel(logging.INFO) # ===================================================== # TWITTER PROFILE SCRAPER # ===================================================== @tool def scrape_twitter_profile(username: str, max_items: int = 20): """ Twitter PROFILE scraper - targets a specific user's timeline for competitive monitoring. Fetches tweets from a specific user's profile, not search results. Perfect for monitoring competitor accounts, influencers, or specific business profiles. Features: - Retry logic with exponential backoff (3 attempts) - Fallback to keyword search if profile fails - Increased timeout (90s) Args: username: Twitter username (without @) max_items: Maximum number of tweets to fetch Returns: JSON with user's tweets, engagement metrics, and timestamps """ ensure_playwright() # Load Session site = "twitter" session_path = load_playwright_storage_state_path( site, out_dir="src/utils/.sessions" ) if not session_path: session_path = load_playwright_storage_state_path(site, out_dir=".sessions") # Check for alternative session file name if not session_path: alt_paths = [ os.path.join(os.getcwd(), "src", "utils", ".sessions", "tw_state.json"), os.path.join(os.getcwd(), ".sessions", "tw_state.json"), os.path.join(os.getcwd(), "tw_state.json"), ] for path in alt_paths: if os.path.exists(path): session_path = path logger.info(f"[TWITTER_PROFILE] Found session at {path}") break if not session_path: return json.dumps( { "error": "No Twitter session found", "solution": "Run the Twitter session manager to create a session", }, default=str, ) results = [] username = username.lstrip("@") # Remove @ if present try: with sync_playwright() as p: browser = p.chromium.launch( headless=True, args=[ "--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-dev-shm-usage", ], ) context = browser.new_context( storage_state=session_path, viewport={"width": 1280, "height": 720}, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", ) context.add_init_script( """ Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); window.chrome = {runtime: {}}; """ ) page = context.new_page() # Navigate to user profile with retry logic profile_url = f"https://x.com/{username}" logger.info(f"[TWITTER_PROFILE] Monitoring @{username}") max_retries = 3 navigation_success = False last_error = None for attempt in range(max_retries): try: # Exponential backoff: 0, 2, 4 seconds if attempt > 0: wait_time = 2**attempt logger.info( f"[TWITTER_PROFILE] Retry {attempt + 1}/{max_retries} after {wait_time}s..." 

# =====================================================
# TWITTER PROFILE SCRAPER
# =====================================================


@tool
def scrape_twitter_profile(username: str, max_items: int = 20):
    """
    Twitter PROFILE scraper - targets a specific user's timeline for competitive monitoring.

    Fetches tweets from a specific user's profile rather than from search
    results. Suited to monitoring competitor accounts, influencers, or
    specific business profiles.

    Features:
    - Retry logic with exponential backoff (3 attempts)
    - Fallback to keyword search if profile scraping fails
    - Increased navigation timeout (90s)

    Args:
        username: Twitter username (without @)
        max_items: Maximum number of tweets to fetch

    Returns:
        JSON with the user's tweets, engagement metrics, and timestamps
    """
    ensure_playwright()

    # Load session
    site = "twitter"
    session_path = load_playwright_storage_state_path(
        site, out_dir="src/utils/.sessions"
    )
    if not session_path:
        session_path = load_playwright_storage_state_path(site, out_dir=".sessions")

    # Check for alternative session file names
    if not session_path:
        alt_paths = [
            os.path.join(os.getcwd(), "src", "utils", ".sessions", "tw_state.json"),
            os.path.join(os.getcwd(), ".sessions", "tw_state.json"),
            os.path.join(os.getcwd(), "tw_state.json"),
        ]
        for path in alt_paths:
            if os.path.exists(path):
                session_path = path
                logger.info(f"[TWITTER_PROFILE] Found session at {path}")
                break

    if not session_path:
        return json.dumps(
            {
                "error": "No Twitter session found",
                "solution": "Run the Twitter session manager to create a session",
            },
            default=str,
        )

    results = []
    username = username.lstrip("@")  # Remove @ if present

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(
                headless=True,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                ],
            )
            context = browser.new_context(
                storage_state=session_path,
                viewport={"width": 1280, "height": 720},
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
                ),
            )
            context.add_init_script(
                """
                Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                window.chrome = {runtime: {}};
                """
            )
            page = context.new_page()

            # Navigate to the user profile with retry logic
            profile_url = f"https://x.com/{username}"
            logger.info(f"[TWITTER_PROFILE] Monitoring @{username}")

            max_retries = 3
            navigation_success = False
            last_error = None

            for attempt in range(max_retries):
                try:
                    # Exponential backoff: waits 0, 2, then 4 seconds
                    if attempt > 0:
                        wait_time = 2**attempt
                        logger.info(
                            f"[TWITTER_PROFILE] Retry {attempt + 1}/{max_retries} after {wait_time}s..."
                        )
                        time.sleep(wait_time)

                    # 90s timeout (up from 60s); wait until the network is idle
                    page.goto(profile_url, timeout=90000, wait_until="networkidle")
                    time.sleep(5)

                    # Handle popups
                    popup_selectors = [
                        "[data-testid='app-bar-close']",
                        "[aria-label='Close']",
                        "button:has-text('Not now')",
                    ]
                    for selector in popup_selectors:
                        try:
                            if (
                                page.locator(selector).count() > 0
                                and page.locator(selector).first.is_visible()
                            ):
                                page.locator(selector).first.click()
                                time.sleep(1)
                        except Exception:
                            pass

                    # Wait for tweets to load
                    try:
                        page.wait_for_selector(
                            "article[data-testid='tweet']", timeout=20000
                        )
                        logger.info(f"[TWITTER_PROFILE] Loaded {username}'s profile")
                        navigation_success = True
                        break
                    except Exception:
                        last_error = f"Could not load tweets for @{username}"
                        logger.warning(
                            f"[TWITTER_PROFILE] {last_error}, attempt {attempt + 1}/{max_retries}"
                        )
                        continue

                except Exception as e:
                    last_error = str(e)
                    logger.warning(
                        f"[TWITTER_PROFILE] Navigation failed on attempt {attempt + 1}: {e}"
                    )
                    continue

            # If profile scraping failed after all retries, fall back to keyword search
            if not navigation_success:
                logger.warning(
                    f"[TWITTER_PROFILE] Profile scraping failed, falling back to keyword search for '{username}'"
                )
                browser.close()

                # Fallback: use keyword search instead
                try:
                    from src.utils.utils import scrape_twitter

                    fallback_result = scrape_twitter.invoke(
                        {"query": username, "max_items": max_items}
                    )
                    fallback_data = (
                        json.loads(fallback_result)
                        if isinstance(fallback_result, str)
                        else fallback_result
                    )
                    if "error" not in fallback_data:
                        fallback_data["fallback_used"] = True
                        fallback_data["original_error"] = last_error
                        fallback_data["note"] = (
                            f"Used keyword search as fallback for @{username}"
                        )
                        return json.dumps(fallback_data, default=str)
                except Exception as fallback_error:
                    logger.error(
                        f"[TWITTER_PROFILE] Fallback also failed: {fallback_error}"
                    )

                return json.dumps(
                    {
                        "error": last_error
                        or f"Profile not found or private: @{username}",
                        "fallback_attempted": True,
                    },
                    default=str,
                )

            # Check if logged in
            if "login" in page.url:
                logger.error("[TWITTER_PROFILE] Session expired")
                browser.close()
                return json.dumps({"error": "Session invalid or expired"}, default=str)

            # Scrape tweets with engagement metrics
            seen = set()
            scroll_attempts = 0
            max_scroll_attempts = 10

            TWEET_SELECTOR = "article[data-testid='tweet']"
            TEXT_SELECTOR = "div[data-testid='tweetText']"

            while len(results) < max_items and scroll_attempts < max_scroll_attempts:
                scroll_attempts += 1

                # Expand "Show more" buttons
                try:
                    show_more_buttons = page.locator(
                        "[data-testid='tweet-text-show-more-link']"
                    ).all()
                    for button in show_more_buttons:
                        if button.is_visible():
                            try:
                                button.click()
                                time.sleep(0.3)
                            except Exception:
                                pass
                except Exception:
                    pass

                # Collect tweets
                tweets = page.locator(TWEET_SELECTOR).all()
                new_tweets_found = 0

                for tweet in tweets:
                    if len(results) >= max_items:
                        break

                    try:
                        tweet.scroll_into_view_if_needed()
                        time.sleep(0.2)

                        # Skip promoted tweets / ads
                        if (
                            tweet.locator("span:has-text('Promoted')").count() > 0
                            or tweet.locator("span:has-text('Ad')").count() > 0
                        ):
                            continue

                        # Extract text
                        text_content = ""
                        text_element = tweet.locator(TEXT_SELECTOR).first
                        if text_element.count() > 0:
                            text_content = text_element.inner_text()

                        cleaned_text = clean_twitter_text(text_content)

                        # Extract timestamp
                        timestamp = extract_twitter_timestamp(tweet)
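
                        # Engagement counts are parsed from the action buttons'
                        # aria-labels (e.g. "1,234 Likes"; the exact wording is a
                        # UI assumption and may shift with locale or redesigns),
                        # so any metric that fails to parse simply stays at 0.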
                        # Extract engagement metrics
                        likes = 0
                        retweets = 0
                        replies = 0

                        try:
                            # Likes
                            like_button = tweet.locator("[data-testid='like']")
                            if like_button.count() > 0:
                                like_text = (
                                    like_button.first.get_attribute("aria-label") or ""
                                )
                                like_match = re.search(r"([\d,]+)", like_text)
                                if like_match:
                                    likes = int(like_match.group(1).replace(",", ""))

                            # Retweets
                            retweet_button = tweet.locator("[data-testid='retweet']")
                            if retweet_button.count() > 0:
                                rt_text = (
                                    retweet_button.first.get_attribute("aria-label")
                                    or ""
                                )
                                rt_match = re.search(r"([\d,]+)", rt_text)
                                if rt_match:
                                    retweets = int(rt_match.group(1).replace(",", ""))

                            # Replies
                            reply_button = tweet.locator("[data-testid='reply']")
                            if reply_button.count() > 0:
                                reply_text = (
                                    reply_button.first.get_attribute("aria-label")
                                    or ""
                                )
                                reply_match = re.search(r"([\d,]+)", reply_text)
                                if reply_match:
                                    replies = int(reply_match.group(1).replace(",", ""))
                        except Exception:
                            pass

                        # Extract tweet URL
                        tweet_url = f"https://x.com/{username}"
                        try:
                            link_element = tweet.locator("a[href*='/status/']").first
                            if link_element.count() > 0:
                                href = link_element.get_attribute("href")
                                if href:
                                    tweet_url = f"https://x.com{href}"
                        except Exception:
                            pass

                        # Deduplicate on username + text prefix + timestamp
                        text_key = cleaned_text[:50] if cleaned_text else ""
                        unique_key = f"{username}_{text_key}_{timestamp}"

                        if (
                            cleaned_text
                            and len(cleaned_text) > 20
                            and unique_key not in seen
                        ):
                            seen.add(unique_key)
                            results.append(
                                {
                                    "source": "Twitter",
                                    "poster": f"@{username}",
                                    "text": cleaned_text,
                                    "timestamp": timestamp,
                                    "url": tweet_url,
                                    "likes": likes,
                                    "retweets": retweets,
                                    "replies": replies,
                                }
                            )
                            new_tweets_found += 1
                            logger.info(
                                f"[TWITTER_PROFILE] Tweet {len(results)}/{max_items} (♥{likes} ↻{retweets})"
                            )

                    except Exception as e:
                        logger.debug(f"[TWITTER_PROFILE] Error: {e}")
                        continue

                # Scroll if more tweets are needed
                if len(results) < max_items:
                    page.evaluate(
                        "window.scrollTo(0, document.documentElement.scrollHeight)"
                    )
                    time.sleep(random.uniform(2, 3))
                    if new_tweets_found == 0:
                        break

            browser.close()

        return json.dumps(
            {
                "site": "Twitter Profile",
                "username": username,
                "results": results,
                "total_found": len(results),
                "fetched_at": datetime.now(timezone.utc).isoformat(),
            },
            default=str,
        )

    except Exception as e:
        logger.error(f"[TWITTER_PROFILE] {e}")
        return json.dumps({"error": str(e)}, default=str)
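

# Usage sketch (hypothetical handle; LangChain tools take a dict via .invoke):
#
#   raw = scrape_twitter_profile.invoke({"username": "nasa", "max_items": 5})
#   data = json.loads(raw)
#   for tweet in data.get("results", []):
#       print(tweet["timestamp"], tweet["likes"], tweet["text"][:80])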


# =====================================================
# FACEBOOK PROFILE SCRAPER
# =====================================================


@tool
def scrape_facebook_profile(profile_url: str, max_items: int = 10):
    """
    Facebook PROFILE scraper - monitors a specific page or user profile.

    Scrapes posts from a specific Facebook page/profile timeline for
    competitive monitoring.

    Args:
        profile_url: Full Facebook profile/page URL (e.g., "https://www.facebook.com/DialogAxiata")
        max_items: Maximum number of posts to fetch

    Returns:
        JSON with the profile's posts, engagement metrics, and timestamps
    """
    ensure_playwright()

    # Load session
    site = "facebook"
    session_path = load_playwright_storage_state_path(
        site, out_dir="src/utils/.sessions"
    )
    if not session_path:
        session_path = load_playwright_storage_state_path(site, out_dir=".sessions")

    # Check for alternative session file names
    if not session_path:
        alt_paths = [
            os.path.join(os.getcwd(), "src", "utils", ".sessions", "fb_state.json"),
            os.path.join(os.getcwd(), ".sessions", "fb_state.json"),
            os.path.join(os.getcwd(), "fb_state.json"),
        ]
        for path in alt_paths:
            if os.path.exists(path):
                session_path = path
                logger.info(f"[FACEBOOK_PROFILE] Found session at {path}")
                break

    if not session_path:
        return json.dumps(
            {
                "error": "No Facebook session found",
                "solution": "Run the Facebook session manager to create a session",
            },
            default=str,
        )

    results = []

    try:
        with sync_playwright() as p:
            facebook_desktop_ua = (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            )
            browser = p.chromium.launch(headless=True)
            context = browser.new_context(
                storage_state=session_path,
                user_agent=facebook_desktop_ua,
                viewport={"width": 1400, "height": 900},
            )
            page = context.new_page()

            logger.info(f"[FACEBOOK_PROFILE] Monitoring {profile_url}")
            page.goto(profile_url, timeout=120000)
            time.sleep(5)

            # Check if logged in
            if "login" in page.url:
                logger.error("[FACEBOOK_PROFILE] Session expired")
                browser.close()
                return json.dumps({"error": "Session invalid or expired"}, default=str)

            seen = set()
            stuck = 0
            last_scroll = 0

            MESSAGE_SELECTOR = "div[data-ad-preview='message']"

            # Poster selectors
            POSTER_SELECTORS = [
                "h3 strong a span",
                "h3 strong span",
                "h3 a span",
                "strong a span",
                "a[role='link'] span:not([class*='timestamp'])",
                "span.fwb a",
                "span.fwb",
                "a[aria-hidden='false'] span",
                "a[role='link'] span",
            ]

            def extract_poster(post):
                """Extract the poster name from a Facebook post."""
                parent = post.locator(
                    "xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]"
                )
                for selector in POSTER_SELECTORS:
                    try:
                        el = parent.locator(selector).first
                        if el and el.count() > 0:
                            name = el.inner_text().strip()
                            if name and name != "Facebook" and len(name) > 1:
                                return name
                    except Exception:
                        pass
                return "(Unknown)"

            # Expand ALL "See more" buttons on the page before extracting
            def expand_all_see_more():
                """Click every visible 'See more' button on the page."""
                see_more_selectors = [
                    # Primary Facebook "See more" patterns
                    "div[role='button'] span:text-is('See more')",
                    "div[role='button']:has-text('See more')",
                    "span:text-is('See more')",
                    "span:text-is('… See more')",
                    "span:text-is('...See more')",
                    # Alternate patterns
                    "[role='button']:has-text('See more')",
                    "div.x1i10hfl:has-text('See more')",
                    # Direct text match
                    "text='See more'",
                    "text='… See more'",
                ]
                clicked = 0
                for selector in see_more_selectors:
                    try:
                        buttons = page.locator(selector).all()
                        for btn in buttons:
                            try:
                                if btn.is_visible():
                                    btn.scroll_into_view_if_needed()
                                    time.sleep(0.2)
                                    btn.click(force=True)
                                    clicked += 1
                                    time.sleep(0.3)
                            except Exception:
                                pass
                    except Exception:
                        pass
                if clicked > 0:
                    logger.info(
                        f"[FACEBOOK_PROFILE] Expanded {clicked} 'See more' buttons"
                    )
                return clicked
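
            # Collection strategy: expand truncated text first, harvest whatever
            # is visible, then scroll; three scrolls with no movement is treated
            # as the end of the feed.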
            while len(results) < max_items:
                # First expand all "See more" on visible content
                expand_all_see_more()
                time.sleep(0.5)

                posts = page.locator(MESSAGE_SELECTOR).all()

                for post in posts:
                    try:
                        # Try to expand within this specific post container too
                        try:
                            post.scroll_into_view_if_needed()
                            time.sleep(0.3)

                            # Look for "See more" in the parent container
                            parent = post.locator(
                                "xpath=ancestor::div[contains(@class, 'x1yztbdb')][1]"
                            )
                            post_see_more_selectors = [
                                "div[role='button'] span:text-is('See more')",
                                "span:text-is('See more')",
                                "div[role='button']:has-text('See more')",
                            ]
                            for selector in post_see_more_selectors:
                                try:
                                    btns = parent.locator(selector)
                                    if btns.count() > 0 and btns.first.is_visible():
                                        btns.first.click(force=True)
                                        time.sleep(0.5)
                                        break
                                except Exception:
                                    pass
                        except Exception:
                            pass

                        raw = post.inner_text().strip()
                        cleaned = clean_fb_text(raw)
                        poster = extract_poster(post)

                        if cleaned and len(cleaned) > 30:
                            key = poster + "::" + cleaned
                            if key not in seen:
                                seen.add(key)
                                results.append(
                                    {
                                        "source": "Facebook",
                                        "poster": poster,
                                        "text": cleaned,
                                        "url": profile_url,
                                    }
                                )
                                logger.info(
                                    f"[FACEBOOK_PROFILE] Collected post {len(results)}/{max_items}"
                                )
                                if len(results) >= max_items:
                                    break
                    except Exception:
                        pass

                # Scroll
                page.evaluate("window.scrollBy(0, 2300)")
                time.sleep(1.5)

                new_scroll = page.evaluate("window.scrollY")
                stuck = stuck + 1 if new_scroll == last_scroll else 0
                last_scroll = new_scroll
                if stuck >= 3:
                    logger.info("[FACEBOOK_PROFILE] Reached end of results")
                    break

            browser.close()

        return json.dumps(
            {
                "site": "Facebook Profile",
                "profile_url": profile_url,
                "results": results[:max_items],
                "storage_state": session_path,
            },
            default=str,
        )

    except Exception as e:
        logger.error(f"[FACEBOOK_PROFILE] {e}")
        return json.dumps({"error": str(e)}, default=str)
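

# Usage sketch (page URL taken from the docstring example):
#
#   raw = scrape_facebook_profile.invoke(
#       {"profile_url": "https://www.facebook.com/DialogAxiata", "max_items": 5}
#   )
#   posts = json.loads(raw).get("results", [])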


# =====================================================
# INSTAGRAM PROFILE SCRAPER
# =====================================================


@tool
def scrape_instagram_profile(username: str, max_items: int = 15):
    """
    Instagram PROFILE scraper - monitors a specific user's profile.

    Scrapes posts from a specific Instagram user's profile grid for
    competitive monitoring.

    Args:
        username: Instagram username (without @)
        max_items: Maximum number of posts to fetch

    Returns:
        JSON with the user's posts, captions, and engagement
    """
    ensure_playwright()

    # Load session
    site = "instagram"
    session_path = load_playwright_storage_state_path(
        site, out_dir="src/utils/.sessions"
    )
    if not session_path:
        session_path = load_playwright_storage_state_path(site, out_dir=".sessions")

    # Check for alternative session file names
    if not session_path:
        alt_paths = [
            os.path.join(os.getcwd(), "src", "utils", ".sessions", "ig_state.json"),
            os.path.join(os.getcwd(), ".sessions", "ig_state.json"),
            os.path.join(os.getcwd(), "ig_state.json"),
        ]
        for path in alt_paths:
            if os.path.exists(path):
                session_path = path
                logger.info(f"[INSTAGRAM_PROFILE] Found session at {path}")
                break

    if not session_path:
        return json.dumps(
            {
                "error": "No Instagram session found",
                "solution": "Run the Instagram session manager to create a session",
            },
            default=str,
        )

    username = username.lstrip("@")  # Remove @ if present
    results = []

    try:
        with sync_playwright() as p:
            instagram_mobile_ua = (
                "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) "
                "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1"
            )
            browser = p.chromium.launch(headless=True)
            context = browser.new_context(
                storage_state=session_path,
                user_agent=instagram_mobile_ua,
                viewport={"width": 430, "height": 932},
            )
            page = context.new_page()

            url = f"https://www.instagram.com/{username}/"
            logger.info(f"[INSTAGRAM_PROFILE] Monitoring @{username}")
            page.goto(url, timeout=120000)
            page.wait_for_timeout(4000)

            # Check if logged in and the profile exists
            if "login" in page.url:
                logger.error("[INSTAGRAM_PROFILE] Session expired")
                browser.close()
                return json.dumps({"error": "Session invalid or expired"}, default=str)

            # Scroll to load posts
            for _ in range(8):
                page.mouse.wheel(0, 2500)
                page.wait_for_timeout(1500)

            # Collect post links (deduplicated, preserving order)
            anchors = page.locator("a[href*='/p/'], a[href*='/reel/']").all()
            links = []
            for a in anchors:
                href = a.get_attribute("href")
                if href:
                    full = "https://www.instagram.com" + href
                    if full not in links:
                        links.append(full)
                if len(links) >= max_items:
                    break

            logger.info(
                f"[INSTAGRAM_PROFILE] Found {len(links)} posts from @{username}"
            )

            # Extract captions from each post
            for link in links:
                logger.info(f"[INSTAGRAM_PROFILE] Scraping {link}")
                page.goto(link, timeout=120000)
                page.wait_for_timeout(2000)

                media_id = extract_media_id_instagram(page)
                caption = fetch_caption_via_private_api(page, media_id)

                # Fallback to direct DOM extraction
                if not caption:
                    try:
                        caption = (
                            page.locator("article h1, article span")
                            .first.inner_text()
                            .strip()
                        )
                    except Exception:
                        caption = None

                if caption:
                    results.append(
                        {
                            "source": "Instagram",
                            "poster": f"@{username}",
                            "text": caption,
                            "url": link,
                        }
                    )
                    logger.info(
                        f"[INSTAGRAM_PROFILE] Collected post {len(results)}/{max_items}"
                    )

            browser.close()

        return json.dumps(
            {
                "site": "Instagram Profile",
                "username": username,
                "results": results,
                "storage_state": session_path,
            },
            default=str,
        )

    except Exception as e:
        logger.error(f"[INSTAGRAM_PROFILE] {e}")
        return json.dumps({"error": str(e)}, default=str)
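

# Usage sketch (hypothetical account):
#
#   raw = scrape_instagram_profile.invoke({"username": "instagram", "max_items": 5})
#   captions = [p["text"] for p in json.loads(raw).get("results", [])]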


# =====================================================
# LINKEDIN PROFILE SCRAPER
# =====================================================


@tool
def scrape_linkedin_profile(company_or_username: str, max_items: int = 10):
    """
    LinkedIn PROFILE scraper - monitors a company or user profile.

    Scrapes posts from a specific LinkedIn company or personal profile for
    competitive monitoring.

    Args:
        company_or_username: LinkedIn company name or username
            (e.g., "dialog-axiata" or "company/dialog-axiata")
        max_items: Maximum number of posts to fetch

    Returns:
        JSON with the profile's posts and engagement
    """
    ensure_playwright()

    # Load session
    site = "linkedin"
    session_path = load_playwright_storage_state_path(
        site, out_dir="src/utils/.sessions"
    )
    if not session_path:
        session_path = load_playwright_storage_state_path(site, out_dir=".sessions")

    # Check for alternative session file names
    if not session_path:
        alt_paths = [
            os.path.join(os.getcwd(), "src", "utils", ".sessions", "li_state.json"),
            os.path.join(os.getcwd(), ".sessions", "li_state.json"),
            os.path.join(os.getcwd(), "li_state.json"),
        ]
        for path in alt_paths:
            if os.path.exists(path):
                session_path = path
                logger.info(f"[LINKEDIN_PROFILE] Found session at {path}")
                break

    if not session_path:
        return json.dumps(
            {
                "error": "No LinkedIn session found",
                "solution": "Run the LinkedIn session manager to create a session",
            },
            default=str,
        )

    results = []

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context(
                storage_state=session_path,
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
                ),
                viewport={"width": 1400, "height": 900},
            )
            page = context.new_page()

            # Construct the profile URL
            if not company_or_username.startswith("http"):
                if "company/" in company_or_username:
                    profile_url = f"https://www.linkedin.com/company/{company_or_username.replace('company/', '')}"
                else:
                    profile_url = f"https://www.linkedin.com/in/{company_or_username}"
            else:
                profile_url = company_or_username

            logger.info(f"[LINKEDIN_PROFILE] Monitoring {profile_url}")
            page.goto(profile_url, timeout=120000)
            page.wait_for_timeout(5000)

            # Check if logged in
            if "login" in page.url or "authwall" in page.url:
                logger.error("[LINKEDIN_PROFILE] Session expired")
                browser.close()
                return json.dumps({"error": "Session invalid or expired"}, default=str)

            # Navigate to the posts section
            try:
                posts_tab = page.locator(
                    "a:has-text('Posts'), button:has-text('Posts')"
                ).first
                if posts_tab.is_visible():
                    posts_tab.click()
                    page.wait_for_timeout(3000)
            except Exception:
                logger.warning("[LINKEDIN_PROFILE] Could not find posts tab")

            seen = set()
            no_new_data_count = 0
            previous_height = 0

            POST_CONTAINER_SELECTOR = "div.feed-shared-update-v2"
            TEXT_SELECTOR = "span.break-words"
            POSTER_SELECTOR = "span.update-components-actor__name span[dir='ltr']"

            while len(results) < max_items and no_new_data_count < 3:
                # Expand "see more" buttons
                try:
                    see_more_buttons = page.locator(
                        "button.feed-shared-inline-show-more-text__see-more-less-toggle"
                    ).all()
                    for btn in see_more_buttons:
                        if btn.is_visible():
                            try:
                                btn.click(timeout=500)
                            except Exception:
                                pass
                except Exception:
                    pass

                posts = page.locator(POST_CONTAINER_SELECTOR).all()

                for post in posts:
                    if len(results) >= max_items:
                        break
                    try:
                        post.scroll_into_view_if_needed()

                        raw_text = ""
                        text_el = post.locator(TEXT_SELECTOR).first
                        if text_el.is_visible():
                            raw_text = text_el.inner_text()

                        # Clean text
                        cleaned_text = raw_text
                        if cleaned_text:
                            cleaned_text = re.sub(
                                r"…\s*see more", "", cleaned_text, flags=re.IGNORECASE
                            )
                            cleaned_text = re.sub(
                                r"See translation",
                                "",
                                cleaned_text,
                                flags=re.IGNORECASE,
                            )
                            cleaned_text = cleaned_text.strip()

                        poster_name = "(Unknown)"
                        poster_el = post.locator(POSTER_SELECTOR).first
                        if poster_el.is_visible():
                            poster_name = poster_el.inner_text().strip()
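
                        # Dedup on name/text prefixes: LinkedIn re-renders the
                        # same update across scrolls, so a short prefix key is
                        # enough to catch repeats without hashing whole posts.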
                        key = f"{poster_name[:20]}::{cleaned_text[:30]}"
                        if cleaned_text and len(cleaned_text) > 20 and key not in seen:
                            seen.add(key)
                            results.append(
                                {
                                    "source": "LinkedIn",
                                    "poster": poster_name,
                                    "text": cleaned_text,
                                    "url": profile_url,
                                }
                            )
                            logger.info(
                                f"[LINKEDIN_PROFILE] Found post {len(results)}/{max_items}"
                            )
                    except Exception:
                        continue

                # Scroll
                page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                page.wait_for_timeout(random.randint(2000, 4000))

                new_height = page.evaluate("document.body.scrollHeight")
                if new_height == previous_height:
                    no_new_data_count += 1
                else:
                    no_new_data_count = 0
                previous_height = new_height

            browser.close()

        return json.dumps(
            {
                "site": "LinkedIn Profile",
                "profile": company_or_username,
                "results": results,
                "storage_state": session_path,
            },
            default=str,
        )

    except Exception as e:
        logger.error(f"[LINKEDIN_PROFILE] {e}")
        return json.dumps({"error": str(e)}, default=str)


# =====================================================
# PRODUCT REVIEW AGGREGATOR
# =====================================================


@tool
def scrape_product_reviews(
    product_keyword: str, platforms: Optional[List[str]] = None, max_items: int = 10
):
    """
    Multi-platform product review aggregator for competitive intelligence.

    Searches for product reviews and mentions across Reddit and Twitter.

    Args:
        product_keyword: Product name to search for
        platforms: List of platforms to search (default: ["reddit", "twitter"])
        max_items: Maximum number of reviews per platform

    Returns:
        JSON with aggregated reviews from multiple platforms
    """
    if platforms is None:
        platforms = ["reddit", "twitter"]

    all_reviews = []

    try:
        # Import the tool factory for independent tool instances;
        # this keeps parallel executions from sharing state.
        from src.utils.tool_factory import create_tool_set

        local_tools = create_tool_set()

        # Reddit reviews
        if "reddit" in platforms:
            try:
                reddit_tool = local_tools.get("scrape_reddit")
                if reddit_tool:
                    reddit_data = reddit_tool.invoke(
                        {
                            "keywords": [f"{product_keyword} review", product_keyword],
                            "limit": max_items,
                        }
                    )
                    reddit_results = (
                        json.loads(reddit_data)
                        if isinstance(reddit_data, str)
                        else reddit_data
                    )
                    if "results" in reddit_results:
                        for item in reddit_results["results"]:
                            all_reviews.append(
                                {
                                    "platform": "Reddit",
                                    "text": item.get("text", ""),
                                    "url": item.get("url", ""),
                                    "poster": item.get("poster", "Unknown"),
                                }
                            )
                    logger.info(
                        f"[PRODUCT_REVIEWS] Collected {len([r for r in all_reviews if r['platform'] == 'Reddit'])} Reddit reviews"
                    )
            except Exception as e:
                logger.error(f"[PRODUCT_REVIEWS] Reddit error: {e}")

        # Twitter reviews
        if "twitter" in platforms:
            try:
                twitter_tool = local_tools.get("scrape_twitter")
                if twitter_tool:
                    twitter_data = twitter_tool.invoke(
                        {
                            "query": f"{product_keyword} review OR {product_keyword} rating",
                            "max_items": max_items,
                        }
                    )
                    twitter_results = (
                        json.loads(twitter_data)
                        if isinstance(twitter_data, str)
                        else twitter_data
                    )
                    if "results" in twitter_results:
                        for item in twitter_results["results"]:
                            all_reviews.append(
                                {
                                    "platform": "Twitter",
                                    "text": item.get("text", ""),
                                    "url": item.get("url", ""),
                                    "poster": item.get("poster", "Unknown"),
                                }
                            )
                    logger.info(
                        f"[PRODUCT_REVIEWS] Collected {len([r for r in all_reviews if r['platform'] == 'Twitter'])} Twitter reviews"
                    )
            except Exception as e:
                logger.error(f"[PRODUCT_REVIEWS] Twitter error: {e}")

        return json.dumps(
            {
                "product": product_keyword,
                "total_reviews": len(all_reviews),
                "reviews": all_reviews,
                "platforms_searched": platforms,
            },
            default=str,
        )

    except Exception as e:
        logger.error(f"[PRODUCT_REVIEWS] {e}")
        return json.dumps({"error": str(e)}, default=str)
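

# ---------------------------------------------------------------------
# Ad-hoc smoke test: a minimal sketch with assumed target names; it
# requires saved sessions for the platforms being hit.
# ---------------------------------------------------------------------
if __name__ == "__main__":
    # LinkedIn company feed (slug taken from the docstring example).
    print(
        scrape_linkedin_profile.invoke(
            {"company_or_username": "company/dialog-axiata", "max_items": 3}
        )
    )
    # Cross-platform review sweep (hypothetical product name).
    print(
        scrape_product_reviews.invoke(
            {"product_keyword": "Example Router X1", "max_items": 5}
        )
    )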