阿里云大数据AI技术

阿里云大数据AI技术

背景介绍

阿里云向量检索 Milvus 版是一款 Serverless 全托管服务,确保了与开源 Milvus 的完全兼容性,并支持无缝迁移。它在开源版本的基础上增强了可扩展性,能提供大规模 AI 向量数据的相似性检索服务。凭借其开箱即用的特性、灵活的扩展能力和全链路监控告警,Milvus 云服务成为多样化 AI 应用场景的理想选择,包括多模态搜索、检索增强生成(RAG)、搜索推荐、内容风险识别等。您还可以利用开源的 Attu 工具进行可视化操作,进一步促进应用的快速开发和部署。

阿里云向量检索 Milvus 版已开启免费公测。您可以在E-MapReduce控制台,选择 EMR Serverless > Milvus,进入 Milvus 页面创建入门版的实例,公测期间您可以免费试用 Milvus 服务。

前提条件

使用限制

请确保您的运行环境中已安装 Python 3.8或以上版本,以便顺利安装并使用 DashScope。

操作流程

▶ 准备工作

  1. 安装相关的依赖库。

pip3 install pymilvus tqdm dashscope

2. 下载所需的知识库。
本文示例使用了公开数据集CEC-Corpus。CEC-Corpus 数据集包含332篇针对各类突发事件的新闻报道,语料和标注数据,这里我们只需要提取原始的新闻稿文本,并将其向量化后入库。

git clone https://github.com/shijiebei2009/CEC-Corpus.git

▶ 步骤一:知识库向量化

1. 创建 embedding.py 文件,内容如下所示。

import os
import time
from tqdm import tqdm
import dashscope
from dashscope import TextEmbedding
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility


def prepareData(path, batch_size=25):
    batch_docs = []
    for file in os.listdir(path):
        with open(path + '/' + file, 'r', encoding='utf-8') as f:
            batch_docs.append(f.read())
            if len(batch_docs) == batch_size:
                yield batch_docs
                batch_docs = []

    if batch_docs:
        yield batch_docs


def getEmbedding(news):
    model = TextEmbedding.call(
        model=TextEmbedding.Models.text_embedding_v1,
        input=news
    )
    embeddings = [record['embedding'] for record in model.output['embeddings']]
    return embeddings if isinstance(news, list) else embeddings[0]


if __name__ == '__main__':

    current_path = os.path.abspath(os.path.dirname(__file__))   # 当前目录
    root_path = os.path.abspath(os.path.join(current_path, '..'))   # 上级目录
    data_path = f'{root_path}/CEC-Corpus/raw corpus/allSourceText'  # 数据下载git clone https://github.com/shijiebei2009/CEC-Corpus.git

    # 配置Dashscope API KEY
    dashscope.api_key = '<YOUR_DASHSCOPE_API_KEY>'

    # 配置Milvus参数
    COLLECTION_NAME = 'CEC_Corpus'
    DIMENSION = 1536
    MILVUS_HOST = 'c-97a7d8038fb8****.milvus.aliyuncs.com'
    MILVUS_PORT = '19530'
    USER = 'root'
    PASSWORD = '<password>'

    connections.connect(host=MILVUS_HOST, port=MILVUS_PORT, user=USER, password=PASSWORD)

    # Remove collection if it already exists
    if utility.has_collection(COLLECTION_NAME):
        utility.drop_collection(COLLECTION_NAME)

    # Create collection which includes the id, title, and embedding.
    fields = [
        FieldSchema(name='id', dtype=DataType.INT64, descrition='Ids', is_primary=True, auto_id=False),
        FieldSchema(name='text', dtype=DataType.VARCHAR, description='Text', max_length=4096),
        FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='Embedding vectors', dim=DIMENSION)
    ]
    schema = CollectionSchema(fields=fields, description='CEC Corpus Collection')
    collection = Collection(name=COLLECTION_NAME, schema=schema)

    # Create an index for the collection.
    index_params = {
        'index_type': 'IVF_FLAT',
        'metric_type': 'L2',
        'params': {'nlist': 1024}
    }
    collection.create_index(field_name="embedding", index_params=index_params)

    id = 0
    for news in tqdm(list(prepareData(data_path))):
        ids = [id + i for i, _ in enumerate(news)]
        id += len(news)

        vectors = getEmbedding(news)
        # insert Milvus Collection
        for id, vector, doc in zip(ids, vectors, news):
            insert_doc = (doc[:498] + '..') if len(doc) > 500 else doc
            ins = [[id], [insert_doc], [vector]]  # Insert the title id, the text, and the text embedding vector
            collection.insert(ins)
            time.sleep(2)

