注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。

使用Pipeline Builder创建流式管道

在本教程中,我们将使用Foundry Streaming和Pipeline Builder创建一个简单的管道,其输出为一个包含传感器温度信息的数据集。您将学习如何在Foundry中创建一个流,将记录推送到该流中,并在Pipeline Builder中变换它们。

第1部分:初始设置

首先,我们需要创建一个新的流。

  1. 登录Foundry后,导航到Foundry中的一个项目,在右上角选择**+ New**,然后选择Stream

流创建下拉菜单的截图

  1. 接下来,您需要定义您的流。在本指南中,我们将创建一个简单的单分区流,并手动将记录推送到其中。

定义页面上,选择Normal作为吞吐量,并定义一个基本的模式为:sensor_id: 字符串, temperature: Double

流定义页面的截图

  1. 选择创建流。这将带您进入连接页面,您可以在此指定如何连接到流数据。

第2部分:将记录推送到流中

我们现在准备连接我们的流。在此时,我们可以使用一个数据源来设置一个流数据摄取任务。在本教程中,我们将手动使用Curl将记录推送到流中。

  1. 首先,在通过API连接部分下选择**Curl (Bash)**来为您的流设置身份验证。我们将使用个人词元来提交记录。

流连接页面的截图

  1. 选择使用个人词元测试,并按照屏幕提示生成一个短时效个人词元。

    个人词元不应用于生产管道。生产管道应使用OAuth词元工作流

流连接认证页面的截图

  1. 将生成的词元粘贴到文本框中,然后点击下一步
  2. 复制Curl命令。在您的计算机上打开一个可以执行Bash的终端,并粘贴该命令。在终端中运行该命令。

使用curl推送流的截图

几秒钟内,您将在页面上的流只读器中看到一个记录出现:

流查看记录选项卡的截图

我们现在已经实时摄取了流数据。现在让我们变换这些数据。

第3部分:变换流

  1. 选择开始管道化按钮以开始在Pipeline Builder中编写一个基本的流式变换。

流查看记录选项卡的截图

  1. 创建新管道模式窗口中,选择流式管道类型,然后点击创建管道

创建builder流式管道的截图

这将为输入流创建一个管道,并在图上显示。

选择输入流节点将显示数据的预览。请注意,预览在流的冷存储视图上运行;流中的记录在出现之前会有延迟。

builder图中输入流的截图

  1. 点击图上的输入流节点,并选择变换操作(输入节点旁边的蓝色T图标)。

    这将打开一个列表,其中显示了当前支持的所有基于流中列的输入类型的变换。对于本教程,我们将所有sensor_ids转换为大写,去除其上的空白,并筛选出温度超过三度的记录。

builder流变换下拉菜单的截图

  1. 选择大写变换,选择sensor_id列,然后点击应用

builder流大写变换的截图

  1. 然后,搜索修剪空白变换并选择它。再次选择sensor_id列,然后点击应用

builder修剪空白变换的截图

  1. 对于最后的变换,首先搜索筛选变换并选择保留行。然后,选择temperature列,将筛选设置为大于3,并选择应用

builder筛选变换的截图

  1. 点击屏幕右上角的应用所有更改。然后,选择返回图形以返回到您的管道。

builder图中有变换的截图

  1. 选择我们刚创建的变换路径节点,然后点击新数据集

builder图中创建新输出的截图

  1. 在应用程序的右上角,首先点击保存以应用管道的所有新更改。然后,点击部署部署管道

如果您保存更改但未部署,您的管道逻辑将不会更新为最新更改。您必须部署管道以捕获变换逻辑的更改。

builder图部署下拉菜单的截图

  1. 选择您刚创建的输出流节点,然后点击图底部数据预览部分上方的流名称。

builder图中已部署输出的截图

这将带您进入变换输出流的流预览页面。

流集群启动大约需要一分钟,因此您可能不会立即看到记录。然而,一旦运行,集群将实时处理所有新记录。

输出流的截图

接下来的步骤

现在您已经知道如何创建一个简单的流式管道,学习更多关于管理流的方法,探索如何调试失败的流。对于更高级的变换功能,请了解更多关于Pipeline Builder的信息。