注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。

AIP 调度器

transforms-aip库简化了语言模型API在PySpark工作流中的集成。该库使用户能够使用Spark DataFrame中的数据创建对完成和嵌入模型的请求。transforms-aip库的功能包括:

  • 速率限制管理: 处理请求和词元的速率限制。
  • 分布式工作负载: 在Spark任务的执行器中均匀分配任务,最大化Spark的优势。
  • 性能优化: 确保请求的最大速度在当前速率限制允许的范围内。
  • 错误处理: 如果请求因任何错误而失败,系统将捕获有关失败的信息并继续执行,以避免整个搭建失败。

这些功能的使用只需用户进行最小的配置。

安装transforms-aip

要开始使用transforms-aip库,您需要按以下顺序在您的变换库中安装以下依赖项:

  1. palantir_models>=0.933.0
  2. transforms-aip>=0.441.0

用法

本节包含一个完成示例嵌入示例视觉示例以及展示如何计算词元数量的示例。

对于任何使用transforms-aip库的情况,我们强烈建议使用KUBERNETES_NO_EXECUTORS配置文件以获得最佳速度和成本效率。请查看我们的文档以获取有关配置文件配置的更多信息。

完成示例

对于我们希望创建完成的给定数据集,我们只需将文本列提供给库即可。这可以在下面的示例中看到,使用文本列question

idquestion
1What is the capital of Canada?
2Which country has the largest population?
3Name the longest river in South America.
4How many states are there in the United States?
5What is the name of the largest ocean on Earth?

以下是运行完成的完整代码片段;请注意在添加列步骤中的注释:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from palantir_models.transforms import OpenAiGptChatLanguageModelInput RATE_LIMIT_PER_MIN = 100 # 每分钟请求次数上限 TOKEN_LIMIT_PER_MIN = 50000 # 每分钟令牌数上限 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # 必须指定使用的模型并将其导入存储库 chat_model=OpenAiGptChatLanguageModelInput( "ri.language-model-service..language-model.gpt-35_azure" ), ) def compute(output, questions, chat_model, ctx): base_prompt = "Answer this question: " # 基本提示语 # 抽样500个问题 sample_questions = questions.dataframe().limit(500) # 构建提示语 # 添加question_prompt列以传递给编排器 questions_with_prompt = sample_questions.withColumn( "question_prompt", F.concat(F.lit(base_prompt), F.col("question")) ) # 创建编排器 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, # 可以传递与OpenAI兼容的属性 model_properties=CompletionModelProperties(temperature=0.6), ) # 创建llm_answer列以存储响应 # 创建_completion_error-llm_answer列以记录响应中遇到的任何问题 answered = completions.withColumn( ctx, questions_with_prompt, "question_prompt", "llm_answer" ) # 保存结果 output.write_dataframe(answered)

这将创建一个带有您提供的名称的答案列(llm_answer)和一个用于指示是否有请求失败的出错列(_completion_error-llm_answer)。出错列将始终采用(_completion_error-<result_column_name>)的形式。

例如,这个变换的输出将是:

idquestionquestion_promptllm_answer_completion_error-llm_answer
1加拿大的首都是什么?回答这个问题:加拿大的首都是什么?Ottawanull
2哪个国家人口最多?回答这个问题:哪个国家人口最多?Chinanull
3说出南美洲最长的河流的名字。回答这个问题:说出南美洲最长的河流的名字。Amazon Rivernull
4美国有多少个州?回答这个问题:美国有多少个州?50null
5地球上最大的大洋叫什么名字?回答这个问题:地球上最大的大洋叫什么名字?Pacific Oceannull

额外的提示策略

除了简单的单列提示外,您还可以使用更复杂的提示策略,例如传递多列的提示、字符串和列的组合或图像。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from transforms.aip.prompt import ( StringPromptComponent, ImagePromptComponent, MultimediaPromptComponent, ChatMessageRole, ) # 创建一个提示,包含两列:一个系统提示和一个用户提示 prompt = [ StringPromptComponent(col="system_prompt_col", user=ChatMessageRole.SYSTEM), # 系统提示列 StringPromptComponent(col="prompt_col"), # 用户提示列 ] # 创建一个提示,使用字符串字面量作为系统提示,以及一列作为用户提示 string_literal_prompt = [ StringPromptComponent(text="prompt string literal", user=ChatMessageRole.SYSTEM), # 系统提示为字符串字面量 StringPromptComponent(col="prompt_col"), # 用户提示列 ] # 创建一个多媒体提示,包含图像定义用于视觉查询 multimedia_prompt = [ MultimediaPromptComponent(["column_name"], ChatMessageRole.SYSTEM), # 系统提示为图像定义 MultimediaPromptComponent( [ "column_name2", ImagePromptComponent(mediasetInput, "mediaItemRid_column"), # 图像提示组件 ] ), ]

视觉完成示例

协调器还可以处理包括从媒体集加载的图像的提示,包括与图像大小相关的基于词元的速率限制。

