注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。

Kafka

将Foundry连接到Kafka,以实时从Kafka队列读取数据到Foundry流中。

支持的功能

功能状态
探索🟢 普遍可用
流同步🟢 普遍可用
流导出🟢 普遍可用

数据模型

key (binary)value (binary)
London{"firstName": "John", "lastName": "Doe"}
Paris{"firstName": "Jean", "lastName": "DuPont"}

Kafka连接器不解析消息内容,任何类型的数据都可以同步到Foundry。所有内容在value列下未解析地上传。使用下游流变换(例如,Pipeline Builder中的parse_json)来解析数据。key列将显示与消息一起记录在Kafka中的键。如果消息不包含键,则值为null

性能和限制

连接器始终使用单个消费者线程,无论源Kafka主题上分区的数量如何。

流同步旨在成为一致的、长期运行的任务。流同步的任何中断都可能根据预期结果导致中断。

目前,流同步具有以下限制:

  • 从代理连接的任务在维护窗口期间重启(通常每周一次)以获取升级。预期停机时间少于五分钟。
  • 从直接连接的任务至少每48小时重启一次。预期停机时间为个位数分钟(假设资源可用性允许任务立即重启)。

我们建议在代理连接上运行流同步,以提高性能、带宽和可用性。

设置

  1. 打开数据连接应用程序,并在屏幕右上角选择**+ 新建来源**。
  2. 从可用的连接器类型中选择Kafka
  3. 选择使用直接连接通过互联网连接,或者通过中介代理连接。
    • 我们建议每个来源通过两个代理进行连接,以成功设置您的Kafka连接器并减少停机时间。确保代理没有重叠的维护窗口。
  4. 根据以下部分中的信息,按照附加配置提示继续设置连接器。

了解更多关于在Foundry中设置连接器的信息。

如果您在连接器类型页面上看不到Kafka,请联系Palantir客服支持以启用访问。

引导服务器

参数必需?默认值描述
引导服务器HOST:PORT格式添加Kafka代理服务器,每行一个。

身份验证

选择凭据方法以验证您的Kafka连接:SSL,用户名/密码,Azure AD,Kerberos,或NONE。

配置的凭据必须允许以下操作:

  • 主题资源:
    • 读取用于流同步和探索
    • 写入用于流导出

SSL

SSL认证对应于标准Kafka SSLSASL_SSL 协议。

要使用SSL进行认证,请完成以下配置选项:

参数必需?默认值描述
端点识别算法HTTPSHTTPS:验证代理主机名是否与代理的证书中的主机名匹配。
NONE:禁用端点识别。
双向SSL启用禁用启用相互TLS (mTLS);需要额外配置。
使用SASL启用SASL认证。

如果启用相互TLS(双向SSL),添加您的私钥密码

参数必需?默认值描述
SSL密钥密码解密私钥所需的密码。

如果启用SASL认证,请完成额外配置:

参数必需?默认值描述
SASL机制选择用于加密凭据的算法。
saslJaasConfigUsername用户名
SASL JAAS配置密码密码
SASL客户端回调处理类显示SASL客户端的默认回调处理程序。有关SASL回调处理程序的更多信息,请参阅Java SASL API文档 ↗

OAuth 2.0

OAuth认证使用OAuth 2.0协议。目前仅支持客户端凭据授权流程。

要使用OAuth 2.0协议进行认证,请完成以下配置选项:

参数必需?默认值描述
客户端ID请求认证的应用程序ID
客户端密钥服务器和您的应用程序之间的共享密钥
词元端点URI授予访问/ID词元的服务器的统一资源标识符(URI)
范围通过OAuth连接到Kafka或特定Kafka主题,可能需要请求一组范围。范围是配置在认证提供程序中的任意字符串值。例如,消费者可能需要在其认证请求中请求(kafka-topics-read, kafka-topics-list)之一,以确定他们接收到的Kafka访问级别。
SASL扩展某些Kafka服务器,例如Confluent Cloud,可能需要配置SASL扩展 ↗,这将是特定于该服务器平台的键值对。

Azure AD

Azure AD认证模式适用于Azure事件中心的Kafka接口。此模式需要Azure AD服务主体。查看Azure文档 ↗以了解如何创建服务主体并设置对事件中心的访问。

参数必需?默认值
租户ID
客户端ID
客户端密钥

Azure事件中心的Kafka接口也可以通过SAS词元 ↗访问。要使用SAS词元进行认证,请选择用户名/密码认证,用户名为$ConnectionString(无引号),密码为您的事件中心连接字符串。

对应于Kafka的标准PLAINTEXT协议。

我们强烈不建议在没有认证或SSL的情况下配置连接器,因为这将在线连接器和Kafka代理之间传输未加密的数据。仅在安全网络中使用此配置。

网络

连接器必须能够访问Kafka代理的主机。如果使用直接连接,请为所有引导服务器主机创建DNS出口策略。

证书和私钥

您可能需要为SSL和TLS配置额外的客户端或服务器证书和私钥。

SSL

SSL连接验证服务器证书。通常,SSL验证通过证书链进行;默认情况下,代理和直接连接运行时信任大多数标准证书链。然而,如果您要连接的服务器具有自签名证书,或者主机名验证拦截了连接,则连接器必须信任证书。请联系您的Kafka管理员以获取正确的证书。

了解更多关于在数据连接中使用证书的信息。

相互TLS (mTLS)

