Warning

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

構文チートシート

PySpark SQLで最も一般的に使用されるパターンと関数のクイックリファレンスガイド:

お探しの内容が見つからない場合は、PySpark 公式ドキュメント ↗ でカバーされている可能性があります。

一般的なパターン

ログ出力

Copied!
1# Within Code Workbook 2print("example log output") 3 4# Within Code Repositories 5import logging 6logger = logging.getLogger(__name__) 7logger.info("example log output")

関数とタイプのインポート

Copied!
1# Easily reference these as F.my_function() and T.my_type() below 2from pyspark.sql import functions as F, types as T

フィルタリング

Copied!
1# Filter on equals condition 2df = df.filter(df.is_adult == 'Y') 3 4# Filter on >, <, >=, <= condition 5df = df.filter(df.age > 25) 6 7# Multiple conditions require parentheses around each condition 8df = df.filter((df.age > 25) & (df.is_adult == 'Y')) 9 10# Compare against a list of allowed values 11df = df.filter(F.col('first_name').isin(['Bob', 'Mike'])) 12 13# Sort results 14df = df.orderBy(df.age.asc()) 15df = df.orderBy(df.age.desc())

結合

Copied!
1# Left join in another dataset 2df = df.join(person_lookup_table, 'person_id', 'left') 3 4# Left anti-join in another dataset (return unmatched rows in left dataframe) 5df = df.join(person_lookup_table, 'person_id', 'leftanti') 6 7# Match on different columns in left & right datasets 8df = df.join(other_table, df.id == other_table.person_id, 'left') 9 10# Match on multiple columns 11df = df.join(other_table, ['first_name', 'last_name'], 'left') 12 13# Useful for one-liner lookup code joins 14def lookup_and_replace(df1, df2, df1_key, df2_key, df2_value): 15 return ( 16 df1 17 .join(df2[[df2_key, df2_value]], df1[df1_key] == df2[df2_key], 'left') 18 .withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key))) 19 .drop(df2_key) 20 .drop(df2_value) 21 ) 22 23df = lookup_and_replace(people, pay_codes, 'id', 'pay_code_id', 'pay_code_desc')

列操作

Copied!
1# Add a new static column 2df = df.withColumn('status', F.lit('PASS')) 3 4# Construct a new dynamic column 5df = df.withColumn('full_name', F.when( 6 (df.fname.isNotNull() & df.lname.isNotNull()), F.concat(df.fname, df.lname) 7).otherwise(F.lit('N/A'))) 8 9# Pick which columns to keep, optionally rename some 10df = df.select( 11 'name', 12 'age', 13 F.col('dob').alias('date_of_birth'), 14) 15 16# Remove columns 17df = df.drop('mod_dt', 'mod_username') 18 19# Rename a column 20df = df.withColumnRenamed('dob', 'date_of_birth') 21 22# Keep all the columns which also occur in another dataset 23df = df.select(*(F.col(c) for c in df2.columns)) 24 25# Batch Rename/Clean Columns 26for col in df.columns: 27 df = df.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_'))

キャスト、NULL値の補完(coalesce)と重複の削除

Copied!
1# Cast a column to a different type 2df = df.withColumn('price', df.price.cast(T.DoubleType())) 3 4# Replace all nulls with a specific value 5df = df.fillna({ 6 'first_name': 'Tom', 7 'age': 0, 8}) 9 10# Take the first value that is not null 11df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A'))) 12 13# Drop duplicate rows in a dataset (same as distinct()) 14df = df.dropDuplicates() 15 16# Drop duplicate rows, but consider only specific columns 17df = df.dropDuplicates(['name', 'height'])

文字列操作

文字列フィルタ

Copied!
1# Contains - col.contains(string) 2df = df.filter(df.name.contains('o')) 3 4# Starts With - col.startswith(string) 5df = df.filter(df.name.startswith('Al')) 6 7# Ends With - col.endswith(string) 8df = df.filter(df.name.endswith('ice')) 9 10# Is Null - col.isNull() 11df = df.filter(df.is_adult.isNull()) 12 13# Is Not Null - col.isNotNull() 14df = df.filter(df.first_name.isNotNull()) 15 16# Like - col.like(string_with_sql_wildcards) 17df = df.filter(df.name.like('Al%')) 18 19# Regex Like - col.rlike(regex) 20df = df.filter(df.name.rlike('[A-Z]*ice$')) 21 22# Is In List - col.isin(*values) 23df = df.filter(df.name.isin('Bob', 'Mike'))

文字列関数

Copied!
1# Substring - col.substr(startPos, length) (1-based indexing) 2df = df.withColumn('short_id', df.id.substr(1, 10)) 3 4# Trim - F.trim(col) 5df = df.withColumn('name', F.trim(df.name)) 6 7# Left Pad - F.lpad(col, len, pad) 8# Right Pad - F.rpad(col, len, pad) 9df = df.withColumn('id', F.lpad('id', 4, '0')) 10 11# Left Trim - F.ltrim(col) 12# Right Trim - F.rtrim(col) 13df = df.withColumn('id', F.ltrim('id')) 14 15# Concatenate - F.concat(*cols) (null if any column null) 16df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname')) 17 18# Concatenate with Separator/Delimiter - F.concat_ws(delimiter, *cols) (ignores nulls) 19df = df.withColumn('full_name', F.concat_ws('-', 'fname', 'lname')) 20 21# Regex Replace - F.regexp_replace(str, pattern, replacement) 22df = df.withColumn('id', F.regexp_replace('id', '0F1(.*)', '1F1-$1')) 23 24# Regex Extract - F.regexp_extract(str, pattern, idx) 25df = df.withColumn('id', F.regexp_extract('id', '[0-9]*', 0))

