数据连接与集成Python转换PySpark Reference语法备忘单

注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。

语法备忘单

一个关于 PySpark SQL 中最常用模式和函数的快速参考指南:

如果您找不到所需内容,可能会在 PySpark 官方文档 ↗ 中找到。

常用模式

日志输出

Copied!
1# 在代码工作簿中 2print("example log output") # 打印日志输出示例 3 4# 在代码仓库中 5import logging 6logger = logging.getLogger(__name__) # 获取一个以当前模块命名的logger 7logger.info("example log output") # 记录信息级别的日志

导入函数和类型

Copied!
1# 可以通过 F.my_function() 和 T.my_type() 来方便地引用这些函数和类型 2from pyspark.sql import functions as F, types as T

筛选

Copied!
1# 根据相等条件进行过滤 2df = df.filter(df.is_adult == 'Y') 3 4# 根据 >, <, >=, <= 条件进行过滤 5df = df.filter(df.age > 25) 6 7# 多个条件需要在每个条件周围加上括号 8df = df.filter((df.age > 25) & (df.is_adult == 'Y')) 9 10# 对比一个允许值的列表 11df = df.filter(col('first_name').isin([3, 4, 7])) 12 13# 排序结果 14df = df.orderBy(df.age.asc()) # 按年龄升序排序 15df = df.orderBy(df.age.desc()) # 按年龄降序排序

合并

Copied!
1# 在另一个数据集中进行左连接 2df = df.join(person_lookup_table, 'person_id', 'left') 3 4# 在另一个数据集中进行左反连接(返回左侧数据集中未匹配的行) 5df = df.join(person_lookup_table, 'person_id', 'leftanti'); 6 7# 在左侧和右侧数据集中不同的列上进行匹配 8df = df.join(other_table, df.id == other_table.person_id, 'left') 9 10# 在多个列上进行匹配 11df = df.join(other_table, ['first_name', 'last_name'], 'left') 12 13# 用于一行代码的查找连接 14def lookup_and_replace(df1, df2, df1_key, df2_key, df2_value): 15 return ( 16 df1 17 .join(df2[[df2_key, df2_value]], df1[df1_key] == df2[df2_key], 'left') 18 .withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key))) # 使用coalesce函数替换空值 19 .drop(df2_key) # 删除不需要的列 20 .drop(df2_value) # 删除不需要的列 21 ) 22 23df = lookup_and_replace(people, pay_codes, id, pay_code_id, pay_code_desc)

列操作

Copied!
1# 添加一个新的静态列 2df = df.withColumn('status', F.lit('PASS')) 3 4# 构建一个新的动态列 5df = df.withColumn('full_name', F.when( 6 (df.fname.isNotNull() & df.lname.isNotNull()), F.concat(df.fname, df.lname) 7).otherwise(F.lit('N/A'))) # 如果fname和lname不为null,则连接它们,否则填充为'N/A' 8 9# 选择需要保留的列,可以选择性地重命名一些列 10df = df.select( 11 'name', 12 'age', 13 F.col('dob').alias('date_of_birth'), # 将'dob'列重命名为'date_of_birth' 14) 15 16# 删除列 17df = df.drop('mod_dt', 'mod_username') # 删除'mod_dt'和'mod_username'列 18 19# 重命名列 20df = df.withColumnRenamed('dob', 'date_of_birth') # 将'dob'列重命名为'date_of_birth' 21 22# 保留所有在另一个数据集(df2)中也存在的列 23df = df.select(*(F.col(c) for c in df2.columns)) # 选择df2中也存在的列 24 25# 批量重命名/清理列名 26for col in df.columns: 27 df = df.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_')) # 将列名转换为小写,并用下划线替换空格和破折号

转换和合并空值及重复项

Copied!
1# 将列转换为不同的数据类型 2df = df.withColumn('price', df.price.cast(T.DoubleType())) 3 4# 用特定值替换所有的空值 5df = df.fillna({ 6 'first_name': 'Tom', # 如果first_name为null,则替换为'Tom' 7 'age': 0, # 如果age为null,则替换为0 8}) 9 10# 取第一个非空值 11df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A'))) 12# 如果last_name为null,则尝试使用surname,如果surname也为null,则使用'N/A' 13 14# 删除数据集中重复的行(与distinct()功能相同) 15df = df.dropDuplicates() 16 17# 删除重复的行,但只考虑特定的列 18df = df.dropDuplicates(['name', 'height']) 19# 仅在'name'和'height'这两列的组合上检查重复