您的Kafka集群可能要求服务器和客户端都通过mTLS进行认证。要启用mTLS,您必须配置以下内容:

根据连接器运行类型,按照以下步骤配置客户端私钥。

为代理配置客户端私钥

如果通过代理连接,请按照如何添加私钥的说明进行操作。

为直接连接配置客户端私钥

如果直接连接,请在连接器配置页面的配置客户端证书和私钥部分上传您的私钥。在出现的弹出窗口中使用别名kafka,然后添加私钥和客户端证书。

显示配置客户端证书和私钥选项的界面

用户可以输入别名、私钥和客户端证书的弹出窗口

从Kafka同步数据

了解如何在设置流同步教程中设置与Kafka的同步。

架构注册集成

Kafka架构注册作为一个集中存储系统,维护所有架构的版本历史。注册提供对AvroProtobufJSON架构的兼容性,并使用SerDes(序列化/反序列化器)促进架构格式和序列化数据之间的转换。

为了有效利用Kafka架构注册,有必要为相关的Kafka主题注册架构,并将架构注册URL附加到来源配置中。此调整使连接器能够将原始字节转换为相应的数据类型。

例如,标准提取通常将原始字节从Kafka提取到Foundry,如下所示:

标准二进制Kafka提取。

然而,通过配置架构注册,连接器可以识别字节的底层架构,并将其转换为一流的Foundry类型,如下所示:

Avro Kafka提取。

此功能可以通过消除繁琐的类型转换显著简化下游管道。此外,鉴于架构注册提供了集中式的架构管理和兼容性检查,此功能提升了数据一致性保证。

导出数据到Kafka

连接器支持通过数据连接将流导出到外部Kafka集群。

要导出到Kafka,请首先启用导出您的Kafka连接器。然后,创建一个新导出

导出配置选项

选项必需?默认值描述
Topic不适用您想要导出的Kafka主题。
Linger milliseconds0在将记录发送到Kafka之前等待的毫秒数。在等待时间内累积的记录将在导出时一起批处理。此设置默认为0,表示记录将立即发送。然而,此默认值可能会导致对您的Kafka实例的更多请求。
Key column未定义在发布记录到Kafka时,您希望用作的Foundry流中的列。Null键不受支持;确保所选列在导出的流中的所有记录中都有值。
Value column未定义您希望导出的Foundry流中的列。如果未指定,行中的所有字段将被序列化为字节并在消息体中导出到Kafka。
Enable Base64 Decode禁用Foundry流中的二进制数据在内部存储时被Base64编码。当启用时,此标志将导致二进制数据在导出前被解码。仅当同时指定Key columnValue column时,才可以启用此功能。

导出任务配置(遗留)

我们不建议通过导出任务导出到Kafka。如果可能,现有的导出任务应迁移到使用我们推荐的导出功能。以下文档仅用于历史参考。

要开始使用导出任务导出数据,请导航到包含您想要导出的Kafka连接器的项目文件夹。右键选择连接器名称,然后选择创建数据连接任务

在数据连接视图的左侧面板中:

  1. 验证来源名称与您要使用的Kafka连接器匹配。
  2. 添加一个名为dataset输入,类型为流数据集输入数据集是正在导出的Foundry数据集。
  3. 添加一个名为dataset输出,类型为流导出输出数据集用于运行、调度和监控任务。
  4. 最后,在文本字段中添加一个YAML块以定义任务配置。

在创建YAML时使用以下选项:

选项必需?默认值描述
maxParallelism用于导出数据的最大允许并行线程数。实际线程数由输入Foundry流上的分区数决定(如果低于maxParallelism)。
topic数据推送到的主题的名称。
clientId用于导出任务的标识符。该标识符映射到Kafka client.id。有关更多信息,请查看Kafka文档 ↗
batchOptions请参阅下面的batchOptions配置。
keyColumn输入流数据集中某列的名称。该列中的值用作导出消息中的键。不包括此属性将导出null值作为键。
valueColumn输入流数据集中某列的名称。该列中的值用作导出消息中的值。不包括此属性将导出所有列(作为字符串化的JSON对象)在value字段下。
enableIdempotencetrue有关更多信息,请查看Kafka文档 ↗
useDirectReaders始终设置为false。根据示例进行配置。
transforms请参阅下面的示例配置。

使用以下选项配置batchOptions

选项必需?默认值描述
maxMilliseconds在将可用行写入输出主题之前等待的最大持续时间(以毫秒为单位)。降低此值以减少延迟,增加此值以减少网络开销(请求数)。除非批处理首先达到maxRecords限制,否则使用此值。
maxRecords在写入输出主题之前缓冲的最大消息数。降低此值以减少延迟,增加此值以减少网络开销(请求数)。除非批处理首先达到maxMilliseconds限制,否则使用此值。

以下是一个导出任务配置示例:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type: streaming-kafka-export-task # 任务类型:流式Kafka导出任务 config: maxParallelism: 1 # 最大并行度:1 topic: test-topic # Kafka主题:test-topic clientId: client-id # 客户端ID:client-id keyColumn: key # 键列:key valueColumn: value # 值列:value batchOptions: maxMilliseconds: 5000 # 最大毫秒数:5000 maxRecords: 1000 # 最大记录数:1000 transforms: transformType: test # 转换类型:test userCodeMavenCoords: [] # 用户代码Maven坐标:空列表 useDirectReaders: false # 是否使用直接读取器:否

在配置导出任务后,选择右上角的保存