Apache Kafka连接器
此连接器提供对Apache Kafka服务的事件流的访问。
Flink提供特殊的Kafka连接器,用于从/向Kafka主题读取和写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖Kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。
请为您的用例和环境选择一个包(maven artifact id)和类名。对于大多数用户来说,FlinkKafkaConsumer08
(部分flink-connector-kafka
)是合适的。
Maven依赖 | 支持自 | 消费者和供应者类名称 | Kafka版 | 笔记 |
---|---|---|---|---|
flink-connector-Kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.4 | 在内部使用Kafka 的SimpleConsumer API。Flink将抵消ZK的抵消。 |
flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x | 使用新的Consumer API Kafka。 |
flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x | 此连接器支持带有时间戳的Kafka消息,用于生成和使用。 |
flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 FlinkKafkaProducer011 | 0.11.x | 由于0.11.x Kafka不支持scala 2.10。此连接器支持Kafka事务性消息传递,为生产者提供一次语义。 |
然后,导入maven项目中的连接器:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
请注意,流连接器当前不是二进制分发的一部分。在此处了解如何与群集执行链接。
安装Apache Kafka
- 按照Kafka快速入门的说明下载代码并启动服务器(每次启动应用程序前都需要启动Zookeeper和Kafka服务器)。
- 如果Kafka和zookeeper服务器在远程机器上运行,那么
advertised.host.name
将在设置config/server.properties
文件必须设置本机的IP地址。
Kafka 1.0.0+连接器
从Flink 1.7开始,有一个新的通用Kafka连接器,它不跟踪特定的Kafka主要版本。相反,它在Flink发布时跟踪最新版本的Kafka。
如果您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka连接器。如果使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。
兼容性
通过Kafka客户端API和代理的兼容性保证,通用Kafka连接器与较旧和较新的Kafka代理兼容。它与经纪人版本0.11.0或更高版本兼容,具体取决于所使用的功能。有关Kafka兼容性的详细信息,请参阅Kafka文档。
用法
要使用通用Kafka连接器,请为其添加依赖关系:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.1</version>
</dependency>
然后实例化新的source(FlinkKafkaConsumer)和sink(FlinkKafkaProducer)。除了从模块和类名中删除特定的Kafka版本之外,API向后兼容Kafka 0.11连接器。
Kafka消费者
Flink的Kafka消费者被称为FlinkKafkaConsumer08
(或09
Kafka 0.9.0.x等)。它提供对一个或多个Kafka主题的访问。
构造函数接受以下参数:
- 主题名称/主题名称列表
- DeserializationSchema / KeyedDeserializationSchema用于反序列化来自Kafka的数据
- Kafka消费者的属性。需要以下属性:
- “bootstrap.servers”(以逗号分隔的Kafka经纪人名单)
- “zookeeper.connect”(逗号分隔的Zookeeper服务器列表)(仅Kafka 0.8需要)
- “group.id”消费者群组的ID
例:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
.print()
DeserializationSchema
Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。在 DeserializationSchema
允许用户指定这样的一个架构。T deserialize(byte[] message)
为每个Kafka消息调用该方法,从Kafka传递值。
从它开始通常很有帮助AbstractDeserializationSchema
,它负责将生成的Java / Scala类型描述为Flink的类型系统。实现vanilla的用户DeserializationSchema
需要自己实现该getProducedType(...)
方法。
为了访问Kafka消息的键和值,KeyedDeserializationSchema
具有以下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)
。
为方便起见,Flink提供以下模式:
-
TypeInformationSerializationSchema
(和TypeInformationKeyValueSerializationSchema
)创建基于Flink的模式TypeInformation
。如果Flink编写和读取数据,这将非常有用。此模式是其他通用序列化方法的高性能Flink替代方案。 -
JsonDeserializationSchema
(和JSONKeyValueDeserializationSchema
)将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”)作为(Int / String / ...)()从中访问字段。KeyValue objectNode包含一个“key”和“value”字段,其中包含所有字段,以及一个可选的“元数据”字段,用于公开此消息的偏移量/分区/主题。 -
AvroDeserializationSchema
它使用静态提供的模式读取使用Avro格式序列化的数据。它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(...)
)中推断出模式,也可以GenericRecords
使用手动提供的模式(withAvroDeserializationSchema.forGeneric(...)
)。此反序列化架构要求序列化记录不包含嵌入式架构。- 还有一个可用的模式版本,可以在Confluent Schema Registry中查找编写器的模式(用于编写记录的 模式)。使用这些反序列化模式记录将使用从模式注册表中检索的模式进行读取,并转换为静态提供的模式(通过
ConfluentRegistryAvroDeserializationSchema.forGeneric(...)
或ConfluentRegistryAvroDeserializationSchema.forSpecific(...)
)。
要使用此反序列化模式,必须添加以下附加依赖项:
- 还有一个可用的模式版本,可以在Confluent Schema Registry中查找编写器的模式(用于编写记录的 模式)。使用这些反序列化模式记录将使用从模式注册表中检索的模式进行读取,并转换为静态提供的模式(通过
- ConfluentRegistryAvroDeserializationSchema
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
当遇到因任何原因无法反序列化的损坏消息时,有两个选项 - 从deserialize(...)
方法中抛出异常将导致作业失败并重新启动,或者返回null
以允许Flink Kafka使用者以静默方式跳过损坏的消息。请注意,由于使用者的容错能力(请参阅下面的部分以获取更多详细信息),因此对损坏的消息执行失败将使消费者尝试再次反序列化消息。因此,如果反序列化仍然失败,则消费者将在该损坏的消息上进入不间断重启和失败循环。
Kafka消费者开始位置配置
Flink Kafka Consumer允许配置如何确定Kafka分区的起始位置。
例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream<String> stream = env.addSource(myConsumer);
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible myConsumer.setStartFromLatest() // start from the latest record myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds) myConsumer.setStartFromGroupOffsets() // the default behaviour
val stream = env.addSource(myConsumer)
...
Flink Kafka Consumer的所有版本都具有上述明确的起始位置配置方法。
setStartFromGroupOffsets
(默认行为):从group.id
Kafka代理(或Zookeeper for Kafka 0.8)中的消费者组(在消费者属性中设置)提交的偏移量开始读取分区。如果找不到分区的偏移量,auto.offset.reset
将使用属性中的设置。setStartFromEarliest()
/setStartFromLatest()
:从最早/最新记录开始。在这些模式下,Kafka中的承诺偏移将被忽略,不会用作起始位置。setStartFromTimestamp(long)
:从指定的时间戳开始。对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka中的已提交偏移将被忽略,不会用作起始位置。
您还可以指定消费者应从每个分区开始的确切偏移量:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
上面的示例将使用者配置为从主题的分区0,1和2的指定偏移量开始myTopic
。偏移值应该是消费者应为每个分区读取的下一条记录。请注意,如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()
该特定分区的默认组偏移行为(即)。
请注意,当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定(有关检查点的信息,请参阅下一节以启用消费者的容错)。
Kafka消费者和容错
启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式定期检查其所有Kafka偏移以及其他 算子操作的状态。如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用来自Kafka的记录。
因此,绘制检查点的间隔定义了程序在发生故障时最多可以返回多少。
要使用容错的Kafka使用者,需要在运行环境中启用拓扑的检查点:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
另请注意,如果有足够的处理插槽可用于重新启动拓扑,则Flink只能重新启动拓扑。因此,如果拓扑由于丢失了TaskManager而失败,那么之后仍然必须有足够的可用插槽。YARN上的Flink支持自动重启丢失的YARN容器。
如果未启用检查点,Kafka使用者将定期向Zookeeper提交偏移量。
Kafka消费者主题和分区发现
分区发现
Flink Kafka Consumer支持发现动态创建的Kafka分区,并使用一次性保证来消耗它们。在初始检索分区元数据之后(即,当作业开始运行时)发现的所有分区将从最早可能的偏移量中消耗。
默认情况下,禁用分区发现。要启用它,请flink.partition-discovery.interval-millis
在提供的属性配置中设置非负值,表示以毫秒为单位的发现间隔。
限制当从Flink 1.3.x之前的Flink版本的保存点还原使用者时,无法在还原运行时启用分区发现。如果启用,则还原将失败并出现异常。在这种情况下,为了使用分区发现,请首先在Flink 1.3.x中使用保存点,然后再从中恢复。
主题发现
在更高级别,Flink Kafka Consumer还能够使用正则表达式基于主题名称的模式匹配来发现主题。请参阅以下示例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer);
...
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String](
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema,
properties)
val stream = env.addSource(myConsumer)
...
在上面的示例中,test-topic-
当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有主题(以单个数字开头并以单个数字结尾)。
要允许使用者在作业开始运行后发现动态创建的主题,请为其设置非负值flink.partition-discovery.interval-millis
。这允许使用者发现名称也与指定模式匹配的新主题的分区。
Kafka消费者抵消承诺行为配置
Flink Kafka Consumer允许配置如何将偏移提交回Kafka代理(或0.8中的Zookeeper)的行为。请注意,Flink Kafka Consumer不依赖于承诺的偏移量来实现容错保证。承诺的抵消仅是用于暴露消费者进展以进行监控的手段。
配置偏移提交行为的方式不同,具体取决于是否为作业启用了检查点。
-
_禁用_检查_点:_如果禁用了检查点,则Flink Kafka Consumer依赖于内部使用的Kafka客户端的自动定期偏移提交函数。因此,要禁用或启用偏移提交,只需将
enable.auto.commit
(或auto.commit.enable
Kafka 0.8)/auto.commit.interval.ms
键设置为所提供Properties
配置中的适当值。 -
_启用_检查_点:_如果启用了检查点,则Flink Kafka Consumer将在检查点完成时提交存储在检查点状态中的偏移量。这可确保Kafka代理中的承诺偏移量与检查点状态中的偏移量一致。用户可以通过调用使用者的方法来选择禁用或启用偏移提交
setCommitOffsetsOnCheckpoints(boolean)
(默认情况下,行为是true
)。请注意,在此方案中,Properties
完全忽略自动定期偏移提交设置。
Kafka消费者和时间戳提取/水印排放
在许多情况下,记录的时间戳(显式或隐式)嵌入记录本身。另外,用户可能想要周期性地或以不规则的方式发出水印,例如基于包含当前事件时间水印的Kafka流中的特殊记录。对于这些情况,Flink Kafka Consumer允许指定一个AssignerWithPeriodicWatermarks
或一个AssignerWithPunctuatedWatermarks
。
您可以按此处所述指定自定义时间戳提取器/水印发射器,或使用 预定义的时间戳提取器/水印发射器 。完成后,您可以通过以下方式将其传递给您的消费者:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<String> stream = env
.addSource(myConsumer)
.print();
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env
.addSource(myConsumer)
.print()
在内部,每个Kafka分区执行一个分配器实例。当指定这样的分配器时,对于从Kafka读取的每个记录,extractTimestamp(T element, long previousElementTimestamp)
调用为记录分配时间戳, 并调用Watermark getCurrentWatermark()
(用于定期)或 Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)
(用于标点符号)以确定是否应该发出新的水印并且时间戳。
注意:如果水印分配器依赖于从Kafka读取的记录来推进其水印(通常是这种情况),则所有主题和分区都需要具有连续的记录流。否则,整个应用程序的水印无法前进,并且所有基于时间的 算子操作(例如时间窗口或具有计时器的函数)都无法取得进展。单个空闲Kafka分区会导致此行为。计划进行Flink改进以防止这种情况发生(参见FLINK-5479:FlinkKafkaConsumer中的每分区水印应考虑空闲分区)。同时,可能的解决方法是将_心跳消息发送_到所有消耗的分区,从而推进空闲分区的水印。
Kafka提供者
Flink的Kafka Producer被称为FlinkKafkaProducer011
(或010
Kafka 0.10.0.x等)。它允许将记录流写入一个或多个Kafka主题。
例:
DataStream<String> stream = ...;
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema()); // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);
stream.addSink(myProducer);
val stream: DataStream[String] = ...
val myProducer = new FlinkKafkaProducer011[String](
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema) // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions myProducer.setWriteTimestampToKafka(true)
stream.addSink(myProducer)
上面的示例演示了创建Flink Kafka Producer以将流写入单个Kafka目标主题的基本用法。对于更高级的用法,还有其他构造函数变体允许提供以下内容:
- 提供自定义属性:生产者允许为内部提供自定义属性配置
KafkaProducer
。有关如何配置Kafka Producers的详细信息,请参阅Apache Kafka文档。 - 自定义分区程序:要将记录分配给特定分区,可以为
FlinkKafkaPartitioner
构造函数提供a的实现。将为流中的每个记录调用此分区程序,以确定应将记录发送到的目标主题的确切分区。有关详细信息,请参阅Kafka Producer Partitioning Scheme。 - 高级序列化模式:与使用者类似,生产者还允许使用调用的高级序列化模式
KeyedSerializationSchema
,该模式允许单独序列化键和值。它还允许覆盖目标主题,以便一个生产者实例可以将数据发送到多个主题。
Kafka提供者分区方案
默认情况下,如果未为Flink Kafka Producer指定自定义分区程序,则生产者将使用FlinkFixedPartitioner
将每个Flink Kafka Producer并行子任务映射到单个Kafka分区(即,接收器子任务接收的所有记录将最终都在同一个Kafka分区)。
可以通过扩展FlinkKafkaPartitioner
类来实现自定义分区程序。所有Kafka版本的构造函数都允许在实例化生成器时提供自定义分区程序。请注意,分区器实现必须是可序列化的,因为它们将通过Flink节点传输。此外,请记住,分区程序中的任何状态都将在作业失败时丢失,因为分区程序不是生产者的检查点状态的一部分。
也可以完全避免使用和使用分区器,并简单地让Kafka通过其附加Keys对记录的记录进行分区(使用提供的序列化模式为每条记录确定)。为此,请null
在实例化生产者时提供自定义分区程序。提供null
自定义分区器很重要; 如上所述,如果未指定自定义分区程序,FlinkFixedPartitioner
则使用该分区程序。
Kafka生产者和容错
Kafka0.8
在0.9之前,Kafka没有提供任何机制来保证至少一次或完全一次的语义。
Kafka0.9和0.10
启用Flink的检查点时,FlinkKafkaProducer09
并FlinkKafkaProducer010
能提供在-至少一次传输保证。
除了使Flink的检查点,你还应该配置setter方法setLogFailuresOnly(boolean)
和setFlushOnCheckpoint(boolean)
适当的。
setLogFailuresOnly(boolean)
:默认情况下,此设置为false
。启用此选项将使生产者仅记录失败而不是捕获和重新抛出它们。这基本上使记录成功,即使它从未写入目标Kafka主题。这必须至少禁用一次。setFlushOnCheckpoint(boolean)
:默认情况下,此设置为true
。启用此函数后,Flink的检查点将在检查点成功之前等待检查点时的任何动态记录被Kafka确认。这可确保检查点之前的所有记录都已写入Kafka。必须至少启用一次。
总之,默认情况下,Kafka生成器对版本0.9和0.10具有至少一次保证,setLogFailureOnly
设置为false
和setFlushOnCheckpoint
设置为true
。
注意:默认情况下,重试次数设置为“0”。这意味着当setLogFailuresOnly
设置为时false
,生产者会立即失败,包括Leader更改。默认情况下,该值设置为“0”,以避免重试导致目标主题中出现重复消息。对于经常更改代理的大多数生产环境,我们建议将重试次数设置为更高的值。
注意:Kafka目前没有交易生产者,因此Flink无法保证一次性交付Kafka主题。
Kafka0.11及以上
启用Flink的检查点后,FlinkKafkaProducer011
(FlinkKafkaProducer 对于Kafka> = 1.0.0版本)可以提供准确的一次交付保证。
除了启用Flink的检查点,您还可以通过将适当的semantic
参数传递给FlinkKafkaProducer011
(FlinkKafkaProducer
对于Kafka> = 1.0.0版本):
Semantic.NONE
:Flink不保证任何东西。生成的记录可能会丢失,也可能会被复制。Semantic.AT_LEAST_ONCE
(默认设置):类似于setFlushOnCheckpoint(true)
在FlinkKafkaProducer010
。这可以保证不会丢失任何记录(尽管它们可以重复)。Semantic.EXACTLY_ONCE
:使用Kafka事务提供完全一次的语义。每当您使用事务写入Kafka时,不要忘记为消耗Kafka记录的任何应用程序设置所需的isolation.level
(read_committed
或read_uncommitted
- 后者是默认值)。
注意事项
Semantic.EXACTLY_ONCE
模式依赖于在从所述检查点恢复之后提交在获取检查点之前启动的事务的能力。如果Flink应用程序崩溃和完成重启之间的时间较长,那么Kafka的事务超时将导致数据丢失(Kafka将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置您的交易超时。
Kafka经纪人默认transaction.max.timeout.ms
设置为15分钟。此属性不允许为生产者设置大于其值的事务超时。 FlinkKafkaProducer011
默认情况下,将transaction.timeout.ms
producer config中的属性设置为1小时,因此transaction.max.timeout.ms
在使用Semantic.EXACTLY_ONCE
模式之前应该增加 该属性。
在read_committed
模式中KafkaConsumer
,任何未完成的事务(既不中止也不完成)将阻止来自给定Kafka主题的所有读取超过任何未完成的事务。换言之,遵循以下事件顺序:
- 用户
transaction1
使用它来启动和编写一些记录 - 用户
transaction2
使用它开始并编写了一些其他记录 - 用户承诺
transaction2
即使transaction2
已经提交了记录,在transaction1
提交或中止之前,消费者也不会看到它们。这有两个含义:
- 首先,在Flink应用程序的正常工作期间,用户可以预期Kafka主题中生成的记录的可见性会延迟,等于已完成检查点之间的平均时间。
- 其次,在Flink应用程序失败的情况下,读者将阻止此应用程序编写的主题,直到应用程序重新启动或配置的事务超时时间过去为止。此注释仅适用于有多个代理/应用程序写入同一Kafka主题的情况。
注意: Semantic.EXACTLY_ONCE
mode为每个FlinkKafkaProducer011
实例使用固定大小的KafkaProducers池。每个检查点使用其中一个生产者。如果并发检查点的数量超过池大小,FlinkKafkaProducer011
将引发异常并将使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。
注意:Semantic.EXACTLY_ONCE
采取所有可能的措施,不要留下任何阻碍消费者阅读Kafka主题的延迟交易,这是必要的。但是,如果Flink应用程序在第一个检查点之前失败,则在重新启动此类应用程序后,系统中没有关于先前池大小的信息。因此,在第一个检查点完成之前按比例缩小Flink应用程序是不安全的FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR
。
在Kafka 0.10中使用Kafka时间戳和Flink事件时间
自Apache Kafka 0.10+以来,Kafka的消息可以携带时间戳,指示事件发生的时间(请参阅Apache Flink中的“事件时间”)或消息写入Kafka代理的时间。
在FlinkKafkaConsumer010
将发射记录附有时间戳,如果在Flink时间特性被设定为TimeCharacteristic.EventTime
(StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
)。
Kafka消费者不会发出水印。为了发射水印,使用与上述“Kafka消费者和时间戳提取/水印发射”中描述的相同的机制assignTimestampsAndWatermarks
是适用的。
使用Kafka的时间戳时,无需定义时间戳提取器。该方法的previousElementTimestamp
参数extractTimestamp()
包含Kafka消息携带的时间戳。
Kafka消费者的时间戳提取器如下所示:
public long extractTimestamp(Long element, long previousElementTimestamp) {
return previousElementTimestamp;
}
该FlinkKafkaProducer010
只发出了创纪录的时间戳,如果setWriteTimestampToKafka(true)
设置。
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
Kafka Connector指标
Flink的Kafka连接器通过Flink的度量系统提供一些指标来分析连接器的行为。生产者通过Flink的度量系统为所有支持的版本导出Kafka的内部指标。消费者从Kafka 0.9版开始导出所有指标。Kafka文档在其文档中列出了所有导出的度量标准。
除了这些指标,所有消费者揭露current-offsets
,并committed-offsets
为每个主题分区。的current-offsets
是指当前分区中的偏移量。这指的是我们成功检索和发出的最后一个数据元的偏移量。这committed-offsets
是最后提交的偏移量。
Flink的Kafka消费者将抵消权交还给Zookeeper(Kafka 0.8)或Kafka经纪人(Kafka 0.9+)。如果禁用了检查点,则会定期提交偏移量。通过检查点,一旦流拓扑中的所有 算子确认已创建其状态的检查点,就会发生提交。这为用户提供了至少一次语义,用于提交给Zookeeper或代理的偏移量。对于Flink的偏移检查点,系统提供一次保证。
提交给ZK或经纪人的抵消也可用于跟踪Kafka消费者的阅读进度。提交的偏移量与每个分区中最近的偏移量之间的差异称为_消费者滞后_。如果Flink拓扑消耗主题的数据比添加新数据的速度慢,则滞后将增加,消费者将落后。对于大型生产部署,我们建议监控该指标以避免增加延迟。
启用Kerberos身份验证(仅适用于0.9及更高版本)
Flink通过Kafka连接器提供一流的支持,以对为Kerberos配置的Kafka安装进行身份验证。只需配置Flink flink-conf.yaml
即可为Kafka启用Kerberos身份验证,如下所示:
- 通过设置以下内容配置Kerberos凭据 -
security.kerberos.login.use-ticket-cache
:默认情况下,这是true
和Flink将尝试在由托管的票证缓存中使用Kerberos凭据kinit
。请注意,在YARN上部署的Flink作业中使用Kafka连接器时,使用票证缓存的Kerberos授权将不起作用。使用Mesos进行部署时也是如此,因为Mesos部署不支持使用票证缓存进行授权。security.kerberos.login.keytab
和security.kerberos.login.principal
:要使用Kerberos keytabs,请为这两个属性设置值。
- 附加
KafkaClient
到security.kerberos.login.contexts
:这告诉Flink将配置的Kerberos凭据提供给Kafka登录上下文以用于Kafka身份验证。
启用基于Kerberos的Flink安全性后,只需在提供的属性配置中包含以下两个设置(通过传递给内部Kafka客户端),即可使用Flink Kafka Consumer或Producer向Kafka进行身份验证:
- 设置
security.protocol
为SASL_PLAINTEXT
(默认NONE
):用于与Kafka代理进行通信的协议。使用独立的Flink部署时,您也可以使用SASL_SSL
; 请在此处查看如何为SSL配置Kafka客户端。 - 设置
sasl.kerberos.service.name
为kafka
(默认值kafka
):此值应与sasl.kerberos.service.name
用于Kafka代理配置的值相匹配。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。
有关Kerberos安全性的Flink配置的更多信息,请参阅此处。您还可以在此处找到有关Flink内部如何设置基于Kerberos的安全性的更多详细信息。
故障排除
如果您在使用Flink时遇到Kafka问题,请记住Flink只包装KafkaConsumer或KafkaProducer,您的问题可能独立于Flink,有时可以通过升级Kafka代理,重新配置Kafka代理或重新配置KafkaConsumer
或KafkaProducer
在Flink中解决。下面列出了一些常见问题的例子。
数据丢失
注意:根据您的Kafka配置,即使在Kafka确认写入后您仍然可能会遇到数据丢失。特别要记住以下Kafka设置:
ack
log.flush.interval.messages
log.flush.interval.ms
log.flush.*
上述选项的默认值很容易导致数据丢失。有关更多说明,请参阅Kafka文档。
未知主题或分区异常
导致此错误的一个可能原因是新的领导者选举正在进行,例如在重新启动Kafka经纪人之后或期间。这是一个可重试的异常,因此Flink作业应该能够重启并恢复正常运行。它也可以通过更改retries
生产者设置中的属性来规避。但是,这可能会导致重新排序消息,这反过来可以通过设置max.in.flight.requests.per.connection
为1 来避免不需要的消息。