数値操作

Copied!
1# Round - F.round(col, scale=0) 2df = df.withColumn('price', F.round('price', 0)) 3 4# Floor - F.floor(col) 5df = df.withColumn('price', F.floor('price')) 6 7# Ceiling - F.ceil(col) 8df = df.withColumn('price', F.ceil('price')) 9 10# Absolute Value - F.abs(col) 11df = df.withColumn('price', F.abs('price')) 12 13# X raised to power Y – F.pow(x, y) 14df = df.withColumn('exponential_growth', F.pow('x', 'y')) 15 16# Select smallest value out of multiple columns – F.least(*cols) 17df = df.withColumn('least', F.least('subtotal', 'total')) 18 19# Select largest value out of multiple columns – F.greatest(*cols) 20df = df.withColumn('greatest', F.greatest('subtotal', 'total'))

日付とタイムスタンプ操作

Copied!
1# Convert a string of known format to a date (excludes time information) 2df = df.withColumn('date_of_birth', F.to_date('date_of_birth', 'yyyy-MM-dd')) 3 4# Convert a string of known format to a timestamp (includes time information) 5df = df.withColumn('time_of_birth', F.to_timestamp('time_of_birth', 'yyyy-MM-dd HH:mm:ss')) 6 7# Get year from date: F.year(col) 8# Get month from date: F.month(col) 9# Get day from date: F.dayofmonth(col) 10# Get hour from date: F.hour(col) 11# Get minute from date: F.minute(col) 12# Get second from date: F.second(col) 13df = df.filter(F.year('date_of_birth') == F.lit(2017)) 14 15# Add & subtract days 16df = df.withColumn('three_days_after', F.date_add('date_of_birth', 3)) 17df = df.withColumn('three_days_before', F.date_sub('date_of_birth', 3)) 18 19# Add & subtract months 20df = df.withColumn('next_month', F.add_months('date_of_birth', 1)) 21df = df.withColumn('previous_month', F.add_months('date_of_birth', -1)) 22 23# Get number of days between two dates 24df = df.withColumn('days_between', F.datediff('end', 'start')) 25 26# Get number of months between two dates 27df = df.withColumn('months_between', F.months_between('end', 'start')) 28 29# Keep only rows where date_of_birth is between 2017-05-10 and 2018-07-21 30df = df.filter( 31 (F.col('date_of_birth') >= F.lit('2017-05-10')) & 32 (F.col('date_of_birth') <= F.lit('2018-07-21')) 33)

配列と構造体の操作

Copied!
1# Column Array - F.array(*cols) 2df = df.withColumn('full_name', F.array('fname', 'lname')) 3 4# Empty Array - F.array(*cols) 5df = df.withColumn('empty_array_column', F.array(F.lit(""))) 6 7# Array or Struct column from existing columns 8df = df.withColumn('guardians', F.array('guardian_1', 'guardian_2')) 9df = df.withColumn('properties', F.struct('hair_color', 'eye_color')) 10 11# Extract from Array column by 1-based index (null if invalid) 12df = df.withColumn('first_guardian', F.element_at(F.col('guardians'), 1)) 13 14# Explode Array or Struct column into multiple rows 15df = df.select(F.col('child_name'), F.explode(F.col('guardians'))) 16df = df.select(F.col('child_name'), F.explode(F.col('properties'))) 17 18# Explode Struct column into multiple columns 19df = df.select(F.col('child_name'), F.col('properties.*'))

集約操作

Copied!
1# Row Count: F.count(*cols), F.countDistinct(*cols) 2# Sum of Rows in Group: F.sum(*cols) 3# Mean of Rows in Group: F.mean(*cols) 4# Max of Rows in Group: F.max(*cols) 5# Min of Rows in Group: F.min(*cols) 6# First Row in Group: F.first(*cols, ignorenulls=False) 7df = df.groupBy(F.col('address')).agg( 8 F.count('uuid').alias('num_residents'), 9 F.max('age').alias('oldest_age'), 10 F.first('city', True).alias('city') 11) 12 13# Collect a Set of all Rows in Group: F.collect_set(col) 14# Collect a List of all Rows in Group: F.collect_list(col) 15df = df.groupBy('address').agg(F.collect_set('name').alias('resident_names'))

高度な操作

再パーティショニング

Copied!
1# Repartition – df.repartition(num_output_partitions) 2df = df.repartition(1)

UDF (ユーザー定義関数)

Copied!
1# Multiply each row's age column by two 2times_two_udf = F.udf(lambda x: x * 2) 3df = df.withColumn('age', times_two_udf(df.age)) 4 5# Randomly choose a value to use as a row's name 6import random 7 8random_name_udf = F.udf(lambda: random.choice(['Bob', 'Tom', 'Amy', 'Jenna'])) 9df = df.withColumn('name', random_name_udf())