字符串操作

字符串筛选

Copied!
1# 包含 - col.contains(string) 2df = df.filter(df.name.contains('o')) # 过滤name列中包含'o'的行 3 4# 以...开始 - col.startswith(string) 5df = df.filter(df.name.startswith('Al')) # 过滤name列中以'Al'开头的行 6 7# 以...结束 - col.endswith(string) 8df = df.filter(df.name.endswith('ice')) # 过滤name列中以'ice'结尾的行 9 10# 为空 - col.isNull() 11df = df.filter(df.is_adult.isNull()) # 过滤is_adult列为空值的行 12 13# 不为空 - col.isNotNull() 14df = df.filter(df.first_name.isNotNull()) # 过滤first_name列不为空值的行 15 16# 类似 - col.like(string_with_sql_wildcards) 17df = df.filter(df.name.like('Al%')) # 过滤name列中以'Al'开头的行(SQL通配符) 18 19# 正则表达式匹配 - col.rlike(regex) 20df = df.filter(df.name.rlike('[A-Z]*ice$')) # 过滤name列中符合正则表达式'[A-Z]*ice$'的行 21 22# 在列表中 - col.isin(*values) 23df = df.filter(df.name.isin('Bob', 'Mike')) # 过滤name列中值为'Bob'或'Mike'的行

字符串函数

Copied!
1# Substring - col.substr(startPos, length) (1-based indexing) 2# 截取子字符串 - col.substr(startPos, length) (基于1的索引) 3df = df.withColumn('short_id', df.id.substr(1, 10)) 4 5# Trim - F.trim(col) 6# 去除字符串首尾空格 - F.trim(col) 7df = df.withColumn('name', F.trim(df.name)) 8 9# Left Pad - F.lpad(col, len, pad) 10# 左填充 - F.lpad(col, len, pad) 11# Right Pad - F.rpad(col, len, pad) 12# 右填充 - F.rpad(col, len, pad) 13df = df.withColumn('id', F.lpad('id', 4, '0')) 14 15# Left Trim - F.ltrim(col) 16# 去除字符串左侧空格 - F.ltrim(col) 17# Right Trim - F.rtrim(col) 18# 去除字符串右侧空格 - F.rtrim(col) 19df = df.withColumn('id', F.ltrim('id')) 20 21# Concatenate - F.concat(*cols) (null if any column null) 22# 连接字符串 - F.concat(*cols) (如果任何列为null,则结果为null) 23df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname')) 24 25# Concatenate with Separator/Delimiter - F.concat_ws(delimiter, *cols) (ignores nulls) 26# 使用分隔符连接字符串 - F.concat_ws(delimiter, *cols) (忽略null值) 27df = df.withColumn('full_name', F.concat_ws('-', 'fname', 'lname')) 28 29# Regex Replace - F.regexp_replace(str, pattern, replacement) 30# 正则表达式替换 - F.regexp_replace(str, pattern, replacement) 31df = df.withColumn('id', F.regexp_replace(id, '0F1(.*)', '1F1-$1')) 32 33# Regex Extract - F.regexp_extract(str, pattern, idx) 34# 正则表达式提取 - F.regexp_extract(str, pattern, idx) 35df = df.withColumn('id', F.regexp_extract(id, '[0-9]*', 0))

数字运算

Copied!
1# 四舍五入 - F.round(col, scale=0) 2df = df.withColumn('price', F.round('price', 0)) 3 4# 向下取整 - F.floor(col) 5df = df.withColumn('price', F.floor('price')) 6 7# 向上取整 - F.ceil(col) 8df = df.withColumn('price', F.ceil('price')) 9 10# 绝对值 - F.abs(col) 11df = df.withColumn('price', F.abs('price')) 12 13# x 的 y 次幂 – F.pow(x, y) 14df = df.withColumn('exponential_growth', F.pow('x', 'y')) 15 16# 选择多个列中的最小值 – F.least(*cols) 17df = df.withColumn('least', F.least('subtotal', 'total')) 18 19# 选择多个列中的最大值 – F.greatest(*cols) 20df = df.withColumn('greatest', F.greatest('subtotal', 'total'))

日期和时间戳操作