下面是运行视觉模型的完整代码片段:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.mediasets import MediaSetInput from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from transforms.aip.prompt import MultimediaPromptComponent, ImagePromptComponent from palantir_models.transforms import OpenAiGptChatWithVisionLanguageModelInput from language_model_service_api.languagemodelservice_api import ( ChatMessageRole, ) RATE_LIMIT_PER_MIN = 60 # 每分钟的请求限制 TOKEN_LIMIT_PER_MIN = 50000 # 每分钟的令牌限制 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), pngs=MediaSetInput("input_media_set"), model=OpenAiGptChatWithVisionLanguageModelInput( "ri.language-model-service..language-model.gpt-4-vision_azure" ), ) def compute(ctx, output, pngs, model): # 1. 获取指定媒体集的媒体项RID df = pngs.list_media_items_by_path(ctx) # 2. 向DataFrame中添加系统提示列 df = df.withColumn( "system_prompt", F.lit("Briefly describe the following image:") # 简要描述以下图片: ) # 3. 定义完成任务的协调器 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, model, model_properties=CompletionModelProperties(max_tokens=4096), # 最大令牌数为4096 ) # 4. 使用提示调用模型 answered = completions.withColumn( ctx, df, [ MultimediaPromptComponent(["system_prompt"], ChatMessageRole.SYSTEM), # 系统角色的多媒体提示组件 MultimediaPromptComponent([ImagePromptComponent(pngs, "mediaItemRid")]), # 图像提示组件 ], "llm_answer", # 模型回答的列名 ) # 5. 保存结果 output.write_dataframe(answered) # 将结果写入输出数据集

这将创建一个名为您提供的名称(llm_answer)的答案列和一个错误列(_completion_error-llm_answer),以指示是否有任何请求失败。错误列总是采用_completion_error-<result_column_name>的形式。

嵌入示例

对于嵌入,使用编排器的代码结构与完成编排器类似。如下所示:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import EmbeddingOrchestrator from palantir_models.transforms import GenericEmbeddingModelInput RATE_LIMIT_PER_MIN = 100 TOKEN_LIMIT_PER_MIN = 50000 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # 必须导入嵌入模型以供使用 embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model, ctx): # 1. 获取500个问题 sample_questions = questions.dataframe().limit(500) # 2. 实例化协调器 embeddings = EmbeddingOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, embedding_model, ) # 3. 运行嵌入 # 在 embedding_result 列中创建嵌入 # 在 _embeddings_error-embedding_result 列中记录任何问题 questions_with_embeddings = embeddings.withColumn( ctx, sample_questions, "question", "embedding_result" ) # 4. 保存结果 output.write_dataframe(questions_with_embeddings)

与完成编排器相似,嵌入编排器创建一个嵌入响应列(embedding_result)和错误列(_embeddings_error-embedding_result),格式为(_embeddings_error-<result_column_name>):

idquestionembedding_result_embeddings_error-embedding_result
1加拿大的首都是什么?[0.12, 0.54, ...]null
2哪个国家人口最多?[-0.23, 0.87, ...]null
3说出南美洲最长的河流的名字。[0.65, -0.48, ...]null
4美国有多少个州?[0.33, 0.92, ...]null
5地球上最大的海洋叫什么名字?[-0.11, 0.34, ...]null

计算词元

该库还提供了一个易于使用的模块,用于理解输入的词元数量,如下所示:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from palantir_models.transforms import GenericEmbeddingModelInput from pyspark.sql.functions import col, udf from pyspark.sql.types import IntegerType from transforms.aip.tokenizer import Tokenizer from transforms.api import Input, Output, configure, transform @configure(["KUBERNETES_NO_EXECUTORS_SMALL"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model): # 1. 提取500个问题 sample_questions = questions.dataframe().limit(500) # 2. 获取适用的分词器 ada_tokenizer = Tokenizer.get_tokenizer(embedding_model) # 3. 创建一个UDF用于对行进行分词 @udf(returnType=IntegerType()) def calc_tokens(input_str: str) -> int: return ada_tokenizer.estimate_token_count(input_str) # 4. 计算每一行的分词数 with_tokens = sample_questions.withColumn( "num_tokens", calc_tokens(col("question")) ) # 5. 保存结果 output.write_dataframe(with_tokens)

在这段代码中,我们使用了一个基于Spark的转换过程,它从输入数据集中提取500个问题,然后使用指定的嵌入模型获取一个分词器,通过自定义的UDF计算每个问题的分词数,最后将结果保存到输出数据集中。 此代码创建一个新列(num_tokens),其中包含question列中字符串的词元数量。此数量基于为该模型注册的编码计算(如果存在编码),该编码将单词片段映射到语言模型识别的词元。

对于所有平台支持的OpenAI模型,已配置了词元化器。对于其他模型,或者如果找不到词元化器,系统将默认为启发式方法(词数除以四)。这可能会不准确。

调试

有关运行的一般信息,请将参数verbose=True传递给协调器的构造函数,如以下示例所示:

Copied!
1 2 3 4 5 6 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, # 每分钟请求频率限制 TOKEN_LIMIT_PER_MIN, # 每分钟令牌限制 chat_model, # 使用的聊天模型 verbose=True, # 输出详细信息 )

这将向结果添加元数据列,以帮助理解运行。这些列包括请求运行的分区、请求的时间戳和使用的词元。