注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。
如何在输入或源数据集为空时,防止或中止变换运行?
此代码使用PySpark变换来检查输入DataFrame是否为空,然后再将其写入输出位置。如果DataFrame为空,则中止事务,防止写入空的DataFrame。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
from pyspark.sql import functions as F from transforms.api import transform, Input, Output @transform( out=Output("/Palantir/output_location/datasets/not_process_empty_files"), source_df=Input("/Palantir/input_location/sometimes_empty"), ) def compute(source_df, out): source_df = source_df.dataframe() # 检查是否至少有一行需要写入 if len(source_df.head(1)) == 0: # 如果没有行需要写入,则中止事务 out.abort() else: # 如果有数据行,则写入(追加) out.write_dataframe(source_df)
以上代码使用了PySpark和Transforms API来处理数据集。在函数compute
中,首先检查输入数据框source_df
是否为空。如果source_df
为空,事务将被中止;否则,数据将被写入输出位置。
code authoring
, code repositories
, python
, abort
如何在PySpark变换中创建一个dataframe并将其写入输出。
此代码定义了一个创建包含单个行的空DataFrame的函数,该行包含一个虚拟键和一个时间戳列。然后,它使用变换装饰器将DataFrame写入输出数据集路径。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
from pyspark.sql import types as T from pyspark.sql import functions as F # 生成一个空的DataFrame,该DataFrame仅附加当前时间戳 def get_empty_df(ctx): # 定义DataFrame的模式 schema = T.StructType([T.StructField("key", T.StringType(), True)]) # 创建一个包含单行虚拟键的DataFrame df = ctx.spark_session.createDataFrame([("dummy_key",)], schema) # 添加一个新的列'when',其值为当前时间戳 df = df.withColumn('when', F.current_timestamp()) return df @transform( out=Output("output_dataset_path") ) def out_1(ctx, out): # 创建空的DataFrame df = get_empty_df(ctx) # 将DataFrame写入输出路径 out.write_dataframe(df)
这里的代码使用了PySpark来创建一个简单的DataFrame,并在其中添加了一个当前时间戳的列。get_empty_df
函数定义了一个模式(schema),创建包含一个虚拟键的DataFrame,并且增加了一个名为when
的列用来存储当前时间戳。out_1
函数调用get_empty_df
来生成DataFrame,并将其写入指定的输出路径。
代码编写
, 代码库
, python
如何在PySpark中使用语音编码执行实体名称的模糊匹配?
此代码使用PySpark清理实体名称,生成语音编码,并使用Jaro相似性指标执行实体名称的模糊匹配。它对于在两个数据集中匹配相似的实体名称很有用。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
from pyspark.sql import functions as F from pyspark.sql import types as T from transforms.api import transform_df, Input, Output import re import jellyfish def _add_phonetic_codes(df): # 为名字的每个部分生成语音代码 df = df.withColumn( "name_part", F.split("cleaned_name", " ") ).withColumn( "name_part", F.explode("name_part") ).withColumn( "phonetic_code", F.soundex("name_part") ).drop("name_part") return df @transform_df( Output(), entities2=Input(), entities1=Input(), ) def compute(sanctions, entities): # 设置用于清理文本的UDF def clean_text(text): cleaned_text = re.sub(r" +", " ", re.sub(r"[./-]+", "", text)).lower() return cleaned_text clean_text_udf = F.udf(clean_text, T.StringType()) # 清理实体名称 entities2 = entities2.withColumn("cleaned_name", clean_text_udf(F.col("name"))) entities1 = entities1.withColumn("cleaned_name", clean_text_udf(F.col("entity_name"))) # 添加语音代码 entities2 = _add_phonetic_codes(entities2) entities1 = _add_phonetic_codes(entities1) # 模糊连接 matched_entities = entities1.join( entities2, on=["phonetic_code"], how="inner" ).select( entities1.cleaned_name.alias("cleaned_name1"), entities1.id.alias("entity_id1"), entities2.cleaned_name.alias("cleaned_name2"), entities2.id.alias("entity_id2") ).drop("phonetic_code") matched_entities = matched_entities.dropDuplicates() # 设置用于字符串比较的UDF @F.udf() def jaro_compare(name1, name2): return jellyfish.jaro_similarity(name1, name2) # 模糊匹配 matched_entities = matched_entities.withColumn( "match_score", jaro_compare("cleaned_name1", "cleaned_name2") ) matched_entities = matched_entities.filter(entities.match_score > 0.75) matched_entities = matched_entities.select("entity_id1", "entity_id2") return matched_entities
这段代码的主要功能是通过生成语音编码和使用模糊匹配方法,比较两个实体数据集中的名称以找出可能的匹配。通过使用jellyfish
库的Jaro相似度函数进行字符串相似度比较,来筛选出相似度大于0.75的匹配项。
pyspark
, 模糊匹配
, 语音编码
, jaro相似度
如何使用PySpark加载ORC文件?
此代码从输入数据集的Hadoop路径读取原始ORC文件,并将生成的spark dataframe写入输出。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
from transforms.api import transform, Input, Output @transform( out=Output("output"), raw=Input("input"), ) def compute(ctx, out, raw): # 获取原始输入的Hadoop路径 hadoop_path = raw.filesystem().hadoop_path # 使用Spark会话读取ORC格式的数据文件 df = ctx.spark_session.read.format('orc').load(f'{hadoop_path}/') # 将读取的数据写入输出 out.write_dataframe(df)
pyspark
, dataframe
, orc
, hadoop
如何在Foundry中创建一个多输入、多输出的变换?
此代码演示了如何创建一个PySpark变换,该变换接受多个输入数据集并生成多个输出数据集。它使用@transform装饰器并明确命名输入和输出。变换读取输入数据框,处理它们,并将结果写入指定的输出数据集。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
from transforms.api import transform, Input, Output, incremental from pyspark.sql import types as T from pyspark.sql import functions as F # @incremental 装饰器是可选的(兼容两种使用方式) # 将 @transform_df 改为 @transform # 这样可以更好地控制输入和输出 # 还需要显式命名输出 @transform( output_dataset_1=Output("3_multi_output_1"), output_dataset_2=Output("3_multi_output_2"), input_dataset_1=Input("fake_dataset"), input_dataset_2=Input("fake_dataset_2") ) def example_transform_multi_inputs_outputs(input_dataset_1, input_dataset_2, output_dataset_1, output_dataset_2): # 可以读取输入 input_df_1 = input_dataset_1.dataframe() input_df_2 = input_dataset_2.dataframe() # 这里是示例处理 only_data_from_1 = input_df_1.withColumn('processed_at', F.current_timestamp()) unioned_version = input_df_1.unionByName(input_df_2).withColumn('processed_at', F.current_timestamp()) # 不再返回修改后的 dataframe,而是需要在选择的输出上显式调用 "write_dataframe" # 使用选择的 dataframe(要写入的)进行写入。 output_dataset_1.write_dataframe(only_data_from_1) output_dataset_2.write_dataframe(unioned_version)
code authoring
, code repositories
, python