注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。
如何使用PySpark和Openpyxl以分布式方式读取和处理具有动态模式的复杂Excel文件?
此代码使用PySpark和Openpyxl库从输入文件系统读取多个Excel文件,解析其内容,并将其转换为PySpark DataFrame。然后将这些DataFrame合并为一个DataFrame并写入输出。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
from pyspark.sql import functions as F, types as T, DataFrame from transforms.api import transform, Input, Output, configure import tempfile import shutil import openpyxl import functools @transform( processed_excel=Output("example_processed_dataframe"), excel_input=Input("example_excel_dataframe"), ) def compute(ctx, processed_excel, excel_input): def parse_file(file_status): # 打开 Excel 文件 with excel_input.filesystem().open(file_status.path, "rb") as in_xlsx: # 创建一个临时文件用于处理 with tempfile.NamedTemporaryFile(suffix=".xlsx") as tmp_xlsx: shutil.copyfileobj(in_xlsx, tmp_xlsx) tmp_xlsx.flush() # 加载 Excel 工作簿并解析其内容 try: workbook = openpyxl.load_workbook(tmp_xlsx.name) return parse_workbook(workbook) except: return None # 从输入文件系统获取 Excel 文件列表 files_df = excel_input.filesystem().files() # 使用 'parse_file' 函数解析每个文件 parsed_files = files_df.rdd.map(parse_file).collect() # 将解析后的文件转换为 PySpark 数据框 dfs = [] for parsed_file in parsed_files: dfs.append(convert_to_df(ctx, parsed_file)) # 合并数据框并将结果写入输出 df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs) processed_excel.write_dataframe(df)
这段代码是一个数据处理管道,使用 PySpark 和 transforms API 处理 Excel 文件。主要步骤如下:
parse_file 函数:负责打开 Excel 文件,将其复制到一个临时文件中,然后尝试加载 Excel 工作簿并解析其内容。
files_df:从输入文件系统中获取 Excel 文件列表。
parsed_files:使用 RDD 的 map 函数并行解析每个 Excel 文件。
dfs:将解析后的文件内容转换为 PySpark 数据框。
数据框合并:使用 unionByName
函数合并所有数据框,允许缺失列。
结果写入:将最终合并的数据框写入输出。
代码创作
, 代码存储库
, python
, openpyxl
我如何合并多个shapefile并将它们转换为GeoJSON格式?
此代码使用geospatial_tools
库读取多个shapefile,将其几何图形转换为GeoJSON格式,并将它们合并为一个PySpark DataFrame。它还计算每个几何图形的质心并将其转换为geohash。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
from transforms.api import transform, Input, Output from geospatial_tools import geospatial from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash import tempfile import shutil import geopandas as gpd from pyspark.sql import types as T from pyspark.sql import functions as F import json from shapely.geometry import mapping @geospatial() @transform( output=Output(), input_data=Input(), ) def compute(ctx, input_data, output): fs = input_data.filesystem() schema = T.StructType([T.StructField("geoshape", T.StringType()), T.StructField("name", T.StringType()), T.StructField("centroid", T.StringType())]) shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*shp')] combined_data = ctx.spark_session.createDataFrame([], schema) for shapefile in shapefiles: # 遍历所有Shapefile文件 with tempfile.TemporaryDirectory() as tmp_dir: # 将Shapefile的所有关联文件复制到本地文件系统 # Shapefile关联多个文件,例如.prj和.cpg for shapefile_file in fs.ls(glob=f'{shapefile}.*'): with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file: with fs.open(shapefile_file.path, 'rb') as f: shutil.copyfileobj(f, tmp_file) # 创建GeoJSON几何列 pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp') pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x))) df = ctx.spark_session.createDataFrame(pdf) # 将所有数据转换为Foundry期望的EPSG:4326格式 crs = gpd.read_file(f'{tmp_dir}/{shapefile}.shp').crs.to_string() df = df.withColumn( "geoshape", clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326")) ).select("geoshape") df = df.withColumn('name', F.lit(shapefile)) df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape'))) combined_data = combined_data.unionByName(df) return output.write_dataframe(combined_data)
该代码实现了从输入数据中读取Shapefile文件,处理其几何数据,并输出包含地理信息的DataFrame。
初始化和输入输出声明:
transforms.api
库定义一个transform
函数,指定输入和输出。geospatial()
装饰器标记这个函数用于地理空间数据处理。文件系统操作:
input_data.filesystem()
获取输入数据的文件系统访问接口。.shp
)的路径。创建临时目录并复制文件:
.prj
和.cpg
)复制到本地文件系统。处理几何数据:
geopandas
读取本地Shapefile,创建GeoJSON几何列。坐标系转换和几何清理:
clean_geometry
函数进行几何数据清理。计算中心点和合并数据:
输出结果:
geospatial
, shapefile
, geojson
, geohash
, pyspark
, geopandas
如何在Python创作变换中将原始文件从输入数据集复制到输出数据集?
此代码定义了一个PySpark变换函数,以将原始文件从输入数据集复制到输出数据集。它使用'shutil'库复制文件字节,并允许根据提供的正则表达式模式复制所有文件或仅复制子集。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
from transforms.api import transform, Input, Output from pyspark.sql import DataFrame from functools import reduce import shutil # 在一个 Python 转换中,将原始文件从输入数据集复制到输出数据集 @transform( my_output=Output("my_output_dataset"), my_input=Input("my_input_dataset") ) def copy_my_input(my_output, my_input): # 调用函数复制符合正则表达式的原始文件 copy_raw_files(my_output, my_input, [".*\.csv"], False) def copy_raw_files(my_output, my_input, regexes, copy_full=False): # 复制原始文件 def copy_file(file_status): # 在输入 dataframe 文件系统中打开给定文件 with my_input.filesystem().open(file_status.path, 'rb') as in_f: # 在输出 dataframe 文件系统中打开一个文件 with my_output.filesystem().open(file_status.path, 'wb') as out_f: # 从输入复制文件字节到输出 shutil.copyfileobj(in_f, out_f) # 选择复制所有文件还是仅复制一个子集 if copy_full: files_df = my_input.filesystem().files() # 获取所有文件 else: files_to_copy = [] for regex in regexes: # 仅复制匹配正则表达式的文件 files_to_copy.append(my_input.filesystem().files(regex=regex)) # 创建一个包含所有文件的 dataframe files_df = reduce(DataFrame.unionByName, files_to_copy) # 这将并行化复制操作 files_df.rdd.foreach(copy_file)
注释中添加了中文解释,帮助理解代码中每个步骤的作用。
code authoring
, code repositories
, python
如何使用PySpark处理数据集中的多个文件?
此代码使用PySpark处理数据集中的多个文件,包括gzipped文件,方法是读取每个文件的第一行,并创建一个包含文件信息和第一行内容的数据框。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
from transforms.api import transform, Input, Output, incremental from pyspark.sql import types as T from pyspark.sql import functions as F from pyspark.sql import Row import gzip import io # @incremental 装饰器可选(兼容两者) # 将 @transform_df 更改为 @transform # 这提供了对输入和输出的更多控制,并且需要访问输入数据集的“文件”版本 @transform( output_dataset_1=Output(""), output_dataset_2=Output(""), input_dataset=Input("") ) def example_transform_file_processing(ctx, input_dataset, output_dataset_1, output_dataset_2): # "files()" 方法返回一个表示输入数据集文件系统的 dataframe fs = input_dataset.filesystem() files_df = input_dataset.filesystem().files() # 这里可以提取每个文件的路径,然后以任何需要或想要的方式处理它们 # ==== 计算示例 # 定义 rdd flatmap 输出的 schema schema = T.StructType([ T.StructField('hadoop_path', T.StringType()), T.StructField('file_name', T.StringType()), T.StructField('size', T.LongType()), T.StructField('modified', T.LongType()), T.StructField('first_row_content', T.StringType()) ]) cols = schema.fieldNames() # 等价于 :["hadoop_path", "file_name", "size", "modified"] MyRow = Row(*cols) # 定义 "MyRow" 对象,作为 RDD 类似 UDF 的函数的返回类型 # 解析一个文件的内联函数(想法:像 UDF,但用于 RDD) def process_file(file_status): # 处理示例:读取每个文件的第一行 line = "default value" try: line = "WARNING: Not supported file type." if file_status.path.endswith('.gz'): # 处理 Gzip 文件 with fs.open(file_status.path, "rb") as f: gz = gzip.GzipFile(fileobj=f) br = io.BufferedReader(gz) tw = io.TextIOWrapper(br) line = tw.readline() else: with fs.open(file_status.path, "r") as f: line = f.readline() except Exception as e: line = "ERROR: " + str(e) # 从 RDD 元素创建一个行 yield MyRow(fs.hadoop_path, file_status.path, file_status.size, file_status.modified, line) # 将文件 dataframe 转换为 RDD。参见 https://spark.apache.org/docs/latest/rdd-programming-guide.html rdd = files_df.rdd # 对 RDD 的每个元素应用一个函数 rdd = rdd.flatMap(process_file) # 将 RDD 转换为 dataframe,以便轻松写入输出 # 指定 schema 可以处理空 rdd。 output_df = ctx.spark_session.createDataFrame(rdd, schema) # 添加时间戳 output_df = output_df.withColumn('processed_at', F.current_timestamp()) # ==== 计算示例结束 # 将输入的文件系统 dataframe 表示写入输出 output_dataset_1.write_dataframe(files_df) # 将处理后的 dataframe 写入输出 output_dataset_2.write_dataframe(output_df)
code authoring
, code repositories
, python
, gzip
, zip
如何使用PySpark加载ORC文件?
此代码从输入数据集的Hadoop路径读取原始ORC文件,并将结果spark dataframe写入输出。
pyspark
, dataframe
, orc
, hadoop
如何从SAS数据集中创建PySpark dataframe?
此代码定义了一个变换函数,该函数接受包含原始SAS文件的输入数据集,并从中创建一个PySpark dataframe。它使用spark-sas7bdat
包读取SAS文件并将其加载到dataframe中。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
@transform( output=Output("xxxxx"), # 在此处包含 foundry RID input_df=Input("xxxxx") # 在此处包含 foundry RID ) def parse_sas_file(ctx, input_df, output, sas_path="*.sas7bdat"): ''' 从 SAS 数据集创建一个 PySpark DataFrame 注意,此函数在驱动程序中执行计算,可能需要增加驱动程序内存 ctx: Spark 上下文 input_df: 包含原始 SAS 文件的输入数据集 sas_path: 数据集中 SAS 文件的路径,默认为数据集中的所有 SAS 文件 include_filename_as_field: 将文件名作为列包含以便下游解析;默认为 false ''' fs = input_df.filesystem() hadoop_path = fs.hadoop_path files_df = fs.files(sas_path) # dfs = [] spark_session = ctx.spark_session.builder.appName(ctx.spark_session.sparkContext.appName).config('spark.jars.packages', 'saurfang:spark-sas7bdat:3.0.0-s_2.12').getOrCreate() # TODO: 更新此代码以处理多个路径 # 从后备数据集中读取文件 path = files_df.collect()[0].path full_path = f'{hadoop_path}/{path}' df = spark_session.read.format("com.github.saurfang.sas.spark").load(full_path) output.write_dataframe(df)
这里的代码定义了一个名为 parse_sas_file
的函数,用于将 SAS 数据集转换为 PySpark 的 DataFrame。注意事项包括该函数在驱动程序中执行计算,可能需要增加驱动程序的内存。当前实现仅支持单个路径的读取,未来可以更新以支持多个路径。
pyspark
, dataframe
, sas
, 代码库
如何处理数据集中的多个文件并将它们合并为一个单一的PySpark DataFrame?
此代码定义了一个PySpark变换,以一个包含多个文件的输入数据集为输入,单独处理每个文件,并将结果合并为一个单一的PySpark DataFrame。它使用'map'函数将'parse_file'函数应用于数据集中的每个文件,收集结果,并将所有DataFrame合并。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
from pyspark.sql import functions as F, types as T, DataFrame from transforms.api import transform, Input, Output, configure import tempfile import shutil import functools # 定义 PySpark 转换 @transform( processed_files=Output("example_processed_files_dataset"), file_dataset=Input("example_file_dataset"), ) def compute(ctx, processed_files, file_dataset): # 解析单个文件的函数 def parse_file(file_status): # 打开文件并创建临时文件 with file_dataset.filesystem().open(file_status.path, "rb") as in_file: with tempfile.NamedTemporaryFile() as tmp_file: shutil.copyfileobj(in_file, tmp_file) tmp_file.flush() # 本地处理文件并返回 Python 对象 return process_file_locally_and_return_python_object(tmp_file) # 获取数据集中文件的列表 files_df = file_dataset.filesystem().files() # 解析每个文件并收集结果 parsed_files = files_df.rdd.map(parse_file).collect() dfs = [] for parsed_file in parsed_files: # 将解析后的文件转换为 PySpark DataFrame dfs.append(convert_to_df(ctx, parsed_file)) # 将所有 DataFrame 合并在一起 df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs) # 将结果 DataFrame 写入输出数据集 processed_files.write_dataframe(df)
在这个代码中,我们使用 PySpark 和 transforms API 来定义一个数据转换过程。首先定义了一个 compute
函数,它从输入数据集中读取文件,解析每个文件并将其转换为 PySpark DataFrame,然后将所有 DataFrame 合并并写入输出数据集。解析文件的逻辑在 parse_file
函数中实现,其中使用临时文件来处理文件内容。最后,通过 write_dataframe
方法将合并后的 DataFrame 写入到指定的输出数据集中。
code authoring
, code repositories
, python
, raw files
, unstructured
如何使用Python从DOCX文件中提取内容?
此代码使用python-docx库从数据集中读取DOCX文件的内容,并将其存储在Document对象中以供进一步处理。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
from transforms.api import transform, Input, Output import docx as dx from io import BytesIO @transform( output=Output("output_dataset"), docs=Input("input_dataset"), ) def compute(ctx, docs, output): fs = docs.filesystem() doc_file = list(fs.ls(regex=r'.*\.docx'))[0] # 查找并选择第一个.docx文件 # 使用文件系统打开文件并将其内容读入BytesIO对象 with fs.open(doc_file.path, 'rb') as f: source_stream = BytesIO(f.read()) document = dx.Document(source_stream) # 使用docx库读取文档内容 source_stream.close() # 对document对象进行处理 # 这里可以添加处理document对象的代码,例如提取文本、分析内容等
code authoring
, python
, python-docx
, bytesio
, raw files
, unstructured
如何读取和处理一个包含多个CSV文件的zip文件从输入数据集并将处理后的数据写入输出数据集在PySpark中?
此代码使用PySpark读取和处理输入数据集中包含CSV的zip文件,跳过每个CSV的第一行,并将处理后的数据写入输出数据集。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
from pyspark.sql import functions as F, types as T, DataFrame from transforms.api import transform, Input, Output, configure import shutil import tempfile import zipfile import io @transform( my_output=Output("my_output_dataset"), my_input=Input("my_input_dataset") ) def compute(ctx, my_output, my_input): # 处理输入数据集中每个文件的函数 def process_file(file_status): with fs.open(file_status.path, 'rb') as f: with tempfile.NamedTemporaryFile() as tmp: shutil.copyfileobj(f, tmp) tmp.flush() # 读取并处理zip文件 with zipfile.ZipFile(tmp) as archive: for filename in archive.namelist(): with archive.open(filename) as f2: br = io.BufferedReader(f2) tw = io.TextIOWrapper(br) tw.readline() # 跳过CSV文件的第一行 # 读取并处理CSV文件中的每一行 for line in tw: yield MyRow(*line.split(",")) # 生成MyRow对象,假设MyRow已定义 # 读取输入数据集并处理每个文件 rdd = my_input.files().rdd rdd = rdd.flatMap(process_file) df = rdd.toDF() # 将处理后的数据写入输出数据集 my_output.write_dataframe(df)
@transform
: 用于声明输入和输出数据集。process_file
函数: 处理压缩文件中的CSV文件,跳过每个CSV文件的第一行,将剩余行解析为 MyRow
对象。flatMap
将每个文件的处理结果平铺到RDD中。请注意,这段代码假设存在一个名为
MyRow
的数据结构用于解析CSV数据,这部分未在代码中定义。
code authoring
,code repositories
,python
,zip
,csv
如何在数据集中解压文件?
此代码使用PySpark从输入中读取压缩文件,提取内容,并将提取的文件写入输出。它通过迭代压缩文件,将其内容读取到BytesIO流中,然后使用zipfile库将文件解压到一个临时目录。提取的文件随后被写入输出。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
# from pyspark.sql import functions as F from transforms.api import transform, Input, Output import zipfile import tempfile import os from io import BytesIO @transform( unzipped=Output(""), zipped=Input(""), ) def compute(unzipped, zipped): # 获取所有.zip文件的列表 zip_files = zipped.filesystem().files(glob="*.zip").collect() for zip_file in zip_files: # 以二进制模式打开.zip文件 with zipped.filesystem().open(zip_file["path"], 'rb') as zip_f: source_stream = BytesIO(zip_f.read()) # 读取.zip文件内容 with zipfile.ZipFile(source_stream, 'r') as zip_ref: # 创建临时目录用于解压缩文件 with tempfile.TemporaryDirectory() as temp_dir: zip_ref.extractall(temp_dir) # 解压缩文件到临时目录 for path in iterate_directories(temp_dir): # 生成输出文件名 output_file_name = path.replace(temp_dir, "") # 打开输出文件用于写入解压后的内容 with unzipped.filesystem().open(output_file_name, "w") as out_f: with open(path, 'r') as in_f: out_f.write(in_f.read()) # 将解压后的内容写入输出文件 def iterate_directories(directory): # 遍历目录下的所有文件 for root, dirs, files in os.walk(directory): for file in files: path = os.path.join(root, file) if is_leaf_file(path): # 检查是否为叶子文件 yield path def is_leaf_file(path): # 判断文件是否为普通文件且不是符号链接 return os.path.isfile(path) and not os.path.islink(path)
代码创作
, 代码仓库
, python
, 原始文件
, zip
, 解压
如何从文件数据集中创建一个zip文件?
此代码使用变换API读取源数据集中的所有Markdown文件,并创建一个包含这些文件的zip文件。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
from transforms.api import transform, Input, Output import zipfile @transform( my_output=Output(""), source_df=Input(""), ) def compute(ctx, my_output, source_df): # 收集所有匹配模式 "*.md" 的文件 files = source_df.filesystem().files(glob="*.md").collect() # 创建一个新的 ZIP 文件 with my_output.filesystem().open("foundry_code_examples.zip", 'wb') as write_zip: with zipfile.ZipFile(write_zip.name, 'w') as zip_file: # 遍历所有收集到的 Markdown 文件 for file_row in files: # 读取每一个 Markdown 文件 with source_df.filesystem().open(file_row["path"], 'rb') as markdown_file: # 将文件写入 ZIP 文件中 zip_file.write(markdown_file.name, arcname=file_row["path"]) return source_df
raw files
, zip
, python
, 代码创作
, 代码库
, 以{filetype}格式导出
如何解析表头复杂并由多行组成的Excel文件?
此代码演示了如何使用transforms-excel-parser
库解析具有复杂表头的Excel文件。它创建一个带有MultilayerMergedHeaderExtractor的TableParser,然后创建一个带有TableParser的TransformsExcelParser。最后,它使用TransformsExcelParser从输入数据集中提取Excel文件的数据,并将结果写入输出。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
package myproject.datasets; import com.palantir.transforms.excel.ParseResult; import com.palantir.transforms.excel.Parser; import com.palantir.transforms.excel.TransformsExcelParser; import com.palantir.transforms.excel.table.MultilayerMergedHeaderExtractor; import com.palantir.transforms.excel.table.TableParser; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import java.util.Optional; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; public final class ComplexHeaderExcel { @Compute public void myComputeFunction( @Input("<input_dataset_rid>") FoundryInput myInput, @Output("<output_dataset_rid>") FoundryOutput myOutput, @Output("<error_output_dataset_rid>") FoundryOutput errorOutput ) { // 创建一个带有 MultilayerMergedHeaderExtractor 的 TableParser Parser tableParser = TableParser.builder() .headerExtractor(MultilayerMergedHeaderExtractor.builder() .topLeftCellName("A1") // 指定表头左上角单元格 .bottomRightCellName("D2") // 指定表头右下角单元格 .build()) .build(); // 创建一个使用 TableParser 的 TransformsExcelParser TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser); // 解析输入数据 ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset()); // 获取解析后的数据,如果输入中没有行或发生错误,可能为空 Optional<Dataset<Row>> maybeDf = result.singleResult(); // 如果解析后的数据不为空,将其写入输出数据集 maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write()); // 将错误信息写入错误输出 errorOutput.getDataFrameWriter(result.errorDataframe()).write(); } }
code authoring
, code repositories
, java
, transforms-excel-parser
, excel
如何解析数据不是表格形式的Excel文件?
此代码演示了如何使用 transforms-excel-parser
库从包含跨多个工作表的表单的Excel文件中提取数据。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
package myproject.datasets; import com.palantir.transforms.excel.TransformsExcelParser; import com.palantir.transforms.excel.ParseResult; import com.palantir.transforms.excel.Parser; import com.palantir.transforms.excel.form.FieldSpec; import com.palantir.transforms.excel.form.FormParser; import com.palantir.transforms.excel.form.Location; import com.palantir.transforms.excel.form.cellvalue.AdjacentCellAssertion; import com.palantir.transforms.excel.form.cellvalue.CellValue; import com.palantir.transforms.excel.functions.RegexSubstringMatchingSheetSelector; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; public final class FormStyleExcel { private static final String FORM_A_KEY = "FORM_A"; private static final String FORM_B_KEY = "FORM_B"; @Compute public void myComputeFunction( @Input("<input_dataset_rid") FoundryInput myInput, @Output("<form_a_output_dataset_rid>") FoundryOutput formAOutput, @Output("<form_b_output_dataset_rid>") FoundryOutput formBOutput, @Output("<error_output_dataset_rid>") FoundryOutput errorOutput) { // Form A 解析器配置 Parser formAParser = FormParser.builder() .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_A")) .addFieldSpecs(createFieldSpec("form_a_field_1", "B1")) .addFieldSpecs(createFieldSpec("form_a_field_2", "B2")) .build(); // Form B 解析器配置 Parser formBParser = FormParser.builder() .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_B")) .addFieldSpecs(createFieldSpec("form_b_field_1", "B1")) .addFieldSpecs(createFieldSpec("form_b_field_2", "B2")) .build(); // 使用 Form A 和 Form B 解析器创建 TransformsExcelParser TransformsExcelParser transformsParser = TransformsExcelParser.builder() .putKeyToParser(FORM_A_KEY, formAParser) .putKeyToParser(FORM_B_KEY, formBParser) .build(); // 解析输入数据 ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset()); // 将解析后的数据写入输出数据集中 result.dataframeForKey(FORM_A_KEY) .ifPresent(df -> formAOutput.getDataFrameWriter(df).write()); result.dataframeForKey(FORM_B_KEY) .ifPresent(df -> formBOutput.getDataFrameWriter(df).write()); // 将错误信息写入错误输出数据集 errorOutput.getDataFrameWriter(result.errorDataframe()).write(); } // 辅助方法:简洁地创建带有适当断言的 FieldSpec private static FieldSpec createFieldSpec(String fieldName, String cellLocation) { return FieldSpec.of( fieldName, CellValue.builder() .addAssertions(AdjacentCellAssertion.left(1, fieldName)) .location(Location.of(cellLocation)) .build()); } }
此代码定义了一个 Java 类 FormStyleExcel
,用于从 Excel 文件中解析数据。通过使用 TransformsExcelParser
来配置和使用解析器,可以从不同的表单(Form A 和 Form B)中提取数据,并将其写入相应的输出数据集。如果在解析过程中出现错误,则错误信息将被写入一个单独的错误输出数据集中。
代码创作
, 代码库
, java
, transforms-excel-parser
, excel
如何使用Transforms Excel Parser解析简单的表格Excel文件?
此代码演示了如何使用transforms-excel-parser
库解析包含简单表格Excel文件的数据集。它创建一个带有SimpleHeaderExtractor的TableParser,然后创建一个带有TableParser的TransformsExcelParser。最后,它使用TransformsExcelParser解析输入数据集中的文件,并将提取的数据写入输出数据集。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
package myproject.datasets; import com.palantir.transforms.excel.ParseResult; import com.palantir.transforms.excel.Parser; import com.palantir.transforms.excel.TransformsExcelParser; import com.palantir.transforms.excel.table.SimpleHeaderExtractor; import com.palantir.transforms.excel.table.TableParser; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import java.util.Optional; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; public final class SimpleTabularExcel { @Compute public void myComputeFunction( @Input("<input_dataset_rid>") FoundryInput myInput, @Output("<output_dataset_rid>") FoundryOutput myOutput, @Output("<error_output_dataset_rid>") FoundryOutput errorOutput ) { // 创建一个配置了SimpleHeaderExtractor的TableParser // 在这个例子中,文件的表头在第二行。 // 如果表头在第一行,我们就不需要指定rowsToSkip, // 因为默认值是0,实际上我们可以直接使用TableParser.builder().build()。 Parser tableParser = TableParser.builder() .headerExtractor( SimpleHeaderExtractor.builder().rowsToSkip(1).build()) .build(); // 使用TableParser创建TransformsExcelParser TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser); // 解析输入 ParseResult result = transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset()); // 获取解析后的数据,如果输入中没有行或发生错误,可能为空 Optional<Dataset<Row>> maybeDf = result.singleResult(); // 如果解析后的数据不为空,将其写入输出数据集 maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write()); // 将错误信息写入错误输出 errorOutput.getDataFrameWriter(result.errorDataframe()).write(); }
这个Java代码片段用于解析Excel文件并将其转换为数据集。主要逻辑是通过配置一个TableParser
来解析Excel文件,其中表头位于第二行。解析结果被写入输出数据集,如果存在错误,则将错误信息写入错误输出数据集。
代码创作
, 代码库
, java
, 变换-excel-解析器
, excel