import os
from typing import (
    Any,
    Union,
)
import zipfile

import streamlit as st
from streamlit.runtime.uploaded_file_manager import (
    UploadedFile,
    UploadedFileRec,
    UploadedFileManager,
)
from streamlit.runtime.scriptrunner import get_script_run_ctx
from supabase.client import Client
from langchain.vectorstores.supabase import SupabaseVectorStore

from components_keys import ComponentsKeys
from loaders.audio import process_audio
from loaders.txt import process_txt
from loaders.csv import process_csv
from loaders.markdown import process_markdown
from loaders.pdf import process_pdf
from loaders.html import (
    create_html_file,
    delete_tempfile,
    get_html,
    process_html,
)
from loaders.powerpoint import process_powerpoint
from loaders.docx import process_docx
from utils import compute_sha1_from_content
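
# Script run context and file manager used to register files we create
# ourselves (e.g. files extracted from an uploaded zip) as regular uploads.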
ctx = get_script_run_ctx()
manager = UploadedFileManager()
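
# Supported file extensions mapped to their loader functions.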
file_processors = {
    ".txt": process_txt,
    ".csv": process_csv,
    ".md": process_markdown,
    ".markdown": process_markdown,
    ".m4a": process_audio,
    ".mp3": process_audio,
    ".webm": process_audio,
    ".mp4": process_audio,
    ".mpga": process_audio,
    ".wav": process_audio,
    ".mpeg": process_audio,
    ".pdf": process_pdf,
    ".html": process_html,
    ".pptx": process_powerpoint,
    ".docx": process_docx,
}


def file_uploader(supabase, vector_store):
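    """Render the file uploader widget and ingest the selected files.

    Parameters
    ----------
    supabase : Client
        The supabase client.
    vector_store : SupabaseVectorStore
        The vector store in the database.
    """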
| # Omit zip file support if the `st.secrets.self_hosted` != "true" because | |
| # a zip file can consist of multiple files so the limit on 1 file uploaded | |
| # at a time in the demo can be circumvented. | |
| accepted_file_extensions = list(file_processors.keys()) | |
| accept_multiple_files = st.secrets.self_hosted == "true" | |
| if accept_multiple_files: | |
| accepted_file_extensions += [".zip"] | |
| files = st.file_uploader( | |
| "**Upload a file**", | |
| accept_multiple_files=accept_multiple_files, | |
| type=accepted_file_extensions, | |
| key=ComponentsKeys.FILE_UPLOADER, | |
| ) | |
| if st.secrets.self_hosted == "false": | |
| st.markdown("**In demo mode, the max file size is 1MB**") | |
| if st.button("Add to Database"): | |
| # Single file upload | |
| if isinstance(files, UploadedFile): | |
| filter_file(files, supabase, vector_store) | |
| # Multiple files upload | |
| elif isinstance(files, list): | |
| for file in files: | |
| filter_file(file, supabase, vector_store) | |


def file_already_exists(supabase, file):
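    """Check whether a file with the same SHA-1 hash is already stored.

    Parameters
    ----------
    supabase : Client
        The supabase client.
    file : UploadedFile
        The uploaded file from the file uploader.

    Returns
    -------
    bool
        True if a document with the same `file_sha1` metadata already exists.
    """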
    file_sha1 = compute_sha1_from_content(file.getvalue())
    response = (
        supabase.table("documents")
        .select("id")
        .eq("metadata->>file_sha1", file_sha1)
        .execute()
    )
    return len(response.data) > 0


def file_to_uploaded_file(file: Any) -> Union[None, UploadedFile]:
    """Convert a file to a streamlit `UploadedFile` object.

    This allows us to unzip files and treat them the same way
    streamlit treats files uploaded through the file uploader.

    Parameters
    ----------
    file : Any
        The file. Can be any file supported by this app.

    Returns
    -------
    Union[None, UploadedFile]
        The file converted to a streamlit `UploadedFile` object.
        Returns `None` if the script context cannot be grabbed.
    """
    if ctx is None:
        print("script context not found, skipping uploading file:", file.name)
        return

    file_extension = os.path.splitext(file.name)[-1]
    file_name = file.name
    file_data = file.read()
    # The file manager will automatically assign an ID so pass `None`
    # Reference: https://github.com/streamlit/streamlit/blob/9a6ce804b7977bdc1f18906d1672c45f9a9b3398/lib/streamlit/runtime/uploaded_file_manager.py#LL98C6-L98C6
    uploaded_file_rec = UploadedFileRec(None, file_name, file_extension, file_data)
    uploaded_file_rec = manager.add_file(
        ctx.session_id,
        ComponentsKeys.FILE_UPLOADER,
        uploaded_file_rec,
    )
    return UploadedFile(uploaded_file_rec)


def filter_zip_file(
    file: UploadedFile,
    supabase: Client,
    vector_store: SupabaseVectorStore,
) -> None:
    """Unzip the zip file then filter each unzipped file.

    Parameters
    ----------
    file : UploadedFile
        The uploaded file from the file uploader.
    supabase : Client
        The supabase client.
    vector_store : SupabaseVectorStore
        The vector store in the database.
    """
    with zipfile.ZipFile(file, "r") as z:
        unzipped_files = z.namelist()
        for unzipped_file in unzipped_files:
            with z.open(unzipped_file, "r") as f:
                filter_file(f, supabase, vector_store)


def filter_file(file, supabase, vector_store):
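    """Route a file to the matching processor and add it to the vector store.

    Zip files are unpacked and each member is filtered recursively. Files that
    are empty, already in the database, or of an unsupported type are skipped.

    Parameters
    ----------
    file : Any
        The file to process. Converted to an `UploadedFile` if needed.
    supabase : Client
        The supabase client.
    vector_store : SupabaseVectorStore
        The vector store in the database.

    Returns
    -------
    bool
        True if the file was processed, False otherwise.
    """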
    # Streamlit file uploads are of type `UploadedFile` which has the
    # necessary methods and attributes for this app to work.
    if not isinstance(file, UploadedFile):
        file = file_to_uploaded_file(file)
        if file is None:
            # No script context available, so the file could not be converted.
            return False

    file_extension = os.path.splitext(file.name)[-1]
    if file_extension == ".zip":
        filter_zip_file(file, supabase, vector_store)
        return True

    if file_already_exists(supabase, file):
        st.write(f"😎 {file.name} is already in the database.")
        return False

    if file.size < 1:
        st.write(f"💨 {file.name} is empty.")
        return False

    if file_extension in file_processors:
        if st.secrets.self_hosted == "false":
            file_processors[file_extension](vector_store, file, stats_db=supabase)
        else:
            file_processors[file_extension](vector_store, file, stats_db=None)
        st.write(f"✅ {file.name} ")
        return True

    st.write(f"❌ {file.name} is not a valid file type.")
    return False


def url_uploader(supabase, vector_store):
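    """Render a text area for a URL and ingest the page it points to.

    Parameters
    ----------
    supabase : Client
        The supabase client.
    vector_store : SupabaseVectorStore
        The vector store in the database.
    """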
| url = st.text_area("**Add an url**",placeholder="https://meraGPT.com") | |
| button = st.button("Add the URL to the database") | |
| if button: | |
| if not st.session_state["overused"]: | |
| html = get_html(url) | |
| if html: | |
| st.write(f"Getting content ... {url} ") | |
| try: | |
| file, temp_file_path = create_html_file(url, html) | |
| except UnicodeEncodeError as e: | |
| st.write(f"β Error encoding character: {e}") | |
| file, temp_file_path = create_html_file(url, html) | |
| ret = filter_file(file, supabase, vector_store) | |
| delete_tempfile(temp_file_path, url, ret) | |
| else: | |
| st.write(f"β Failed to access to {url} .") | |
| else: | |
| st.write("You have reached your daily limit. Please come back later or self host the solution.") |