Code examples: transforms for graph and tree structured datasets

Transforms

Python

Flattening hierarchical tree data

How do I flatten a hierarchical tree data structure into a flat table with parent-child relationships?

This code uses PySpark to transform a hierarchical tree data structure into a flat table with parent-child relationships. It defines a helper that extracts the objects at each level and generates unique primary keys for both nodes and parents. The output dataframe contains the columns node_id, node_description, node_level, parent_id, parent_level, parents_path, node_pk, and parent_pk.

from transforms.api import transform, Input, Output
from pyspark.sql import functions as F

# Constants
COL_ORDER = ["level1", "level2", "level3", "level4"]
COLS_DESCRIPTION = {
    "node_id": "Identifier of the node, not unique",
    "node_description": "Human-readable identifier of the node, not unique",
    "node_level": "Level of the node",
    "parent_id": "Identifier of the node's parent, not unique",
    "parent_level": "Level of the node's parent, not unique",
    "parents_path": "Array of parent identifiers, from the highest to the closest parent.",
    "node_pk": "Unique.",
    "parent_pk": "Unique."
}

'''
Converts data of the shape:
level1 | level2  | level3  | level4    | some_value
root   | folder1 | folder2 | file_name | file_content

into:
node_id   | node_level | parent_id | parent_level | value
root      | level1     | null      | null         | null
folder1   | level2     | root      | level1       | null
folder2   | level3     | folder1   | level2       | null
file_name | level4     | folder2   | level3       | file_content
'''

# Note: the dataset references below are placeholders.
@transform(
    out=Output("flattened_tree_output"),
    tree_df=Input("tree_structured_input"),
)
def flatten_tree_data(tree_df, out):
    tree_df = tree_df.dataframe()

    # Define a function to extract the objects at each level.
    def create_object(df,
                      node_id_col="level2",
                      node_description_col="level2",
                      node_level="level2",
                      parent_ids_cols=["level1", "...", "level3"],
                      parent_id_col="level3",
                      parent_level="level3"):
        # Keep only the required columns.
        # Note: parent_id_col should be included in parent_ids_cols.
        # A set removes potential duplicates when the id and description columns are the same.
        columns_to_keep = list(set([node_id_col, node_description_col, *parent_ids_cols]))
        out_df = df.select(columns_to_keep)
        # Remove duplicate rows with DISTINCT
        out_df = out_df.distinct()

        # Store the values for this particular node
        out_df = out_df.withColumn("node_id", F.col(node_id_col))
        out_df = out_df.withColumn("node_level", F.lit(node_level))
        out_df = out_df.withColumn("node_description", F.col(node_description_col))

        # Handle top-level nodes, which have no parent
        is_top_node = parent_id_col is None and parent_level is None
        if not is_top_node:
            # Store the values of the parent
            out_df = out_df.withColumn("parent_id", F.col(parent_id_col))
            out_df = out_df.withColumn("parent_level", F.lit(parent_level))
        else:
            # TODO: remove this logic in favor of allowMissingColumns=True (Spark 3 feature)
            out_df = out_df.withColumn("parent_id", F.lit(None))
            out_df = out_df.withColumn("parent_level", F.lit(None))

        # Concatenate the parent identifiers to obtain the "path"
        out_df = out_df.withColumn("parents_path", F.array(*parent_ids_cols))

        # Clean up the data before generating the primary keys
        out_df = out_df.select("node_id", "node_description", "node_level",
                               "parent_id", "parent_level", "parents_path")

        # The primary keys are used for "self-joins"
        # Generate a primary key for the node
        pk_cols = ["node_level", "node_id"]
        out_df = out_df.withColumn("node_pk", F.concat_ws("__", *pk_cols))
        # Generate a primary key for the parent
        pk_cols = ["parent_level", "parent_id"]
        out_df = out_df.withColumn("parent_pk", F.concat_ws("__", *pk_cols))

        # Generate a title column
        title_cols = ["node_level", "node_description", "node_id"]
        out_df = out_df.withColumn("title", F.concat_ws(" - ", *title_cols))

        return out_df

    out_df = create_object(tree_df, "level4", "level4", "level4",
                           ["level1", "level2", "level3", "level4"], "level3", "level3")
    tmp_df = create_object(tree_df, "level3", "level3", "level3",
                           ["level1", "level2", "level3"], "level2", "level2")
    out_df = out_df.unionByName(tmp_df)
    tmp_df = create_object(tree_df, "level2", "level2", "level2",
                           ["level1", "level2"], "level1", "level1")
    out_df = out_df.unionByName(tmp_df)
    tmp_df = create_object(tree_df, "level1", "level1", "level1",
                           [], None, None)
    out_df = out_df.unionByName(tmp_df)  # TODO SPARK 3: allowMissingColumns=True

    out.write_dataframe(out_df, column_descriptions=COLS_DESCRIPTION)
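
The node_pk and parent_pk columns exist so the flattened table can be joined back onto itself. As a minimal sketch (assuming the same imports as above, and that the flattened output is loaded as flat_df, a hypothetical variable name), each node can be enriched with its parent's description like this:

# Sketch: enrich each node with its parent's description via a self-join.
# `flat_df` is assumed to be the flattened output produced above (hypothetical name).
parents = flat_df.select(
    F.col("node_pk").alias("parent_pk"),
    F.col("node_description").alias("parent_description"),
)
enriched = flat_df.join(parents, on="parent_pk", how="left")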
  • Submitted: 2024-03-26
  • Tags: code repositories, code authoring, python

Extracting ancestors and descendants from a graph dataset

How do I extract ancestors and descendants from a graph dataset using PySpark and NetworkX?

This code uses PySpark and NetworkX to prepare a graph dataset, create a directed graph, and extract the ancestors and descendants of every node in the graph.

from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F, types as T
import networkx as nx

GRAPH_SCHEMA = T.StructType([
    T.StructField("node_id", T.StringType()),
    T.StructField("descendants", T.ArrayType(T.StringType())),
    T.StructField("ancestors", T.ArrayType(T.StringType())),
])

# Step 1: prepare the dataset
@transform_df(
    Output("prepared_graph_output"),
    graph_structured_dataset=Input("original_dataset_input")
)
def prepare_graph(graph_structured_dataset):
    vertices = get_vertices(graph_structured_dataset)
    edges = get_edges(graph_structured_dataset)
    df = vertices.unionByName(edges)
    return df

def get_vertices(df):
    df = (
        df
        .select(
            "node_id",  # ID of the node
            F.lit(None).cast(T.StringType()).alias("child_id"),  # an empty "child_id" column so the output can be unioned with the edges
            F.lit("vertex").alias("type"),  # type of this row (it represents a vertex)
            F.col("_partition_column"),  # attribute by which the nodes can be partitioned, so the computation runs in parallel
        )
        .dropDuplicates(["node_id"])
    )
    return df

def get_edges(df):
    df = (
        df
        .filter(F.col("parent_node_id").isNotNull())
        .select(
            F.col("parent_node_id").alias("node_id"),  # ID of the node
            F.col("node_id").alias("child_id"),  # reference to a child of this node
            F.lit("edge").alias("type"),  # type of this row (it represents an edge)
            F.col("_partition_column"),  # attribute by which the nodes can be partitioned, so the computation runs in parallel
        )
        .dropDuplicates(["node_id", "child_id"])
    )
    return df

# Step 2: create the graph with networkx and extract the required properties
@transform_df(
    Output("extracted_graph_properties"),
    prepared_graph=Input("prepared_graph_output"),
)
def extract_graph_properties(prepared_graph):
    out = (
        prepared_graph
        .groupby("_partition_column")
        .applyInPandas(
            myNetworkxUserDefinedFunction,
            schema=GRAPH_SCHEMA
        )
    )
    out = out.withColumn("ancestors",
                         F.when(F.size(F.col("ancestors")) == 0, F.lit(None)).otherwise(F.col("ancestors")))
    return out

def myNetworkxUserDefinedFunction(pandas_dataframe):
    vertices = pandas_dataframe[pandas_dataframe["type"] == "vertex"]
    edges = pandas_dataframe[pandas_dataframe["type"] == "edge"]
    df = vertices.copy()  # copy so we do not mutate a slice of the grouped frame

    g = nx.DiGraph()
    g.add_edges_from(edges[['node_id', 'child_id']].to_records(index=False))

    def get_descendants(source):
        if not (edges['node_id'] == source).any():
            return None
        descendants = list(nx.bfs_tree(g, source))
        return descendants[1:]

    df["descendants"] = df["node_id"].apply(get_descendants)

    def get_ancestors(source):
        if source not in g:  # isolated nodes never made it into the graph
            return []
        path = [source] + [parent for parent, child, _ in nx.edge_dfs(g, source=source, orientation="reverse")]
        return path[1:]

    df["ancestors"] = df["node_id"].apply(get_ancestors)

    return df[["node_id", "ancestors", "descendants"]]
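
Because myNetworkxUserDefinedFunction is plain Python over a pandas DataFrame, it can be sanity-checked locally before being wired into the transform. A minimal sketch with illustrative toy data (not part of the original example):

import pandas as pd

toy = pd.DataFrame({
    "node_id":           ["a", "b", "c", "a", "b"],
    "child_id":          [None, None, None, "b", "c"],
    "type":              ["vertex", "vertex", "vertex", "edge", "edge"],
    "_partition_column": [1, 1, 1, 1, 1],
})
# Graph encoded above: a -> b -> c
result = myNetworkxUserDefinedFunction(toy)
# Expected: node "a" has descendants ["b", "c"] and ancestors [];
#           node "c" has descendants None and ancestors ["b", "a"].
print(result)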

The main steps in the code above are preparing the dataset, defining the vertices and edges, building a directed graph with the networkx library, and extracting each node's ancestors and descendants. The comments in each function explain its purpose and implementation details.
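Both transforms assume the input already carries a _partition_column that splits the rows into independent subgraphs, so that each group handed to applyInPandas contains one complete graph. If the whole dataset is a single graph small enough to process on one executor, a constant column suffices; the following is a sketch under that assumption (not part of the original example), to be applied at the start of prepare_graph:

# Sketch: a constant partition column, assuming the entire dataset is one graph
# that fits in a single executor's memory.
graph_structured_dataset = graph_structured_dataset.withColumn(
    "_partition_column", F.lit(1)
)
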

  • Submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, tree, graph, networkx