https://www.bilibili.com/video/BV1ZmBQYjEea/?share_source=copy_web&vd_source=7c31b4c90861db40c5b220f32fbc4899
curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh
bash standalone_embed.sh start
!pip install pymilvus==2.4.7
'pymilvus[model]'
torch
[
{
'book'
:
'偽自由書'
,
'title'
:
'最藝術的國家'
,
'author'
:
'魯迅'
,
'type'
:
''
,
'source'
:
''
,
'date'
:
''
,
'content'
:
'我們中國的最偉大最永久,而且最普遍的“藝術”是男人扮女人...
},
{
'
book
': '
偽自由書
',
'
title
': '
王道詩話
',
'
author
': '
魯迅
',
'
type
': '',
'
source
': '',
'
date
': '',
'
content
': '
《人權論》是從鸚鵡開頭的,據說古時候有一隻高飛遠走的鸚哥兒...
},
...
]
import torch
import json
from pymilvus import DataType, MilvusClient
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
# 將輸入的文字向量化
def vectorize_query(query , model_name =
'BAAI/bge-small-zh-v1.5'
):
# 檢查是否有可用的CUDA裝置
device =
'cuda:0'if
torch.cuda.is_available()
else'cpu'
# 根據裝置選擇是否使用fp16
use_fp16 = device.startswith(
'cuda'
)
# 建立嵌入模型例項
bge_m3_ef = BGEM3EmbeddingFunction(
model_name=model_name,
device=device,
use_fp16=use_fp16
)
# 把輸入的文字向量化
vectors = bge_m3_ef.encode_documents(query)
return
vectors
def vectorize_file(input_file_path, output_file_path, field_name):
# 讀取 json 檔案,把chunk欄位的值向量化
with open(input_file_path,
'r'
, encoding=
'utf-8'
) as file:
data_list = json.load(file)
# 提取該json檔案中的所有chunk欄位的值
query = [data[field_name]
for
data
in
data_list]
# 向量化文字資料
vectors = vectorize_query(query)
# 將向量新增到原始文字中
for
data, vector
in
zip(data_list, vectors[
'dense'
]):
# 將 NumPy 陣列轉換為 Python 的普通列表
data[
'vector'
] = vector.tolist()
# 將更新後的文字內容寫入新的json檔案
with open(output_file_path,
'w'
, encoding=
'utf-8'
) as outfile:
json.dump(data_list, outfile, ensure_ascii=False, indent=4)
# 向量化固定分塊的檔案
vectorize_file(
'luxun_sample_fixed_chunk.json'
,
'luxun_sample_fixed_chunk_vector.json'
,
'chunk'
)
# 向量化透過標點符號分塊的檔案
vectorize_file(
'luxun_sample_semantic_chunk.json'
,
'luxun_sample_semantic_chunk_vector.json'
,
'chunk'
)
# 向量化透過句子分塊的檔案
vectorize_file(
'luxun_sample_sentence_window.json'
,
'luxun_sample_sentence_window_vector.json'
,
'chunk'
)
partition_key_field
的值為 method
,它表示採用的分塊方法。Milvus 會根據 method
欄位的值,把資料插入到對應的分割槽中。打個比方,如果把集合看作一個 excel 檔案,partition (分割槽)就是表格的工作表(Worksheet)。一個 excel 檔案包含多張工作表,不同的資料填寫在對應的工作表中。相應的,我們把不同的資料插入到對應分割槽中,搜尋時指定分割槽,就可以提高搜尋效率。# 建立集合
from pymilvus import MilvusClient, DataType
import time
def create_collection(collection_name):
# 檢查同名集合是否存在,如果存在則刪除
if
milvus_client.has_collection(collection_name):
print
(f
'集合 {collection_name} 已經存在'
)
try:
# 刪除同名集合
milvus_client.drop_collection(collection_name)
print
(f
'刪除集合:{collection_name}'
)
except Exception as e:
print
(f
'刪除集合時出現錯誤: {e}'
)
# 建立集合模式
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
# 設定partition key
partition_key_field =
'method'
,
# 設定分割槽數量,預設為16
num_partitions=16,
description=
''
)
# 新增欄位到schema
schema.add_field(field_name=
'id'
, datatype=DataType.VARCHAR, is_primary=True, max_length=256)
schema.add_field(field_name=
'book'
, datatype=DataType.VARCHAR, max_length=128)
schema.add_field(field_name=
'title'
, datatype=DataType.VARCHAR, max_length=128)
schema.add_field(field_name=
'author'
, datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name=
'type'
, datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name=
'source'
, datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name=
'date'
, datatype=DataType.VARCHAR, max_length=32)
schema.add_field(field_name=
'chunk'
, datatype=DataType.VARCHAR, max_length=2048)
schema.add_field(field_name=
'window'
, datatype=DataType.VARCHAR, default_value=
''
, max_length=6144)
schema.add_field(field_name=
'method'
, datatype=DataType.VARCHAR, max_length=32)
schema.add_field(field_name=
'vector'
, datatype=DataType.FLOAT_VECTOR, dim=512)
# 建立集合
try:
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
shards_num=2
)
print
(f
'建立集合:{collection_name}'
)
except Exception as e:
print
(f
'建立集合的過程中出現了錯誤: {e}'
)
# 等待集合建立成功
while
not milvus_client.has_collection(collection_name):
# 獲取集合的詳細資訊
time.sleep(1)
if
milvus_client.has_collection(collection_name):
print
(f
'集合 {collection_name} 建立成功'
)
collection_info = milvus_client.describe_collection(collection_name)
print
(f
'集合資訊: {collection_info}'
)
collection_name =
'LuXunWorks'
uri=
'http://localhost:19530'
milvus_client = MilvusClient(uri=uri)
create_collection(collection_name)
from tqdm import tqdm
# 資料入庫
def insert_vectors(file_path, collection_name, batch_size):
# 讀取和處理檔案
with open(file_path,
'r'
) as file:
data = json.load(file)
# 將資料插入集合
print
(f
'正在將資料插入集合:{collection_name}'
)
total_count = len(data)
# pbar 是 tqdm 庫中的一個進度條物件,用於顯示插入資料的進度
with tqdm(total=total_count, desc=
'插入資料'
) as pbar:
# 每次插入 batch_size 條資料
for
i
in
range(0, total_count, batch_size):
batch_data = data[i:i + batch_size]
res = milvus_client.insert(
collection_name=collection_name,
data=batch_data
)
pbar.update(len(batch_data))
# 驗證資料是否成功插入集合
print
(f
'插入的實體數量: {total_count}'
)
# 設定每次插入的資料量
batch_size = 100
insert_vectors(
'luxun_sample_fixed_chunk_vector.json'
, collection_name, batch_size)
insert_vectors(
'luxun_sample_semantic_chunk_vector.json'
, collection_name, batch_size)
insert_vectors(
'luxun_sample_sentence_window_vector.json'
, collection_name, batch_size)
index_params = milvus_client.prepare_index_params()
index_params.add_index(
# 指定索引名稱
index_name=
'IVF_FLAT'
,
# 指定建立索引的欄位
field_name=
'vector'
,
# 設定索引型別
index_type=
'IVF_FLAT'
,
# 設定度量方式
metric_type=
'IP'
,
# 設定索引聚類中心的數量
params={
'nlist'
: 128}
)
milvus_client.create_index(
# 指定為建立索引的集合
collection_name=collection_name,
# 使用前面建立的索引引數建立索引
index_params=index_params
)
res = milvus_client.list_indexes(
collection_name=collection_name
)
print
(res)
['IVF_FLAT']
。再檢視下索引的詳細資訊。
res = milvus_client.describe_index(
collection_name=collection_name,
index_name=
'IVF_FLAT'
)
print
(res)
{
'nlist'
:
'128'
,
'index_type'
:
'IVF_FLAT'
,
'metric_type'
:
'IP'
,
'field_name'
:
'vector'
,
'index_name'
:
'IVF_FLAT'
,
'total_rows'
: 0,
'indexed_rows'
: 0,
'pending_index_rows'
: 0,
'state'
:
'Finished'
}
print
(f
'正在載入集合:{collection_name}'
)
milvus_client.load_collection (collection_name=collection_name)
print
(milvus_client.get_load_state (collection_name=collection_name))
{'state': <LoadState: Loaded>}
,說明載入完成。接下來,我們定義搜尋函式。
search_params = {
# 度量型別
'metric_type'
:
'IP'
,
# 搜尋過程中要查詢的聚類單元數量。增加nprobe值可以提高搜尋精度,但會降低搜尋速度
'params'
: {
'nprobe'
: 16}
}
partition_key_field
嗎?它會根據 method
欄位的值,把資料插入到相應的分割槽中。而搜尋函式中的 filter
引數,就是用來指定在哪個分割槽中搜索的。
def vector_search(
query,
search_params,
limit
,
output_fields,
partition_name
):
# 將查詢轉換為向量
query_vectors = [vectorize_query(query)[
'dense'
][0].tolist()]
# 向量搜尋
res = milvus_client.search(
collection_name=collection_name,
# 指定查詢向量
data=query_vectors,
# 指定搜尋的欄位
anns_field=
'vector'
,
# 設定搜尋引數
search_params=search_params,
# 設定搜尋結果的數量
limit
=
limit
,
# 設定輸出欄位
output_fields=output_fields,
# 在指定分割槽中搜索
filter=f
'method =='{partition_name}''
)
return
res
# 列印向量搜尋結果
def print_vector_results(res):
# hit是搜尋結果中的每一個匹配的實體
res = [hit[
'entity'
]
for
hit
in
res[0]]
for
item
in
res:
print
(f
'title: {item['title']}'
)
print
(f
'chunk: {item['chunk']}'
)
print
(f
'method: {item['method']}'
)
print
(
'-'
*50)
<you_api_key>
替換成你自己的 api key。
import os
os.environ[
'DEEPSEEK_API_KEY'
] = <you_api_key>
deepseek_api_key = os.getenv(
'DEEPSEEK_API_KEY'
)
# 安裝 openai 庫
pip install openai
# 匯入openai庫
from openai import OpenAI
# 匯入os庫
import os
# 建立openai客戶端例項
OpenAI_client = OpenAI(api_key=deepseek_api_key, base_url=
'https://api.deepseek.com'
)
generate_response
。model
是我們使用的大模型,這裡是 deepseek-chat
。temperature
決定大模型回答的隨機性,數值在0-2之間,數值越高,生成的文字越隨機;值越低,生成的文字越確定。# 定義生成響應的函式
def generate_response(
system_prompt,
user_prompt,
model,
temperature
):
# 大模型的響應
response = OpenAI_client.chat.completions.create(
model=model,
messages=[
# 設定系統資訊,通常用於設定模型的行為、角色或上下文。
{
'role'
:
'system'
,
'content'
: system_prompt},
# 設定使用者訊息,使用者訊息是使用者傳送給模型的訊息。
{
'role'
:
'user'
,
'content'
: user_prompt},
],
# 設定溫度
temperature=temperature,
stream=True
)
# 遍歷響應中的每個塊
for
chunk
in
response:
# 檢查塊中是否包含選擇項
if
chunk.choices:
# 列印選擇項中的第一個選項的增量內容,並確保立即重新整理輸出
print
(chunk.choices[0].delta.content, end=
''
, flush=True)
system_prompt
是系統提示詞,主要用於設定模型的行為、角色或上下文。你可以理解為這是系統給大模型的提示詞,而且始終有效。我們可以使用下面的提示詞規範大模型的響應:
system_prompt =
'你是魯迅作品研究者,熟悉魯迅的各種作品。'
user_prompt
是使用者提示詞,是使用者發給大模型的。大模型會在系統提示詞和使用者提示詞的共同作用下,生成響應。使用者提示詞由查詢句子 query 和向量資料庫搜尋到的句子組成。對於 fixed_chunk
和 semantic_chunk
,我們需要獲取 chunk
欄位的值。對於 sentence_window
,我們需要獲取 window
欄位的值。定義下面的函式可以幫助我們方便獲取想要的值。
def get_ref_info (query, search_params,
limit
, output_fields, method):
res = vector_search (query, search_params,
limit
, output_fields, method)
for
hit
in
res[0]:
ref_info = {
'ref'
: hit[
'entity'
][
'window'
]
if
method ==
'sentence_window'else
hit[
'entity'
][
'chunk'
],
'title'
: hit[
'entity'
][
'title'
]
}
return
ref_info
for
method
in
chunk_methods:
print
(f
'分塊方法: {method}'
)
# 獲取參考資訊
ref_info = get_ref_info(query, search_params,
limit
, output_fields, method)
# 生成使用者提示詞
user_prompt = (
f
'請你根據提供的參考資訊,查詢是否有與問題語義相似的內容。參考資訊:{ref_info}。問題:{query}。\n'
f
'如果找到了相似的內容,請回復“魯迅的確說過類似的話,原文是[原文內容],這句話來自[文章標題]”。\n'
f
'[原文內容]是參考資訊中ref欄位的值,[文章標題]是參考資訊中title欄位的值。如果引用它們,請引用完整的內容。\n'
f
'如果參考資訊沒有提供和問題相關的內容,請回答“據我所知,魯迅並沒有說過類似的話。”'
)
# 生成響應
generate_response(system_prompt, user_prompt, model, temperature)
print
(
'\n'
+
'*'
* 50 +
'\n'
)
分塊方法: fixed_chunk
魯迅的確說過類似的話,原文是“的人多了,也便成了路。 一九二一年一月。”,這句話來自《故鄉》。
**************************************************
分塊方法: semantic_chunk
魯迅的確說過類似的話,原文是“跨過了滅亡的人們向前進。什麼是路?就是從沒路的地方踐踏出來的,從只有荊棘的地方開闢出來的。以前早有路了,以後也該永遠有路。人類總不會寂寞,因為生命是進步的,是樂天的。昨天,我對我的朋友L說,“一個人死了,在死者自身和他的眷屬是悲慘的事,”這句話來自《六十六生命的路》。
**************************************************
分塊方法: sentence_window
魯迅的確說過類似的話,原文是“我想:希望本是無所謂有,無所謂無的。 這正如地上的路;其實地上本沒有路,走的人多了,也便成了路。 一九二一年一月。”,這句話來自《故鄉》。
fixed_chunk
選手雖然給出了原文,但是遺憾的是不夠完整。semantic_chunk
選手沒有搜尋到原文,可能是因為這個塊的前半部分和查詢句子的語義相差較遠,這也反應了分塊對搜尋結果的影響,但是給出的句子語義也和原文類似,算是意外收穫。而 sentence_window
選手,則給出了標準答案。sentence_window
的高效背後是有代價的。你可以比較下這三種分塊方法向量化後的檔案,luxun_sample_fixed_chunk_vector.json
的大小是11.5MPa,luxun_sample_semantic_chunk_vector.json
增加到了16.1MPa,而 luxun_sample_sentence_window_vector.json
則達到了49.2MPa,是前兩者的3到4倍。
分塊方法: fixed_chunk
魯迅的確說過類似的話,原文是“在我的後園,可以看見牆外有兩株樹,一株是棗樹,還有一株也是棗樹。這上面的夜的天空,奇怪而高,我生平沒有見過這樣的奇怪而高的天空。他彷彿要離開人間而去,使人們仰面不再看見。然而現在卻非常之藍,閃閃地䀹著幾十個星星的眼,冷眼。他的口角上現出微笑,似乎自以為大有深意,而將繁霜灑在我的園裡的野花草上。我不知”,這句話來自《秋夜》。
**************************************************
分塊方法: semantic_chunk
魯迅的確說過類似的話,原文是“在我的後園,可以看見牆外有兩株樹,一株是棗樹,還有一株也是棗樹。這上面的夜的天空,奇怪而高,我生平沒有見過這樣的奇怪而高的天空。他彷彿要離開人間而去,使人們仰面不再看見。然而現在卻非常之藍,閃閃地䀹著幾十個星星的眼,冷眼。他的口角上現出微笑,”,這句話來自《秋夜》。
**************************************************
分塊方法: sentence_window
魯迅的確說過類似的話,原文是“在我的後園,可以看見牆外有兩株樹,一株是棗樹,還有一株也是棗樹。 這上面的夜的天空,奇怪而高,我生平沒有見過這樣的奇怪而高的天空。”,這句話來自《秋夜》。
**************************************************
sentence_window
選手給出的原文更精準。
分塊方法: fixed_chunk
魯迅的確說過類似的話,原文是“獸是單獨的,牛羊則結隊;野牛的大隊,就會排角成城以御強敵了,但拉開一匹,定只能牟牟地叫。”,這句話來自《春末閒談》。
**************************************************
分塊方法: semantic_chunk
據我所知,魯迅並沒有說過類似的話。
**************************************************
分塊方法: sentence_window
魯迅的確說過類似的話,原文是“猛獸是單獨的,牛羊則結隊;野牛的大隊,就會排角成城以御強敵了,但拉開一匹,定只能牟牟地叫。”,這句話來自《春末閒談》。
**************************************************
fixed_chunk
選手雖然搜尋結果包含了無關內容,但是大模型從中篩選出了合適的句子。semantic_chunk
選手搜尋到的句子並沒有被大模型採納。sentence_window
選手仍然不負眾望,給出了標準答案,還能提供“視窗句子”作為上下文,在 RAG 應用中,把上下文句子一起傳遞給大模型,能讓大模型更好地理解句子的語義,作出更好的回答。請為 sentence_window
選手的精彩表現鼓掌。vector_search
函式的 limit
引數,讓向量資料庫多返回幾個句子,增加命中機率。或者增加 generate_response
函式的 temperature
引數,看看 RAG 的響應如何變化。還有提示詞,它直接影響大模型如何回答。-
ChunkViz(https://chunkviz.up.railway.app/)是一個線上網站,提供分塊視覺化功能。
-
想了解 RAG 更多有趣應用,可以看看這個影片:當我開發出史料檢索RAG應用,正史怪又該如何應對?(https://www.bilibili.com/video/BV1da4y1k78p/?spm_id_from=333.337.search-card.all.click&vd_source=ad92e3138da83a643ab3f5883c7664c7)。想了解更多技術細節,看這裡:揭秘「 B 站最火的 RAG 應用」是如何煉成的。
-
想了解更多分塊技術,可以閱讀檢索增強生成(RAG)的分塊策略指南(https://zilliz.com.cn/blog/guide-to-chunking-sreategies-for-rag) 和從固定大小到NLP分塊 – 文字分塊技術的深入研究(https://safjan.com/from-fixed-size-to-nlp-chunking-a-deep-dive-into-text-chunking-techniques/)兩篇文章。