Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| import shutil | |
| from glob import glob | |
| def read_json_file(file_path): | |
| file_path = "./script/"+file_path | |
| with open(file_path, 'r', encoding='utf-8') as file: | |
| data = json.load(file) | |
| return data | |
| def get_prompt(query: str, history: list): | |
| use_message = {"role": "user", "content": query} | |
| if history is None: | |
| history = [] | |
| history.append(use_message) | |
| message = {"text": history} | |
| return message | |
| def process_response(response_str: str, history: list): | |
| res_dict: dict = json.loads(response_str) | |
| code = res_dict.get("header", {}).get("code") | |
| status = res_dict.get("header", {}).get("status", 2) | |
| if code == 0: | |
| res_dict = res_dict.get("payload", {}).get( | |
| "choices", {}).get("text", [{}])[0] | |
| res_content = res_dict.get("content", "") | |
| if len(res_dict) > 0 and len(res_content) > 0: | |
| # Ignore the unnecessary data | |
| if "index" in res_dict: | |
| del res_dict["index"] | |
| response = res_content | |
| if status == 0: | |
| history.append(res_dict) | |
| else: | |
| history[-1]["content"] += response | |
| response = history[-1]["content"] | |
| return response, history, status | |
| else: | |
| return "", history, status | |
| else: | |
| print("error code ", code) | |
| print("you can see this website to know code detail") | |
| print("https://www.xfyun.cn/doc/spark/%E6%8E%A5%E5%8F%A3%E8%AF%B4%E6%98%8E.html") | |
| return "", history, status | |
| def init_script(history: list, jsonfile): | |
| script_data = read_json_file(jsonfile) | |
| return script_data | |
| def create_script(name, characters, summary, details): | |
| import os | |
| if not os.path.exists("script"): | |
| os.mkdir("script") | |
| data = { | |
| "name": name, | |
| "characters": characters, | |
| "summary": summary, | |
| "details": details | |
| } | |
| json_data = json.dumps(data, ensure_ascii=False) | |
| print(json_data) | |
| with open(f"./script/{name}.json", "w", encoding='utf-8') as file: | |
| file.write(json_data) | |
| pass | |
| def txt2vec(name: str, file_path: str): | |
| from langchain.document_loaders import TextLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| loader = TextLoader(file_path) | |
| data = loader.load() | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=256, chunk_overlap=128) | |
| split_docs = text_splitter.split_documents(data) | |
| from langchain.embeddings.huggingface import HuggingFaceEmbeddings | |
| import sentence_transformers | |
| EMBEDDING_MODEL = "model/text2vec_ernie/" | |
| embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL) | |
| embeddings.client = sentence_transformers.SentenceTransformer( | |
| embeddings.model_name, device='cuda') | |
| from langchain.vectorstores import FAISS | |
| db = FAISS.from_documents(split_docs, embeddings) | |
| db.save_local(f"data/faiss/{name}/") | |
| def pdf2vec(name: str, file_path: str): | |
| from langchain.document_loaders import PyPDFLoader | |
| loader = PyPDFLoader(file_path) | |
| split_docs = loader.load_and_split() | |
| from langchain.embeddings.huggingface import HuggingFaceEmbeddings | |
| import sentence_transformers | |
| EMBEDDING_MODEL = "model/text2vec_ernie/" | |
| embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL) | |
| embeddings.client = sentence_transformers.SentenceTransformer( | |
| embeddings.model_name, device='cuda') | |
| from langchain.vectorstores import FAISS | |
| db = FAISS.from_documents(split_docs, embeddings) | |
| db.save_local(f"data/faiss/{name}/") | |
| def mycopyfile(srcfile, dstpath): # 复制函数 | |
| if not os.path.isfile(srcfile): | |
| print("%s not exist!" % (srcfile)) | |
| else: | |
| fpath, fname = os.path.split(srcfile) | |
| print(fpath) | |
| print(fname) # 分离文件名和路径 | |
| if not os.path.exists(dstpath): | |
| os.makedirs(dstpath) # 创建路径 | |
| shutil.copy(srcfile, dstpath + fname) # 复制文件 | |
| print("copy %s -> %s" % (srcfile, dstpath + fname)) |