注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。
PySpark 是一种包装语言,允许您与 Apache Spark 后端接口以快速处理数据。Spark 可以在分布式服务器网络上操作非常大的数据集,这在正确使用时提供了重要的性能和可靠性优势。然而,它也有一些限制,特别是如果您更习惯于使用 SQL 等关系型数据库系统。例如,Spark 不可能确切知道某行在哪台服务器上存在,因此无法直接选择特定行进行更新或删除。如果您习惯以这种方式思考您的数据库,您将需要调整您的概念模型,以整体考虑数据集,并基于列而非行来处理数据。
与 SQL 不同,SQL 查询会生成“视图”(虚拟表结果集),而使用 PySpark 处理数据集会生成全新的数据集。这不仅允许您基于派生数据集搭建新的数据集,组织中的其他成员也可以重用中间数据集来进行他们自己的数据处理任务。在 Palantir Foundry 这个数据操作系统中,数据集通过父子(或,源-结果)定向树关系自动链接。这使得任何人都可以轻松地追踪 Spark 变换的数据沿袭。换句话说,您可以探索您的数据集的依赖项是如何搭建的,以及这些数据集来自哪里。您还可以发现组织中的其他成员如何使用数据集,从而可以从示例中学习或有效减少重复工作。
在代码工作簿中,您的函数可能如下所示:
Copied!1 2 3 4
def new_frame(old_frame): df = old_frame # df = 在 df 上进行转换 return df
这个函数 new_frame
接受一个数据框 old_frame
作为参数,然后对其进行某些转换(需要在注释中指定具体转换),最后返回转换后的数据框 df
。
old_frame
: 引用在 Foundry 中存储的一个代表数据集的DataFrame。old_frame
是不可变的,意味着它不能在这个new_frame
函数中被修改。从某种意义上说,所有中间的变换步骤都会生成一个新的、不可变的 dataframe,我们可能希望再次变换它或按原样返回。这并不完全正确,但作为一种认知模型,它将帮助您更好地组织代码。new_frame
: 在这个函数中,您可以定义一系列想要应用于old_frame
的变换。在这个例子中,您的return
语句应该返回一个DataFrame(我们称之为df
)。在底层,您对那个DataFrame应用的每个变换都会被组合并优化,然后应用到输入的数据集上。一旦您用代码触发搭建,结果将被保存到Foundry中的一个新的数据集文件中,您可以在构建完成后进行探索。DataFrame中的数据不能被直接引用,因为它不是Array
也不是Dictionary
。实际上,由于底层的所有分区和洗牌,无法确定任何数据在任何给定时刻的位置。除非您正在筛选或聚合数据集,否则您编写的代码应该对数据集的内容相对无关。排序通常是昂贵且缓慢的,所以经验法则是假设每行是随机排序的,将您的工具集限制在列、筛选、聚合和您自己的创造性问题解决上。
非常重要的是,您要跟踪传入列的模式,因为 PySpark 不是类型安全的,将尝试评估所有变换操作,并在任何操作在运行时失败时中断。
不要对字符串或日期执行数学函数,也不要对数字执行字符串操作,或对整数进行日期操作,因为冲突类型的行为难以预测。
请确保在操作之前将值转换为正确的类型。
DataFrame 的每一列都有一个名称(且可重命名)。列名是唯一且区分大小写的。在 Foundry 数据集 中遵循以下指南:
_
(下划线)而不是空格分隔单词(因为不允许使用空格)。camelCasedColumnNames
。(
、)
或&
。当您进入现有代码时,您会注意到没有关于如何命名引用DataFrames的变量的严格规则。在这个备忘单中,DataFrames将被引用为df
,但在其他例子中可能是raw
、out
、input
、table
、something_specific
。任何名称都可以,只要能完成任务。
您还会注意到这种模式:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
# 选择 "firstName" 和 "age" 列 df = df.select("firstName", "age") # 将 "age" 列的数据类型转换为整数 df = df.withColumn("age", df.age.cast("integer")) # 过滤出年龄大于21的记录 df = df.filter(df.age > 21) # 重命名 "firstName" 列为 "first_name" df = df.withColumnRenamed("firstName", "first_name") # 返回处理后的 DataFrame return df
或者(相同的内容,以不同的方式书写):
Copied!1 2 3 4
return df.select("firstName", "age") \ .withColumn("age", df.age.cast("integer")) \ # 将年龄列转换为整数类型 .filter(df.age > 21) \ # 过滤掉年龄小于等于21的记录 .withColumnRenamed("firstName", "first_name") # 将列名从firstName重命名为first_name
如果您不熟悉编码:=
左侧的df
是存储应用于右侧df
的变换结果的位置,然后再继续下一行代码。在此示例中,我们将结果存储到同名变量中,本质上是在每一步之后覆盖df
中包含的内容。您可以使用不同的名称来保存DataFrame变换的结果,但在大多数情况下,覆盖变量名称并继续是可以的。在每个变换函数的末尾,我们必须返回新的数据框,可以作为一个变量(在第一个示例中)或作为最后一个变换的结果(在第二个示例中)。
两个示例实现了相同的功能:
df
的两个列age
列以确保它是整数而不是字符串age > 21
的条目firstName
重命名为first_name
结果数据集将只有两列first_name
和age
,并且21岁及以下的人将被排除。这就是df
在最后包含的内容,您可以return
它或对其应用更多变换。我们将在接下来的部分中更详细地探讨这些变换。
在Foundry中有两个工具可以用于编写PySpark:代码仓库和代码工作簿。
在代码仓库中,您必须在.py
文档的顶部声明以下导入语句以使用大多数函数:
Copied!1 2
from pyspark.sql import functions as F # 从pyspark.sql模块中导入functions库,并将其别名为F
在 代码工作簿 中,这是一个已经包含的全局导入,因此您可以在不进行额外配置的情况下使用大多数函数。
此参考资料并不详尽,将侧重于提供一些关于常见模式和最佳实践的指导。有关完整的pySpark SQL函数列表,您可以参考官方 Apache Spark 文档 ↗。