代码示例Incremental transforms变换

注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。

变换

Python

使用PySpark从API增量获取和更新数据

如何使用外部变换从API获取数据并增量更新?

此代码使用PySpark和requests库在指定的日期范围内从API获取数据并增量更新输出。如果API支持分页,它还支持分页。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 from pyspark.sql import functions as F from transforms.api import incremental, transform, Output import requests from transforms.external.systems import EgressPolicy, use_external_systems, Credential import logging from datetime import datetime as dt import json # 定义一个函数用于获取数据,使用给定的token、开始日期和结束日期 def _get_data(token, start_date, end_date, next_link_url='<YOUR_URL>'): headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} data = { "from": start_date, "to": end_date, } response = requests.post(next_link_url, json=data, headers=headers) logging.warn(response.json()) # 记录警告信息 data = response.json()["data"] # 提取响应中的数据部分 return json.dumps(data) # 返回JSON格式的数据 @use_external_systems( creds=Credential(), # 使用外部系统的凭证 egress=EgressPolicy(), # 指定数据出口策略 ) @incremental() # 使用增量更新模式 @transform( output=Output(), # 定义输出 ) def compute(output, creds, egress, ctx): token = creds.get("token") # 获取凭证中的token if ctx.is_incremental: # 检查是否为增量更新模式 previous = output.dataframe('current').localCheckpoint() # 获取当前数据的本地检查点 if NEXT_LINK_COLUMN in previous.columns: # 检查是否存在下一链接列 latest_row = ( previous .where(F.col(LAST_MODIFIED_COLUMN).isNotNull()) .orderBy([F.col(REQUEST_TIMESTAMP_COLUMN).desc(), F.col(LAST_MODIFIED_COLUMN).desc()]) .limit(1).collect()[0] ) next_link_url = latest_row[NEXT_LINK_COLUMN] # 获取下一链接的URL last_date = latest_row[LAST_MODIFIED_COLUMN] # 获取最后修改日期 else: last_date = previous.orderBy(F.col(LAST_MODIFIED_COLUMN).desc()).limit(1).collect()[0][LAST_MODIFIED_COLUMN] today = dt.today().strftime("%Y-%m-%d") # 获取今天的日期 data = _get_data(token, last_date, today, next_link_url) # 获取数据 df = ctx.spark_session.createDataFrame([{'date': last_date, 'data': data}]) # 创建DataFrame output.set_mode("modify") # 设置输出模式为修改 output.write_dataframe(df) # 写入数据框到输出
  • 提交日期: 2024-04-26
  • 标签: API, pyspark, 增量, 数据框, 外部变换

增量助手

如何更改增量变换的输入,取决于它是否以增量方式运行?

