注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。
类 | 描述 |
---|---|
Check | 封装一个期望,以便可以在数据健康中注册。 |
FileStatus | 一个collections.namedtuple,捕获关于FoundryFS文件的详细信息。 |
FileSystem(foundry_fs[, read_only]) | 用于读取和写入数据集文件的文件系统对象。 |
IncrementalTransformContext (ctx, is_incremental) | 增强了增量计算功能的TransformContext。 |
IncrementalTransformInput (tinput[, prev_txrid]) | 增强了增量计算功能的TransformInput。 |
IncrementalTransformOutput (toutput[, …]) | 增强了增量计算功能的TransformOutput。 |
Input (alias) | 变换输入的规范。 |
Output (alias[, sever_permissions]) | 变换输出的规范。 |
Pipeline () | 用于分组一组Transform对象的对象。 |
Transform (compute_func[, inputs, outputs, ...]) | 描述计算单步骤的可调用对象。 |
TransformContext (foundry_connector[, parameters]) | 可以选择注入到变换的计算函数中的上下文对象。 |
TransformInput (rid, branch, txrange, …) | 在运行时传递给Transform对象的输入对象。 |
LightweightInput (alias) | 在运行时传递给轻量级变换对象的输入对象。 |
IncrementalLightweightInput (alias) | 在运行时传递给增量轻量级变换对象的输入对象。 |
TransformOutput (rid, branch, txrid, …) | 在运行时传递给Transform对象的输出对象。 |
LightweightOutput (alias) | 在运行时传递给轻量级变换对象的输入对象。 |
Check
transforms.api.Check
封装一个期望,以便可以在数据健康中注册。
expectation
name
is_incremental
on_error
description
FileStatus
class transforms.api.FileStatus
一个collections.namedtuple
,捕获关于FoundryFS文件的详细信息。
创建FileStatus(path, size, modified)的新实例
count
(value) → integer -- 返回值的出现次数index
(value[, start[, stop]]) → integer -- 返回值的第一个索引
modified
path
size
FileSystem
class transforms.api.FileSystem
(foundry_fs, read_only=False)
用于读取和写入数据集文件的文件系统对象。
files
(glob=None, regex='.*', show_hidden=False, packing_heuristic=None)
DataFrame
↗ ,包含此数据集中可访问的路径。DataFrame
↗ 按文件大小分区,每个分区包含总大小最多为spark.files.maxPartitionBytes
字节的文件路径,或者如果单个文件大于spark.files.maxPartitionBytes
则按单个文件分区。文件的大小计算为其磁盘文件大小加上spark.files.openCostInBytes
。pdf
),请使用**/*.pdf
。.
或_
开头的文件。ffd
(First Fit Decreasing)或wfd
(Worst Fit Decreasing)。虽然wfd
产生的分布不均匀,但速度更快,因此推荐用于包含大量文件的数据集。如果未指定启发式,将自动选择一个。ls
(glob=None, regex='.*', show_hidden=False)
open
(_path, mode='r', kwargs)
kwargs
是关键字参数。io.open()
↗.
或_
开头的文件。IncrementalTransformContext
transforms.api.IncrementalTransformContext
(ctx, is_incremental)带有增量计算功能的TransformContext。
auth_header
fallback_branches
is_incremental
parameters
spark_session
IncrementalTransformInput
transforms.api.IncrementalTransformInput
(tinput, prev_txrid=None)具有增量计算功能的TransformInput。
dataframe
(mode='added')
pyspark.sql.DataFrame
。filesystem
(mode='added')
pandas()
branch
path
rid
IncrementalTransformOutput
class transforms.api.IncrementalTransformOutput
(toutput, prev_txrid=None, mode='replace')
具有增量计算功能的TransformOutput。
abort()
dataframe
(mode='current', schema=None)
ValueError
↗ - 如果使用模式‘previous’时未传递任何模式filesystem
(mode='current')
NotImplementedError
↗ – 当前不支持。pandas
(mode='current')
set_mode
(mode)
数据写入后无法更改写入模式。
write_dataframe
(df, partition_cols=None, bucket_cols=None, bucket_count=None, sort_by=None, output_format=None, options=None)
org.apache.spark.sql.DataFrameWriter#option(String, String)
的其他选项。write_pandas
(pandas_df)
branch
path
rid
Input
class transforms.api.Input
(alias, branch, stop_propagating, stop_requiring, checks)
变换输入的规范。
Check
对象。continue
或fail
之一。如果未指定,默认为fail
。Output
class transforms.api.Output
(alias=None, sever_permissions=False, checks=None)
变换输出的规范。
Pipeline
class transforms.api.Pipeline
用于分组一组Transform对象的对象。
add_transforms
(*transforms)
ValueError
↗ – 如果多个Transform
对象写入相同的Output
别名。discover_transforms
(*modules)
Transform
实例的属性(通过变换装饰器构造)都将被注册到管道。Copied!1 2 3 4 5 6 7
>>> import myproject >>> p = Pipeline() >>> p.discover_transforms(myproject) # 该代码导入了一个名为 myproject 的模块。 # 然后创建了一个 Pipeline 对象实例 p。 # 接着调用 p 的 discover_transforms 方法,传入 myproject 模块。 # 这个方法可能用于在 myproject 中发现或注册一些数据转换操作。
找到的每个模块都会被导入。尽量避免在模块级别执行代码。
transforms
Transform
class transforms.api.Transform
(compute_func, inputs=None, outputs=None, profile=None)
一个描述计算单步骤的可调用对象。
一个变换由若干Input
规格、若干Output
规格和一个计算函数组成。
使用提供的装饰器构建Transform对象是惯用的: transform()
, transform_df()
, 和 transform_pandas()
。
注意:原始的计算函数通过Transform的__call__
方法暴露。
参数
compute
(ctx=None, _kwargs_)**
Input
规格的字典。kwarg是关键字参数的缩写。version
select A, B from foo;
应该与SQL查询 select A, B from (select * from foo);
具有相同的版本。ValueError
↗ – 如果计算函数的对象哈希失败TransformContext
class transforms.api.TransformContext
(foundry_connector, parameters=None)
可以选择注入到变换计算函数中的上下文对象。
auth_header
fallback_branches
parameters
spark_session
TransformInput
class transforms.api.TransformInput
(rid, branch, txrange, dfreader, fsbuilder)
在运行时传递给Transform对象的输入对象。
dataframe()
filesystem()
pandas()
branch
path
rid
column_descriptions
column_typeclasses
LightweightInput
class transforms.api.LightweightInput
(alias)
其目的是通过委托给Foundry Data Sidecar模仿TransformInput
的API子集,同时通过支持各种数据格式进行扩展。
dataframe()
pandas()
的别名。filesystem()
pandas()
arrow()
polars(lazy: Optional[bool]=False)
lazy
参数的值返回一个polars.DataFrame ↗或polars.LazyFrame ↗。path()
IncrementalLightweightInput
class transforms.api.IncrementalLightweightInput
(alias)
其目的是通过委托给Foundry Data Sidecar模仿IncrementalTransformInput
的API子集,同时通过支持各种数据格式进行扩展。它是LightweightInput
的增量对应物。
dataframe
(mode)
pandas()
的别名。filesystem()
pandas()
(mode)
arrow()
(mode)
polars
(lazy=False, mode)
lazy
参数的值返回一个polars.DataFrame ↗或polars.LazyFrame ↗。path
(mode)
TransformOutput
class transforms.api.TransformOutput
(rid, branch, txrid, dfreader, dfwriter, fsbuilder)
在运行时传递给Transform对象的输出对象。
abort()
dataframe()
filesystem()
pandas()
set_mode
(mode)
write_dataframe
(df, partition_cols=None, bucket_cols=None, bucket_count=None, sort_by=None, output_format=None, options=None, column_descriptions=None, column_typeclasses=None)
bucket_count
,则必须指定。bucket_cols
,则必须指定。org.apache.spark.sql.DataFrameWriter#option(String, String)
的额外选项。write_pandas
(pandas_df)
branch
path
rid
LightweightOutput
class transforms.api.LightweightInput
(alias)
其目的是通过委托给Foundry Data Sidecar模仿TransformOutput
的API子集。
filesystem()
dataframe()
pandas()
的别名。pandas()
arrow()
polars(lazy: Optional[bool]=False)
lazy
参数的值返回一个polars.DataFrame ↗或polars.LazyFrame ↗。path()
write_pandas
(pandas_df)
write_table
。write_table
(df)
path
写入到输出数据集中。如果给定path
(无论是str
还是pathlib.Path
),其值必须与path_for_write_table
返回的值匹配。path
) – 要写入的数据框。path_for_write_table
write_table
一起使用的数据集文件的路径。set_mode
(mode)