注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。
transforms-aip
库简化了语言模型API在PySpark工作流中的集成。该库使用户能够使用Spark DataFrame中的数据创建对完成和嵌入模型的请求。transforms-aip
库的功能包括:
这些功能的使用只需用户进行最小的配置。
transforms-aip
库要开始使用transforms-aip
库,您需要按以下顺序在您的变换库中安装以下依赖项:
palantir_models>=0.933.0
transforms-aip>=0.441.0
本节包含一个完成示例、嵌入示例、视觉示例以及展示如何计算词元数量的示例。
对于任何使用transforms-aip
库的情况,我们强烈建议使用KUBERNETES_NO_EXECUTORS
配置文件以获得最佳速度和成本效率。请查看我们的文档以获取有关配置文件配置的更多信息。
对于我们希望创建完成的给定数据集,我们只需将文本列提供给库即可。这可以在下面的示例中看到,使用文本列question
:
id | question |
---|---|
1 | What is the capital of Canada? |
2 | Which country has the largest population? |
3 | Name the longest river in South America. |
4 | How many states are there in the United States? |
5 | What is the name of the largest ocean on Earth? |
以下是运行完成的完整代码片段;请注意在添加列步骤中的注释:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from palantir_models.transforms import OpenAiGptChatLanguageModelInput RATE_LIMIT_PER_MIN = 100 # 每分钟请求次数上限 TOKEN_LIMIT_PER_MIN = 50000 # 每分钟令牌数上限 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # 必须指定使用的模型并将其导入存储库 chat_model=OpenAiGptChatLanguageModelInput( "ri.language-model-service..language-model.gpt-35_azure" ), ) def compute(output, questions, chat_model, ctx): base_prompt = "Answer this question: " # 基本提示语 # 抽样500个问题 sample_questions = questions.dataframe().limit(500) # 构建提示语 # 添加question_prompt列以传递给编排器 questions_with_prompt = sample_questions.withColumn( "question_prompt", F.concat(F.lit(base_prompt), F.col("question")) ) # 创建编排器 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, # 可以传递与OpenAI兼容的属性 model_properties=CompletionModelProperties(temperature=0.6), ) # 创建llm_answer列以存储响应 # 创建_completion_error-llm_answer列以记录响应中遇到的任何问题 answered = completions.withColumn( ctx, questions_with_prompt, "question_prompt", "llm_answer" ) # 保存结果 output.write_dataframe(answered)
这将创建一个带有您提供的名称的答案列(llm_answer
)和一个用于指示是否有请求失败的出错列(_completion_error-llm_answer
)。出错列将始终采用(_completion_error-<result_column_name>
)的形式。
例如,这个变换的输出将是:
id | question | question_prompt | llm_answer | _completion_error-llm_answer |
---|---|---|---|---|
1 | 加拿大的首都是什么? | 回答这个问题:加拿大的首都是什么? | Ottawa | null |
2 | 哪个国家人口最多? | 回答这个问题:哪个国家人口最多? | China | null |
3 | 说出南美洲最长的河流的名字。 | 回答这个问题:说出南美洲最长的河流的名字。 | Amazon River | null |
4 | 美国有多少个州? | 回答这个问题:美国有多少个州? | 50 | null |
5 | 地球上最大的大洋叫什么名字? | 回答这个问题:地球上最大的大洋叫什么名字? | Pacific Ocean | null |
除了简单的单列提示外,您还可以使用更复杂的提示策略,例如传递多列的提示、字符串和列的组合或图像。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
from transforms.aip.prompt import ( StringPromptComponent, ImagePromptComponent, MultimediaPromptComponent, ChatMessageRole, ) # 创建一个提示,包含两列:一个系统提示和一个用户提示 prompt = [ StringPromptComponent(col="system_prompt_col", user=ChatMessageRole.SYSTEM), # 系统提示列 StringPromptComponent(col="prompt_col"), # 用户提示列 ] # 创建一个提示,使用字符串字面量作为系统提示,以及一列作为用户提示 string_literal_prompt = [ StringPromptComponent(text="prompt string literal", user=ChatMessageRole.SYSTEM), # 系统提示为字符串字面量 StringPromptComponent(col="prompt_col"), # 用户提示列 ] # 创建一个多媒体提示,包含图像定义用于视觉查询 multimedia_prompt = [ MultimediaPromptComponent(["column_name"], ChatMessageRole.SYSTEM), # 系统提示为图像定义 MultimediaPromptComponent( [ "column_name2", ImagePromptComponent(mediasetInput, "mediaItemRid_column"), # 图像提示组件 ] ), ]
协调器还可以处理包括从媒体集加载的图像的提示,包括与图像大小相关的基于词元的速率限制。
下面是运行视觉模型的完整代码片段:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.mediasets import MediaSetInput from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from transforms.aip.prompt import MultimediaPromptComponent, ImagePromptComponent from palantir_models.transforms import OpenAiGptChatWithVisionLanguageModelInput from language_model_service_api.languagemodelservice_api import ( ChatMessageRole, ) RATE_LIMIT_PER_MIN = 60 # 每分钟的请求限制 TOKEN_LIMIT_PER_MIN = 50000 # 每分钟的令牌限制 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), pngs=MediaSetInput("input_media_set"), model=OpenAiGptChatWithVisionLanguageModelInput( "ri.language-model-service..language-model.gpt-4-vision_azure" ), ) def compute(ctx, output, pngs, model): # 1. 获取指定媒体集的媒体项RID df = pngs.list_media_items_by_path(ctx) # 2. 向DataFrame中添加系统提示列 df = df.withColumn( "system_prompt", F.lit("Briefly describe the following image:") # 简要描述以下图片: ) # 3. 定义完成任务的协调器 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, model, model_properties=CompletionModelProperties(max_tokens=4096), # 最大令牌数为4096 ) # 4. 使用提示调用模型 answered = completions.withColumn( ctx, df, [ MultimediaPromptComponent(["system_prompt"], ChatMessageRole.SYSTEM), # 系统角色的多媒体提示组件 MultimediaPromptComponent([ImagePromptComponent(pngs, "mediaItemRid")]), # 图像提示组件 ], "llm_answer", # 模型回答的列名 ) # 5. 保存结果 output.write_dataframe(answered) # 将结果写入输出数据集
这将创建一个名为您提供的名称(llm_answer
)的答案列和一个错误列(_completion_error-llm_answer
),以指示是否有任何请求失败。错误列总是采用_completion_error-<result_column_name>
的形式。
对于嵌入,使用编排器的代码结构与完成编排器类似。如下所示:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import EmbeddingOrchestrator from palantir_models.transforms import GenericEmbeddingModelInput RATE_LIMIT_PER_MIN = 100 TOKEN_LIMIT_PER_MIN = 50000 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # 必须导入嵌入模型以供使用 embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model, ctx): # 1. 获取500个问题 sample_questions = questions.dataframe().limit(500) # 2. 实例化协调器 embeddings = EmbeddingOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, embedding_model, ) # 3. 运行嵌入 # 在 embedding_result 列中创建嵌入 # 在 _embeddings_error-embedding_result 列中记录任何问题 questions_with_embeddings = embeddings.withColumn( ctx, sample_questions, "question", "embedding_result" ) # 4. 保存结果 output.write_dataframe(questions_with_embeddings)
与完成编排器相似,嵌入编排器创建一个嵌入响应列(embedding_result
)和错误列(_embeddings_error-embedding_result
),格式为(_embeddings_error-<result_column_name>
):
id | question | embedding_result | _embeddings_error-embedding_result |
---|---|---|---|
1 | 加拿大的首都是什么? | [0.12, 0.54, ...] | null |
2 | 哪个国家人口最多? | [-0.23, 0.87, ...] | null |
3 | 说出南美洲最长的河流的名字。 | [0.65, -0.48, ...] | null |
4 | 美国有多少个州? | [0.33, 0.92, ...] | null |
5 | 地球上最大的海洋叫什么名字? | [-0.11, 0.34, ...] | null |
该库还提供了一个易于使用的模块,用于理解输入的词元数量,如下所示:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
from palantir_models.transforms import GenericEmbeddingModelInput from pyspark.sql.functions import col, udf from pyspark.sql.types import IntegerType from transforms.aip.tokenizer import Tokenizer from transforms.api import Input, Output, configure, transform @configure(["KUBERNETES_NO_EXECUTORS_SMALL"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model): # 1. 提取500个问题 sample_questions = questions.dataframe().limit(500) # 2. 获取适用的分词器 ada_tokenizer = Tokenizer.get_tokenizer(embedding_model) # 3. 创建一个UDF用于对行进行分词 @udf(returnType=IntegerType()) def calc_tokens(input_str: str) -> int: return ada_tokenizer.estimate_token_count(input_str) # 4. 计算每一行的分词数 with_tokens = sample_questions.withColumn( "num_tokens", calc_tokens(col("question")) ) # 5. 保存结果 output.write_dataframe(with_tokens)
在这段代码中,我们使用了一个基于Spark的转换过程,它从输入数据集中提取500个问题,然后使用指定的嵌入模型获取一个分词器,通过自定义的UDF计算每个问题的分词数,最后将结果保存到输出数据集中。
此代码创建一个新列(num_tokens
),其中包含question
列中字符串的词元数量。此数量基于为该模型注册的编码计算(如果存在编码),该编码将单词片段映射到语言模型识别的词元。
对于所有平台支持的OpenAI模型,已配置了词元化器。对于其他模型,或者如果找不到词元化器,系统将默认为启发式方法(词数除以四)。这可能会不准确。
有关运行的一般信息,请将参数verbose=True
传递给协调器的构造函数,如以下示例所示:
Copied!1 2 3 4 5 6
completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, # 每分钟请求频率限制 TOKEN_LIMIT_PER_MIN, # 每分钟令牌限制 chat_model, # 使用的聊天模型 verbose=True, # 输出详细信息 )
这将向结果添加元数据列,以帮助理解运行。这些列包括请求运行的分区、请求的时间戳和使用的词元。