注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。
| 类 | 描述 |
|---|---|
Check | 封装一个期望,以便可以在数据健康中注册。 |
FileStatus | 一个collections.namedtuple,捕获关于FoundryFS文件的详细信息。 |
FileSystem(foundry_fs[, read_only]) | 用于读取和写入数据集文件的文件系统对象。 |
IncrementalTransformContext(ctx, is_incremental) | 增强了增量计算功能的TransformContext。 |
IncrementalTransformInput(tinput[, prev_txrid]) | 增强了增量计算功能的TransformInput。 |
IncrementalTransformOutput(toutput[, …]) | 增强了增量计算功能的TransformOutput。 |
Input(alias) | 变换输入的规范。 |
Output(alias[, sever_permissions]) | 变换输出的规范。 |
Pipeline() | 用于分组一组Transform对象的对象。 |
Transform(compute_func[, inputs, outputs, ...]) | 描述计算单步骤的可调用对象。 |
TransformContext(foundry_connector[, parameters]) | 可以选择注入到变换的计算函数中的上下文对象。 |
TransformInput(rid, branch, txrange, …) | 在运行时传递给Transform对象的输入对象。 |
LightweightInput(alias) | 在运行时传递给轻量级变换对象的输入对象。 |
IncrementalLightweightInput(alias) | 在运行时传递给增量轻量级变换对象的输入对象。 |
TransformOutput(rid, branch, txrid, …) | 在运行时传递给Transform对象的输出对象。 |
LightweightOutput(alias) | 在运行时传递给轻量级变换对象的输入对象。 |
Checktransforms.api.Check封装一个期望,以便可以在数据健康中注册。
expectation
name
is_incremental
on_error
description
FileStatusclass transforms.api.FileStatus
一个collections.namedtuple,捕获关于FoundryFS文件的详细信息。
创建FileStatus(path, size, modified)的新实例
count(value) → integer -- 返回值的出现次数index(value[, start[, stop]]) → integer -- 返回值的第一个索引
modified
path
size
FileSystemclass transforms.api.FileSystem(foundry_fs, read_only=False)
用于读取和写入数据集文件的文件系统对象。
files(glob=None, regex='.*', show_hidden=False, packing_heuristic=None)
DataFrame ↗ ,包含此数据集中可访问的路径。DataFrame ↗ 按文件大小分区,每个分区包含总大小最多为spark.files.maxPartitionBytes字节的文件路径,或者如果单个文件大于spark.files.maxPartitionBytes则按单个文件分区。文件的大小计算为其磁盘文件大小加上spark.files.openCostInBytes。pdf),请使用**/*.pdf。.或_开头的文件。ffd(First Fit Decreasing)或wfd(Worst Fit Decreasing)。虽然wfd产生的分布不均匀,但速度更快,因此推荐用于包含大量文件的数据集。如果未指定启发式,将自动选择一个。ls(glob=None, regex='.*', show_hidden=False)
open(_path, mode='r', kwargs)
kwargs是关键字参数。io.open() ↗.或_开头的文件。IncrementalTransformContexttransforms.api.IncrementalTransformContext(ctx, is_incremental)带有增量计算功能的TransformContext。
auth_header
fallback_branches
is_incremental
parameters
spark_session
IncrementalTransformInputtransforms.api.IncrementalTransformInput(tinput, prev_txrid=None)具有增量计算功能的TransformInput。
dataframe(mode='added')
pyspark.sql.DataFrame。filesystem(mode='added')
pandas()
branch
path
rid
IncrementalTransformOutputclass transforms.api.IncrementalTransformOutput(toutput, prev_txrid=None, mode='replace')
具有增量计算功能的TransformOutput。
abort()
dataframe(mode='current', schema=None)
ValueError ↗ - 如果使用模式‘previous’时未传递任何模式filesystem(mode='current')
NotImplementedError ↗ – 当前不支持。pandas(mode='current')
set_mode(mode)
数据写入后无法更改写入模式。
write_dataframe(df, partition_cols=None, bucket_cols=None, bucket_count=None, sort_by=None, output_format=None, options=None)
org.apache.spark.sql.DataFrameWriter#option(String, String)的其他选项。write_pandas(pandas_df)
branch
path
rid
Inputclass transforms.api.Input(alias, branch, stop_propagating, stop_requiring, checks)
变换输入的规范。
Check对象。continue或fail之一。如果未指定,默认为fail。Outputclass transforms.api.Output(alias=None, sever_permissions=False, checks=None)
变换输出的规范。
Pipelineclass transforms.api.Pipeline
用于分组一组Transform对象的对象。
add_transforms(*transforms)
ValueError ↗ – 如果多个Transform对象写入相同的Output别名。discover_transforms(*modules)
Transform实例的属性(通过变换装饰器构造)都将被注册到管道。Copied!1 2 3 4 5 6 7>>> import myproject >>> p = Pipeline() >>> p.discover_transforms(myproject) # 该代码导入了一个名为 myproject 的模块。 # 然后创建了一个 Pipeline 对象实例 p。 # 接着调用 p 的 discover_transforms 方法,传入 myproject 模块。 # 这个方法可能用于在 myproject 中发现或注册一些数据转换操作。
找到的每个模块都会被导入。尽量避免在模块级别执行代码。
transforms
Transformclass transforms.api.Transform(compute_func, inputs=None, outputs=None, profile=None)
一个描述计算单步骤的可调用对象。
一个变换由若干Input规格、若干Output规格和一个计算函数组成。
使用提供的装饰器构建Transform对象是惯用的: transform(), transform_df(), 和 transform_pandas()。
注意:原始的计算函数通过Transform的__call__方法暴露。
参数
compute(ctx=None, _kwargs_)**
Input规格的字典。kwarg是关键字参数的缩写。version
select A, B from foo;应该与SQL查询 select A, B from (select * from foo);具有相同的版本。ValueError ↗ – 如果计算函数的对象哈希失败TransformContextclass transforms.api.TransformContext(foundry_connector, parameters=None)
可以选择注入到变换计算函数中的上下文对象。
auth_header
fallback_branches
parameters
spark_session
TransformInputclass transforms.api.TransformInput(rid, branch, txrange, dfreader, fsbuilder)
在运行时传递给Transform对象的输入对象。
dataframe()
filesystem()
pandas()
branch
path
rid
column_descriptions
column_typeclasses
LightweightInputclass transforms.api.LightweightInput(alias)
其目的是通过委托给Foundry Data Sidecar模仿TransformInput的API子集,同时通过支持各种数据格式进行扩展。
dataframe()
pandas()的别名。filesystem()
pandas()
arrow()
polars(lazy: Optional[bool]=False)
lazy参数的值返回一个polars.DataFrame ↗或polars.LazyFrame ↗。path()
IncrementalLightweightInputclass transforms.api.IncrementalLightweightInput(alias)
其目的是通过委托给Foundry Data Sidecar模仿IncrementalTransformInput的API子集,同时通过支持各种数据格式进行扩展。它是LightweightInput的增量对应物。
dataframe(mode)
pandas()的别名。filesystem()
pandas()(mode)
arrow()(mode)
polars(lazy=False, mode)
lazy参数的值返回一个polars.DataFrame ↗或polars.LazyFrame ↗。path(mode)
TransformOutputclass transforms.api.TransformOutput(rid, branch, txrid, dfreader, dfwriter, fsbuilder)
在运行时传递给Transform对象的输出对象。
abort()
dataframe()
filesystem()
pandas()
set_mode(mode)
write_dataframe(df, partition_cols=None, bucket_cols=None, bucket_count=None, sort_by=None, output_format=None, options=None, column_descriptions=None, column_typeclasses=None)
bucket_count,则必须指定。bucket_cols,则必须指定。org.apache.spark.sql.DataFrameWriter#option(String, String)的额外选项。write_pandas(pandas_df)
branch
path
rid
LightweightOutputclass transforms.api.LightweightInput(alias)
其目的是通过委托给Foundry Data Sidecar模仿TransformOutput的API子集。
filesystem()
dataframe()
pandas()的别名。pandas()
arrow()
polars(lazy: Optional[bool]=False)
lazy参数的值返回一个polars.DataFrame ↗或polars.LazyFrame ↗。path()
write_pandas(pandas_df)
write_table。write_table(df)
path写入到输出数据集中。如果给定path(无论是str还是pathlib.Path),其值必须与path_for_write_table返回的值匹配。path) – 要写入的数据框。path_for_write_table
write_table一起使用的数据集文件的路径。set_mode(mode)