Copied!
1 2 3 4 5 6 7 8 9 10 11 @incremental() @transform( x=Output(), # x为输出 y=Input(), # y为输入 z=Input() # z为输入 ) def compute(ctx, x, y, z): if ctx.is_incremental: ## 一些增量更新的代码 else: ## 其他代码

上述代码定义了一个带有装饰器的Python函数compute。它接受一个上下文对象ctx,以及三个参数xyz。装饰器@incremental()@transform()用于指定函数的行为,@transform()装饰器说明了哪些参数是输入(yz)以及哪个参数是输出(x)。函数内部通过ctx.is_incremental来判断是否进行增量更新,以执行不同的代码逻辑。

  • 提交日期: 2024-03-20
  • 标签: 代码创作, 代码库, python, 增量

增量求和聚合

如何通过增量装饰器优化非增量管道,以对非常大的数据集进行增量聚合?

此代码使用 PySpark 通过基于特定字段对数据集进行分组并计算另一个字段的不同值来计算数据集的每日聚合。然后,代码将结果存储在输出数据帧中,处理增量和非增量情况。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from pyspark.sql import functions as F from transforms.api import transform, incremental, Input, Output from pyspark.sql import DataFrame @incremental(semantic_version=1) @transform( # 输入数据集 input_data=Input(""), daily_aggregate=Output("") ) def compute(ctx, input_data, daily_aggregate): input_df = input_data.dataframe() latest_daily_agg = input_df.groupBy(F.col("group_by_field")).agg(F.count_distinct(F.col("unique_thing")).alias("sum_of_unique")) # 需要一个schema来加载之前的输出 latest_daily_agg_schema = latest_daily_agg.schema if ctx.is_incremental: # 获取之前的聚合数据 last_daily_agg = daily_aggregate.dataframe(mode='previous', schema=latest_daily_agg_schema) # 将最新聚合数据和之前的聚合数据合并,并按字段分组求和 sum_daily = last_daily_agg.unionByName(latest_daily_agg).groupBy(F.col("group_by_field")).agg(F.sum(F.col("sum_of_unique")).alias("sum_of_unique")) # 替换模式下写入聚合结果 daily_aggregate.set_mode('replace') daily_aggregate.write_dataframe(sum_daily) else: # 非增量模式下,仅存储最新的每日聚合结果 daily_aggregate.write_dataframe(latest_daily_agg)

代码说明

  • @incremental(semantic_version=1):标记此函数为增量计算,semantic_version指定版本。
  • @transform(...):定义输入和输出数据集。
  • input_data.dataframe():获取输入数据集的DataFrame。
  • groupByagg:对数据进行分组并聚合。
  • is_incremental:判断是否为增量模式。
  • unionByName:合并两个DataFrame。
  • write_dataframe:写入结果到输出数据集。
  • 提交日期: 2024-03-20
  • 标签: code authoring, code repositories, python, incremental, aggregation

PySpark中的增量变换

如何在Palantir Foundry中使用PySpark实现增量变换?

此代码演示了如何在PySpark变换中使用增量装饰器来处理增量处理。它在输入数据框中添加了一个processed_at时间戳列和一个is_running_as_incremental列。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from transforms.api import transform_df, Input, Output, incremental from pyspark.sql import types as T from pyspark.sql import functions as F # 应用增量装饰器 @incremental() @transform_df( Output(""), input_df=Input("") ) def example_transform_incremental_processing(ctx, input_df): # 转换的行为取决于输入/输出的状态 # (有关详细信息,请参见原代码片段中的情况1到3) # # 与快照处理相比,下面的逻辑没有任何变化。 # 行为由Foundry处理。 # 示例处理 input_df = input_df.withColumn('processed_at', F.current_timestamp()) # 使用 ctx.is_incremental 来判断当前构建是以“快照”方式还是“增量”方式运行 input_df = input_df.withColumn('is_running_as_incremental', F.lit(ctx.is_incremental)) # 返回编辑后的数据帧 return input_df
  • 提交日期: 2024-03-20
  • 标签: 代码创作, 代码库, python, 增量

快照输入至增量输出

如何在完全重写的快照输入上执行增量变换,并且仅处理我之前未处理的新行?

此代码使用 PySpark 将输入数据集读取为快照,并与先前输出进行比较以找到新行。然后为新行添加时间戳并将其附加到输出数据集中,执行增量变换。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from transforms.api import incremental, transform, Input, Output from pyspark.sql import types as T, functions as F # 使用 @incremental 装饰器以获取高级读/写模式。 @incremental( snapshot_inputs=["input_dataset"] # 指定快照输入数据集 ) # 使用 @transform 装饰器以更好地控制输入和输出。 @transform( output_dataset=Output("incremental_output"), # 定义输出数据集 input_dataset=Input("snapshot_input") # 定义输入数据集 ) def example_transform_very_advanced_processing__snapshot_to_incremental(ctx, input_dataset, output_dataset): # 强制读取输入数据框作为快照 input_df_all_dataframe = input_dataset.dataframe(mode="current") # 读取当前输出以查看之前构建中已经处理的内容 out_schema = T.StructType([T.StructField("primary_key", T.StringType(), True), T.StructField("other_column", T.IntegerType(), True), # 包含输出中存储的其他列 T.StructField("processed_at", T.TimestampType(), True)]) output_df_previous_dataframe = output_dataset.dataframe('previous', out_schema) # ==== 示例处理 ==== # 将输入与当前输出进行差异比较,以找到“新行”。 KEY = ["primary_key"] new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY) # 添加时间戳以便于跟踪/调试/理解示例 new_rows_df = new_rows_df.withColumn('processed_at', F.current_timestamp()) # ==== 示例处理结束 ==== # 这将追加行 output_dataset.set_mode("modify") output_dataset.write_dataframe(new_rows_df)

解释:

  • @incremental@transform 装饰器用于在数据转换过程中提供高级功能,如增量处理和输入/输出控制。
  • 通过 mode="current" 读取数据集的当前快照。
  • 使用 join 操作查找输入数据集中新的行,并使用 left_anti 来获取不在输出数据集中的行。
  • current_timestamp() 用于在新行中添加处理时间戳,以便于跟踪和调试。
  • 最后,将新行以修改模式写入输出数据集。
  • 提交日期: 2024-07-18
  • 标签: code authoring, code repositories, python, incremental