注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。
除了操作具有定义表模式的Foundry数据集之外,Code Workbook还支持访问数据集中的非结构化文件。这对于分析和变换非结构化数据(如图像和其他类型的媒体)、半结构化格式(如XML或JSON)、压缩格式(如GZ或ZIP文件)或R数据格式(如RDA和RDS)很有用。
您可以通过将上游数据集读取为Python变换输入
来在Python变换中读取文件。此API公开一个FileSystem
Object,该Object允许基于Foundry数据集中文件路径的文件访问,从而抽象掉底层存储。了解更多关于FileSystem
的信息。。其他信息,包括分支和RID(如变换输入文档中详细说明的),也会暴露。
使用输入助手栏或在输入选项卡中更改输入的类型。
只有导入的数据集和持久化的数据集可以作为Python变换输入读取。未保存为数据集的变换不能作为Python变换输入读取。
没有模式的数据集应自动作为变换输入读取。
例如,以下代码将读取ZIP文件中的CSV,并将CSV内容返回为dataframe。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
import tempfile import zipfile import shutil import io from pyspark.sql import Row # datasetOfZippedFiles 是一个包含单个压缩文件的数据集,该压缩文件包含3个具有相同schema的CSV文件:["id", "name"]。 def sample(datasetOfZippedFiles): df = datasetOfZippedFiles fs = df.filesystem() # 这是文件系统对象。 MyRow = Row("id", "name") # 定义行模式 def process_file(file_status): with fs.open(file_status.path, 'rb') as f: # 打开文件状态路径 with tempfile.NamedTemporaryFile() as tmp: # 创建临时文件 shutil.copyfileobj(f, tmp) # 将文件内容复制到临时文件中 tmp.flush() # 刷新临时文件 with zipfile.ZipFile(tmp) as archive: # 打开压缩文件 for filename in archive.namelist(): # 遍历压缩文件中的所有文件名 with archive.open(filename) as f2: # 打开每个文件 br = io.BufferedReader(f2) # 创建缓冲读取器 tw = io.TextIOWrapper(br) # 创建文本包装器 tw.readline() # 跳过每个CSV的第一行 for line in tw: # 逐行读取 yield MyRow(*line.split(",")) # 生成每行的Row对象 rdd = fs.files().rdd # 获取文件系统的RDD rdd = rdd.flatMap(process_file) # 使用flatMap处理每个文件 df = rdd.toDF() # 将RDD转换为DataFrame return df
此代码的主要功能是处理包含多个CSV文件的压缩文件,将其解压缩后,读取CSV文件并转换为Spark DataFrame。代码利用了PySpark的RDD和DataFrame API进行数据处理。
可以写入输出文件系统。这对于写入非表格数据格式(包括图像、PDF、文本文件等)非常有用。
调用 Transforms.get_output()
来实例化 TransformOutput。了解有关 TransformOutput API 的更多信息。
仅能在保存为数据集的节点中使用 TransformOutput 写入文件。不能在控制台中使用 TransformOutput 写入文件。
一旦实例化了 TransformOutput 并通过调用 filesystem() 或其他方法使用它,返回除 TransformOutput Object 以外的任何东西都将被忽略。
以下代码是如何写入文本文件的示例:
Copied!1 2 3 4 5 6 7 8
def write_text_file(): output = Transforms.get_output() # 获取输出对象 output_fs = output.filesystem() # 获取输出对象的文件系统 # 使用文件系统打开或创建一个名为 'my text file.txt' 的文本文件,以写入模式 ('w') 打开 with output_fs.open('my text file.txt', 'w') as f: f.write("Hello world") # 写入字符串 "Hello world" 到文件中 f.close() # 关闭文件,释放资源(尽管使用 'with' 上下文管理器会自动关闭文件,这行代码可以省略)
以下代码是如何编写数据集并指定分区和输出格式的示例。
Copied!1 2 3 4 5
def write_dataset(input_dataset): # 获取输出对象 output = Transforms.get_output() # 将输入数据集写入输出对象,按"colA"和"colB"列进行分区,以CSV格式输出 output.write_dataframe(input_dataset, partition_cols = ["colA", "colB"], output_format = 'csv')
您可以通过读取上游数据集作为R变换输入
来读取R变换中的文件。TransformInput
对象被暴露出来,它允许基于Foundry数据集中文件路径的文件访问。了解有关FileSystem
API的更多信息。
使用输入帮助栏或在输入选项卡中更改输入类型。
只有导入的数据集和持久化的数据集可以作为R变换输入读取。未保存为数据集的变换不能作为R变换输入读取。
默认情况下,没有架构的数据集应已设置为输入类型R变换输入。
使用下面的代码加载作为导入数据集中文件的RDS。RDS包含一个R data.frame。
Copied!1 2 3 4 5 6 7 8 9
RDS_reader <- function(RDS_dataset) { fs <- RDS_dataset$fileSystem() ## 文件名是 test_loading_RDS.rds path <- fs$get_path("test_loading_RDS.rds", 'r') # 获取文件路径,模式为只读 rds <- readRDS(path) # 读取 RDS 文件 return(rds) }
使用以下代码对一组压缩的CSV文件内容进行rbind
操作。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
result <- function(zip_file_with_csvs) { fs <- zip_file_with_csvs$fileSystem() ## 获取 zip 文件的远程路径(名称) zipfile_name <- fs$ls()[[1]]$path ## 获取 zip 文件的本地路径 path <- fs$get_path(zipfile_name, 'r') # 列出压缩包中的文件 zipped_files <- as.list(unzip(path, list = TRUE)$Name) # 对列表中的每个元素,返回一个数据框 list_of_data_frames <- lapply(zipped_files, function(x){read.csv(unz(path, x), header = TRUE, sep = ",")}) # 将所有的数据框绑定在一起 rbind_df <- do.call(rbind,list_of_data_frames) return(rbind_df) }
此代码定义了一个函数 result
,它接受一个包含 CSV 文件的 zip 文件对象作为参数。函数将 zip 文件中的 CSV 文件读取为数据框,并将所有数据框合并为一个数据框返回。
可以写入到输出文件系统。这对于写入非表格数据格式(包括图像、PDF、文本文件等)非常有用。
调用 new.output()
来实例化一个 TransformOutput。 了解更多关于 FileSystem
API 的信息。
只能在保存为数据集的节点中使用 TransformOutput 写入文件。无法在控制台中使用 TransformOutput 写入文件。
使用下面的代码将 R data.frame 保存为 RDS 文件。
Copied!1 2 3 4 5 6 7 8
write_rds_file <- function(r_dataframe) { # 创建新的输出对象 output <- new.output() # 获取文件系统对象 output_fs <- output$fileSystem() # 保存R数据框为RDS文件,文件名为"my_RDS_file.rds" saveRDS(r_dataframe, output_fs$get_path("my_RDS_file.rds", 'w')) }
使用以下代码将图保存为PDF。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
plot_pdf <- function() { library(ggplot2) theme_set(theme_bw()) # 预设黑白主题 data("midwest", package = "ggplot2") # 散点图 gg <- ggplot(midwest, aes(x=area, y=poptotal)) + geom_point(aes(col=state, size=popdensity)) + # 根据州进行颜色区分,并根据人口密度调整点的大小 geom_smooth(method="loess", se=F) + # 添加LOESS平滑曲线,不显示置信区间 xlim(c(0, 0.1)) + # 设置x轴范围 ylim(c(0, 500000)) + # 设置y轴范围 labs(subtitle="Area Vs Population", # 设置副标题、坐标轴标签、主标题和数据来源 y="Population", x="Area", title="Scatterplot", caption = "Source: midwest") output <- new.output() # 假设这是一个生成输出对象的函数 output_fs <- output$fileSystem() # 获取文件系统对象 pdf(output_fs$get_path("my pdf example.pdf", 'w')) # 打开一个PDF文件用于写入 plot(gg) # 绘制图形到PDF文件 }
请注意,上述代码中的new.output()
和output$fileSystem()
并不是标准的R函数,在实际使用中可能需要定义或修改。如果这些函数没有定义,代码将无法正常工作。
使用下面的代码通过连接写入一个TXT文件。
Copied!1 2 3 4 5 6
write_txt_file <- function() { output <- new.output() # 创建新的输出对象 output_fs <- output$fileSystem() # 获取文件系统对象 conn <- output_fs$open("my file.txt", 'w') # 打开文件“my file.txt”用于写入 writeLines(c("Hello", "world"), conn) # 将字符串“Hello”和“world”写入文件 }
使用下面的代码将本地路径 output.txt
的文本文件上传到远程路径 output_test.txt
。在保存的数据集中,您将看到一个名为 output_test.txt
的文件。
Copied!1 2 3 4 5 6 7 8 9
upload <- function() { output <- new.output() # 创建一个新的输出对象 output_fs <- output$fileSystem() # 获取输出对象的文件系统 fileConn<-file("output.txt") # 打开一个名为 "output.txt" 的文件连接 writeLines(c("Header 1"), fileConn) # 写入 "Header 1" 到文件中 close(fileConn) # 关闭文件连接 output_fs$upload("output.txt", "output_test.txt") # 将 "output.txt" 上传并重命名为 "output_test.txt" }
使用下面的代码以列A和B分区的方式写入Spark数据框。
Copied!1 2 3 4 5 6
write_partitioned_df <- function(spark_df) { output <- new.output() # 按照 colA 和 colB 列进行分区 output$write.spark.df(spark_df, partition_cols=list("colA", "colB")) }