Copied!
1# 将已知格式的字符串转换为日期(不包括时间信息) 2df = df.withColumn('date_of_birth', F.to_date('date_of_birth', 'yyyy-MM-dd')) 3 4# 将已知格式的字符串转换为时间戳(包括时间信息) 5df = df.withColumn('time_of_birth', F.to_timestamp('time_of_birth', 'yyyy-MM-dd HH:mm:ss')) 6 7# 从日期中获取年份: F.year(col) 8# 从日期中获取月份: F.month(col) 9# 从日期中获取天数: F.dayofmonth(col) 10# 从日期中获取小时: F.hour(col) 11# 从日期中获取分钟: F.minute(col) 12# 从日期中获取秒数: F.second(col) 13df = df.filter(F.year('date_of_birth') == F.lit('2017')) 14 15# 加/减天数 16df = df.withColumn('three_days_after', F.date_add('date_of_birth', 3)) 17df = df.withColumn('three_days_before', F.date_sub('date_of_birth', 3)) 18 19# 加/减月份 20df = df.withColumn('next_month', F.add_months('date_of_birth', 1)) 21df = df.withColumn('previous_month', F.add_months('date_of_birth', -1)) 22 23# 获取两个日期之间的天数 24df = df.withColumn('days_between', F.datediff('end', 'start')) 25 26# 获取两个日期之间的月数 27df = df.withColumn('months_between', F.months_between('end', 'start')) 28 29# 仅保留date_of_birth在2017-05-10和2018-07-21之间的行 30df = df.filter( 31 (F.col('date_of_birth') >= F.lit('2017-05-10')) & 32 (F.col('date_of_birth') <= F.lit('2018-07-21')) 33)

数组和结构体操作

Copied!
1# 列数组 - F.array(*cols) 2df = df.withColumn('full_name', F.array('fname', 'lname')) # 将'fname'和'lname'列组合成一个数组,作为'full_name'列 3 4# 空数组 - F.array(*cols) 5df = df.withColumn('empty_array_column', F.array(F.lit(""))) # 创建一个包含空字符串的数组,并添加到'empty_array_column'列 6 7# 从现有列创建数组或结构体列 8df = df.withColumn('guardians', F.array('guardian_1', 'guardian_2')) # 将'guardian_1'和'guardian_2'列组合成一个数组,作为'guardians'列 9df = df.withColumn('properties', F.struct('hair_color', 'eye_color')) # 将'hair_color'和'eye_color'列组合成一个结构体,作为'properties'列 10 11# 通过索引或键从数组或结构体列中提取(无效时返回null) 12df = df.withColumn('hair_color', F.element_at(F.col('properties'), F.col('hair_color'))) # 从'properties'结构体中提取'hair_color'字段的值 13 14# 将数组或结构体列展开为多行 15df = df.select(F.col('child_name'), F.explode(F.col('guardians'))) # 将'guardians'数组展开为多行,每个元素一行 16df = df.select(F.col('child_name'), F.explode(F.col('properties'))) # 将'properties'结构体展开为多行(不常用,可能误用) 17 18# 将结构体列展开为多个列 19df = df.select(F.col('child_name'), F.col('properties.*')) # 将'properties'结构体的每个字段展开为单独的列

聚合操作

Copied!
1# 行计数: F.count(*cols), F.countDistinct(*cols) 2# 组内行之和: F.sum(*cols) 3# 组内行的平均值: F.mean(*cols) 4# 组内行的最大值: F.max(*cols) 5# 组内行的最小值: F.min(*cols) 6# 组内的第一行: F.first(*cols, ignorenulls=False) 7df = df.groupBy(col('address')).agg( 8 count('uuid').alias('num_residents'), # 计算每个地址的居民数 9 max('age').alias('oldest_age'), # 找出每个地址的最高年龄 10 first('city', True).alias('city') # 找出每个地址的第一个城市,忽略空值 11) 12 13# 收集组内所有行的集合: F.collect_set(col) 14# 收集组内所有行的列表: F.collect_list(col) 15df = df.groupBy('address').agg(F.collect_set('name').alias('resident_names')) # 收集每个地址的居民姓名集合

高级操作

重新分区

Copied!
1# 重分区 – df.repartition(num_output_partitions) 2# 重新将数据分区为一个分区 3df = df.repartition(1)

UDFs(用户定义函数)

Copied!
1# 将每行的年龄列乘以二 2times_two_udf = F.udf(lambda x: x * 2) 3df = df.withColumn('age', times_two_udf(df.age)) 4 5# 随机选择一个值作为行的名称 6import random 7 8random_name_udf = F.udf(lambda: random.choice(['Bob', 'Tom', 'Amy', 'Jenna'])) 9df = df.withColumn('name', random_name_udf())