123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- from langchain.embeddings.huggingface import HuggingFaceEmbeddings
- from langchain.vectorstores import FAISS
- from langchain.document_loaders import UnstructuredFileLoader
- from models.chatglm_llm import ChatGLM
- from configs.model_config import *
- import datetime
- from textsplitter import ChineseTextSplitter
- from typing import List, Tuple
- from langchain.docstore.document import Document
- import numpy as np
- from utils import torch_gc
- # return top-k text chunk from vector store
- VECTOR_SEARCH_TOP_K = 6
- # LLM input history length
- LLM_HISTORY_LEN = 3
- DEVICE_ = EMBEDDING_DEVICE
- DEVICE_ID = "0" if torch.cuda.is_available() else None
- DEVICE = f"{DEVICE_}:{DEVICE_ID}" if DEVICE_ID else DEVICE_
- def load_file(filepath):
- if filepath.lower().endswith(".md"):
- loader = UnstructuredFileLoader(filepath, mode="elements")
- docs = loader.load()
- elif filepath.lower().endswith(".pdf"):
- loader = UnstructuredFileLoader(filepath)
- textsplitter = ChineseTextSplitter(pdf=True)
- docs = loader.load_and_split(textsplitter)
- else:
- loader = UnstructuredFileLoader(filepath, mode="elements")
- textsplitter = ChineseTextSplitter(pdf=False)
- docs = loader.load_and_split(text_splitter=textsplitter)
- return docs
- def generate_prompt(related_docs: List[str],
- query: str,
- prompt_template=PROMPT_TEMPLATE) -> str:
- context = "\n".join([doc.page_content for doc in related_docs])
- prompt = prompt_template.replace("{question}", query).replace("{context}", context)
- return prompt
- def get_docs_with_score(docs_with_score):
- docs = []
- for doc, score in docs_with_score:
- doc.metadata["score"] = score
- docs.append(doc)
- return docs
- def seperate_list(ls: List[int]) -> List[List[int]]:
- lists = []
- ls1 = [ls[0]]
- for i in range(1, len(ls)):
- if ls[i - 1] + 1 == ls[i]:
- ls1.append(ls[i])
- else:
- lists.append(ls1)
- ls1 = [ls[i]]
- lists.append(ls1)
- return lists
- def similarity_search_with_score_by_vector(
- self,
- embedding: List[float],
- k: int = 4,
- ) -> List[Tuple[Document, float]]:
- scores, indices = self.index.search(np.array([embedding], dtype=np.float32), k)
- docs = []
- id_set = set()
- for j, i in enumerate(indices[0]):
- if i == -1:
- # This happens when not enough docs are returned.
- continue
- _id = self.index_to_docstore_id[i]
- doc = self.docstore.search(_id)
- id_set.add(i)
- docs_len = len(doc.page_content)
- for k in range(1, max(i, len(docs) - i)):
- for l in [i + k, i - k]:
- if 0 <= l < len(self.index_to_docstore_id):
- _id0 = self.index_to_docstore_id[l]
- doc0 = self.docstore.search(_id0)
- if docs_len + len(doc0.page_content) > self.chunk_size:
- break
- elif doc0.metadata["source"] == doc.metadata["source"]:
- docs_len += len(doc0.page_content)
- id_set.add(l)
- id_list = sorted(list(id_set))
- id_lists = seperate_list(id_list)
- for id_seq in id_lists:
- for id in id_seq:
- if id == id_seq[0]:
- _id = self.index_to_docstore_id[id]
- doc = self.docstore.search(_id)
- else:
- _id0 = self.index_to_docstore_id[id]
- doc0 = self.docstore.search(_id0)
- doc.page_content += doc0.page_content
- if not isinstance(doc, Document):
- raise ValueError(f"Could not find document for id {_id}, got {doc}")
- docs.append((doc, scores[0][j]))
- torch_gc(DEVICE)
- return docs
- class LocalDocQA:
- llm: object = None
- embeddings: object = None
- top_k: int = VECTOR_SEARCH_TOP_K
- chunk_size: int = CHUNK_SIZE
- def init_cfg(self,
- embedding_model: str = EMBEDDING_MODEL,
- embedding_device=EMBEDDING_DEVICE,
- llm_history_len: int = LLM_HISTORY_LEN,
- llm_model: str = LLM_MODEL,
- llm_device=LLM_DEVICE,
- top_k=VECTOR_SEARCH_TOP_K,
- use_ptuning_v2: bool = USE_PTUNING_V2
- ):
- self.llm = ChatGLM()
- self.llm.load_model(model_name_or_path=llm_model_dict[llm_model],
- llm_device=llm_device,
- use_ptuning_v2=use_ptuning_v2)
- self.llm.history_len = llm_history_len
- self.embeddings = HuggingFaceEmbeddings(model_name=embedding_model_dict[embedding_model],
- model_kwargs={'device': embedding_device})
- self.top_k = top_k
- def init_knowledge_vector_store(self,
- filepath: str or List[str],
- vs_path: str or os.PathLike = None):
- loaded_files = []
- if isinstance(filepath, str):
- if not os.path.exists(filepath):
- print("路径不存在")
- return None
- elif os.path.isfile(filepath):
- file = os.path.split(filepath)[-1]
- try:
- docs = load_file(filepath)
- print(f"{file} 已成功加载")
- loaded_files.append(filepath)
- except Exception as e:
- print(e)
- print(f"{file} 未能成功加载")
- return None
- elif os.path.isdir(filepath):
- docs = []
- for file in os.listdir(filepath):
- fullfilepath = os.path.join(filepath, file)
- try:
- docs += load_file(fullfilepath)
- print(f"{file} 已成功加载")
- loaded_files.append(fullfilepath)
- except Exception as e:
- print(e)
- print(f"{file} 未能成功加载")
- else:
- docs = []
- for file in filepath:
- try:
- docs += load_file(file)
- print(f"{file} 已成功加载")
- loaded_files.append(file)
- except Exception as e:
- print(e)
- print(f"{file} 未能成功加载")
- if len(docs) > 0:
- if vs_path and os.path.isdir(vs_path):
- vector_store = FAISS.load_local(vs_path, self.embeddings)
- vector_store.add_documents(docs)
- torch_gc(DEVICE)
- else:
- if not vs_path:
- vs_path = f"""{VS_ROOT_PATH}{os.path.splitext(file)[0]}_FAISS_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}"""
- vector_store = FAISS.from_documents(docs, self.embeddings)
- torch_gc(DEVICE)
- vector_store.save_local(vs_path)
- return vs_path, loaded_files
- else:
- print("文件均未成功加载,请检查依赖包或替换为其他文件再次上传。")
- return None, loaded_files
- def get_knowledge_based_answer(self,
- query,
- vs_path,
- chat_history=[],
- streaming: bool = STREAMING):
- vector_store = FAISS.load_local(vs_path, self.embeddings)
- FAISS.similarity_search_with_score_by_vector = similarity_search_with_score_by_vector
- vector_store.chunk_size = self.chunk_size
- related_docs_with_score = vector_store.similarity_search_with_score(query,
- k=self.top_k)
- related_docs = get_docs_with_score(related_docs_with_score)
- prompt = generate_prompt(related_docs, query)
- # if streaming:
- # for result, history in self.llm._stream_call(prompt=prompt,
- # history=chat_history):
- # history[-1][0] = query
- # response = {"query": query,
- # "result": result,
- # "source_documents": related_docs}
- # yield response, history
- # else:
- for result, history in self.llm._call(prompt=prompt,
- history=chat_history,
- streaming=streaming):
- history[-1][0] = query
- response = {"query": query,
- "result": result,
- "source_documents": related_docs}
- yield response, history
- if __name__ == "__main__":
- local_doc_qa = LocalDocQA()
- local_doc_qa.init_cfg()
- query = "你好"
- vs_path = "/Users/liuqian/Downloads/glm-dev/vector_store/123"
- last_print_len = 0
- for resp, history in local_doc_qa.get_knowledge_based_answer(query=query,
- vs_path=vs_path,
- chat_history=[],
- streaming=True):
- print(resp["result"][last_print_len:], end="", flush=True)
- last_print_len = len(resp["result"])
- for resp, history in local_doc_qa.get_knowledge_based_answer(query=query,
- vs_path=vs_path,
- chat_history=[],
- streaming=False):
- print(resp["result"])
- pass
|