注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。
将Foundry连接到Kafka,以实时从Kafka队列读取数据到Foundry流中。
功能 | 状态 |
---|---|
探索 | 🟢 普遍可用 |
流同步 | 🟢 普遍可用 |
流导出 | 🟢 普遍可用 |
key (binary) | value (binary) |
---|---|
London | {"firstName": "John", "lastName": "Doe"} |
Paris | {"firstName": "Jean", "lastName": "DuPont"} |
Kafka连接器不解析消息内容,任何类型的数据都可以同步到Foundry。所有内容在value
列下未解析地上传。使用下游流变换(例如,Pipeline Builder中的parse_json
)来解析数据。key
列将显示与消息一起记录在Kafka中的键。如果消息不包含键,则值为null
。
连接器始终使用单个消费者线程,无论源Kafka主题上分区的数量如何。
流同步旨在成为一致的、长期运行的任务。流同步的任何中断都可能根据预期结果导致中断。
目前,流同步具有以下限制:
我们建议在代理连接上运行流同步,以提高性能、带宽和可用性。
了解更多关于在Foundry中设置连接器的信息。
如果您在连接器类型页面上看不到Kafka,请联系Palantir客服支持以启用访问。
参数 | 必需? | 默认值 | 描述 |
---|---|---|---|
引导服务器 | 是 | 否 | 按HOST:PORT 格式添加Kafka代理服务器,每行一个。 |
选择凭据方法以验证您的Kafka连接:SSL,用户名/密码,Azure AD,Kerberos,或NONE。
配置的凭据必须允许以下操作:
主题
资源:
读取
用于流同步和探索写入
用于流导出SSL认证对应于标准Kafka SSL
和 SASL_SSL
协议。
要使用SSL进行认证,请完成以下配置选项:
参数 | 必需? | 默认值 | 描述 |
---|---|---|---|
端点识别算法 | 是 | HTTPS | HTTPS :验证代理主机名是否与代理的证书中的主机名匹配。NONE :禁用端点识别。 |
双向SSL启用 | 否 | 禁用 | 启用相互TLS (mTLS);需要额外配置。 |
使用SASL | 否 | 否 | 启用SASL认证。 |
如果启用相互TLS(双向SSL),添加您的私钥密码:
参数 | 必需? | 默认值 | 描述 |
---|---|---|---|
SSL密钥密码 | 是 | 否 | 解密私钥所需的密码。 |
如果启用SASL认证,请完成额外配置:
参数 | 必需? | 默认值 | 描述 |
---|---|---|---|
SASL机制 | 否 | 否 | 选择用于加密凭据的算法。 |
saslJaasConfigUsername | 是 | 否 | 用户名 |
SASL JAAS配置密码 | 是 | 否 | 密码 |
SASL客户端回调处理类 | 是 | 否 | 显示SASL客户端的默认回调处理程序。有关SASL回调处理程序的更多信息,请参阅Java SASL API文档 ↗。 |
OAuth认证使用OAuth 2.0协议。目前仅支持客户端凭据授权流程。
要使用OAuth 2.0协议进行认证,请完成以下配置选项:
参数 | 必需? | 默认值 | 描述 |
---|---|---|---|
客户端ID | 是 | 否 | 请求认证的应用程序ID |
客户端密钥 | 是 | 否 | 服务器和您的应用程序之间的共享密钥 |
词元端点URI | 是 | 否 | 授予访问/ID词元的服务器的统一资源标识符(URI) |
范围 | 否 | 否 | 通过OAuth连接到Kafka或特定Kafka主题,可能需要请求一组范围。范围是配置在认证提供程序中的任意字符串值。例如,消费者可能需要在其认证请求中请求(kafka-topics-read, kafka-topics-list)之一,以确定他们接收到的Kafka访问级别。 |
SASL扩展 | 否 | 否 | 某些Kafka服务器,例如Confluent Cloud,可能需要配置SASL扩展 ↗,这将是特定于该服务器平台的键值对。 |
Azure AD认证模式适用于Azure事件中心的Kafka接口。此模式需要Azure AD服务主体。查看Azure文档 ↗以了解如何创建服务主体并设置对事件中心的访问。
参数 | 必需? | 默认值 |
---|---|---|
租户ID | 是 | 否 |
客户端ID | 是 | 否 |
客户端密钥 | 是 | 否 |
Azure事件中心的Kafka接口也可以通过SAS词元 ↗访问。要使用SAS词元进行认证,请选择用户名/密码认证,用户名为$ConnectionString
(无引号),密码为您的事件中心连接字符串。
对应于Kafka的标准PLAINTEXT
协议。
我们强烈不建议在没有认证或SSL的情况下配置连接器,因为这将在线连接器和Kafka代理之间传输未加密的数据。仅在安全网络中使用此配置。
连接器必须能够访问Kafka代理的主机。如果使用直接连接,请为所有引导服务器主机创建DNS出口策略。
您可能需要为SSL和TLS配置额外的客户端或服务器证书和私钥。
SSL连接验证服务器证书。通常,SSL验证通过证书链进行;默认情况下,代理和直接连接运行时信任大多数标准证书链。然而,如果您要连接的服务器具有自签名证书,或者主机名验证拦截了连接,则连接器必须信任证书。请联系您的Kafka管理员以获取正确的证书。
了解更多关于在数据连接中使用证书的信息。
您的Kafka集群可能要求服务器和客户端都通过mTLS进行认证。要启用mTLS,您必须配置以下内容:
根据连接器运行类型,按照以下步骤配置客户端私钥。
如果通过代理连接,请按照如何添加私钥的说明进行操作。
如果直接连接,请在连接器配置页面的配置客户端证书和私钥部分上传您的私钥。在出现的弹出窗口中使用别名kafka
,然后添加私钥和客户端证书。
了解如何在设置流同步教程中设置与Kafka的同步。
Kafka架构注册作为一个集中存储系统,维护所有架构的版本历史。注册提供对Avro
、Protobuf
和JSON
架构的兼容性,并使用SerDes(序列化/反序列化器)促进架构格式和序列化数据之间的转换。
为了有效利用Kafka架构注册,有必要为相关的Kafka
主题注册架构,并将架构注册URL
附加到来源配置中。此调整使连接器能够将原始字节转换为相应的数据类型。
例如,标准提取通常将原始字节从Kafka提取到Foundry,如下所示:
然而,通过配置架构注册,连接器可以识别字节的底层架构,并将其转换为一流的Foundry类型,如下所示:
此功能可以通过消除繁琐的类型转换显著简化下游管道。此外,鉴于架构注册提供了集中式的架构管理和兼容性检查,此功能提升了数据一致性保证。
连接器支持通过数据连接将流导出到外部Kafka集群。
要导出到Kafka,请首先启用导出您的Kafka连接器。然后,创建一个新导出。
选项 | 必需? | 默认值 | 描述 |
---|---|---|---|
Topic | 是 | 不适用 | 您想要导出的Kafka主题。 |
Linger milliseconds | 是 | 0 | 在将记录发送到Kafka之前等待的毫秒数。在等待时间内累积的记录将在导出时一起批处理。此设置默认为0,表示记录将立即发送。然而,此默认值可能会导致对您的Kafka实例的更多请求。 |
Key column | 否 | 未定义 | 在发布记录到Kafka时,您希望用作键的Foundry流中的列。Null键不受支持;确保所选列在导出的流中的所有记录中都有值。 |
Value column | 否 | 未定义 | 您希望导出的Foundry流中的列。如果未指定,行中的所有字段将被序列化为字节并在消息体中导出到Kafka。 |
Enable Base64 Decode | 否 | 禁用 | Foundry流中的二进制数据在内部存储时被Base64编码。当启用时,此标志将导致二进制数据在导出前被解码。仅当同时指定Key column 和Value column 时,才可以启用此功能。 |
我们不建议通过导出任务导出到Kafka。如果可能,现有的导出任务应迁移到使用我们推荐的导出功能。以下文档仅用于历史参考。
要开始使用导出任务导出数据,请导航到包含您想要导出的Kafka连接器的项目文件夹。右键选择连接器名称,然后选择创建数据连接任务
。
在数据连接视图的左侧面板中:
来源
名称与您要使用的Kafka连接器匹配。dataset
的输入
,类型为流数据集
。输入数据集是正在导出的Foundry数据集。dataset
的输出
,类型为流导出
。输出数据集用于运行、调度和监控任务。在创建YAML时使用以下选项:
选项 | 必需? | 默认值 | 描述 |
---|---|---|---|
maxParallelism | 是 | 否 | 用于导出数据的最大允许并行线程数。实际线程数由输入Foundry流上的分区数决定(如果低于maxParallelism )。 |
topic | 是 | 否 | 数据推送到的主题的名称。 |
clientId | 是 | 用于导出任务的标识符。该标识符映射到Kafka client.id 。有关更多信息,请查看Kafka文档 ↗。 | |
batchOptions | 是 | 否 | 请参阅下面的batchOptions 配置。 |
keyColumn | 否 | 否 | 输入流数据集中某列的名称。该列中的值用作导出消息中的键。不包括此属性将导出null 值作为键。 |
valueColumn | 否 | 否 | 输入流数据集中某列的名称。该列中的值用作导出消息中的值。不包括此属性将导出所有列(作为字符串化的JSON对象)在value 字段下。 |
enableIdempotence | 否 | true | 有关更多信息,请查看Kafka文档 ↗。 |
useDirectReaders | 是 | 否 | 始终设置为false 。根据示例进行配置。 |
transforms | 是 | 否 | 请参阅下面的示例配置。 |
使用以下选项配置batchOptions
:
选项 | 必需? | 默认值 | 描述 |
---|---|---|---|
maxMilliseconds | 是 | 否 | 在将可用行写入输出主题之前等待的最大持续时间(以毫秒为单位)。降低此值以减少延迟,增加此值以减少网络开销(请求数)。除非批处理首先达到maxRecords 限制,否则使用此值。 |
maxRecords | 是 | 否 | 在写入输出主题之前缓冲的最大消息数。降低此值以减少延迟,增加此值以减少网络开销(请求数)。除非批处理首先达到maxMilliseconds 限制,否则使用此值。 |
以下是一个导出任务配置示例:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
type: streaming-kafka-export-task # 任务类型:流式Kafka导出任务 config: maxParallelism: 1 # 最大并行度:1 topic: test-topic # Kafka主题:test-topic clientId: client-id # 客户端ID:client-id keyColumn: key # 键列:key valueColumn: value # 值列:value batchOptions: maxMilliseconds: 5000 # 最大毫秒数:5000 maxRecords: 1000 # 最大记录数:1000 transforms: transformType: test # 转换类型:test userCodeMavenCoords: [] # 用户代码Maven坐标:空列表 useDirectReaders: false # 是否使用直接读取器:否
在配置导出任务后,选择右上角的保存。