本文示例涉及以下参数,请您根据实际环境替换。

  1. 在 Attu 中您可以看到创建的 Collection,具体操作请参见Attu操作指南

通过阿里云向量检索 Milvus 版和通义千问快速构建基于专属知识库的问答系统-LMLPHP

在本文示例中,我们将 Embedding 向量和新闻报道文稿一起存入 Milvus 中,同时构建索引类型采用了 IVF_FLAT,在向量检索时,同时可以召回原始文稿。

▶ 步骤二:向量检索与知识问答

数据写入完成后,即可进行快速的向量检索。在通过提问搜索到相关的知识点后,我们可以按照特定的模板将“提问 + 知识点”作为 prompt 向 LLM 发起提问。在这里我们所使用的 LLM 是通义千问,这是阿里巴巴自主研发的超大规模语言模型,能够在用户自然语言输入的基础上,通过自然语言理解和语义分析,理解用户意图。通过提供尽可能清晰详细的指令(prompt),可以获得更符合预期的结果。这些能力都可以通过通义千问来获得。

本文示例设计的提问模板格式为:请基于我提供的内容回答问题。内容是{___},我的问题是{___},当然您也可以自行设计合适的模板。

创建 answer.py 文件,内容如下所示。

import os
import dashscope
from dashscope import Generation
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection
from embedding import getEmbedding


def getAnswer(query, context):
    prompt = f'''请基于```内的报道内容,回答我的问题。
	      ```
	      {context}
	      ```
	      我的问题是:{query}。
       '''

    rsp = Generation.call(model='qwen-turbo', prompt=prompt)
    return rsp.output.text


def search(text):
    # Search parameters for the index
    search_params = {
        "metric_type": "L2"
    }

    results = collection.search(
        data=[getEmbedding(text)],  # Embeded search value
        anns_field="embedding",  # Search across embeddings
        param=search_params,
        limit=1,  # Limit to five results per search
        output_fields=['text']  # Include title field in result
    )

    ret = []
    for hit in results[0]:
        ret.append(hit.entity.get('text'))
    return ret


if __name__ == '__main__':

    current_path = os.path.abspath(os.path.dirname(__file__))   # 当前目录
    root_path = os.path.abspath(os.path.join(current_path, '..'))   # 上级目录
    data_path = f'{root_path}/CEC-Corpus/raw corpus/allSourceText'

    # 配置Dashscope API KEY
    dashscope.api_key = '<YOUR_DASHSCOPE_API_KEY>'

    # 配置Milvus参数
    COLLECTION_NAME = 'CEC_Corpus'
    DIMENSION = 1536
    MILVUS_HOST = 'c-97a7d8038fb8****.milvus.aliyuncs.com'
    MILVUS_PORT = '19530'
    USER = 'root'
    PASSWORD = '<password>'

    connections.connect(host=MILVUS_HOST, port=MILVUS_PORT, user=USER, password=PASSWORD)

    fields = [
        FieldSchema(name='id', dtype=DataType.INT64, descrition='Ids', is_primary=True, auto_id=False),
        FieldSchema(name='text', dtype=DataType.VARCHAR, description='Text', max_length=4096),
        FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='Embedding vectors', dim=DIMENSION)
    ]
    schema = CollectionSchema(fields=fields, description='CEC Corpus Collection')
    collection = Collection(name=COLLECTION_NAME, schema=schema)

    # Load the collection into memory for searching
    collection.load()

    question = '北京中央电视台工地发生大火,发生在哪里?出动了多少辆消防车?人员伤亡情况如何?'
    context = search(question)
    answer = getAnswer(question, context)
    print(answer)

运行完成后,针对北京中央电视台工地发生大火,发生在哪里?出动了多少辆消防车?人员伤亡情况如何?的提问,会得到以下结果。

火灾发生在北京市朝阳区东三环中央电视台新址园区在建的附属文化中心大楼工地。出动了54辆消防车。目前尚无人员伤亡报告。


如有疑问,可加入向量检索 Milvus 版用户钉群 59530004993咨询。

04-19 16:42