Frequent transaction failures cause job restarts when Flink writes data to Kafka

2023-03-12

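In production we need to send some data to Kafka with EXACTLY_ONCE semantics. The Kafka version is 1.1.0 and the Flink version is 1.8.0. However, transaction commits fail very often, and sometimes the failures even cause the job to restart.

The Kafka producer is configured as follows: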

    def getKafkaProducer(kafkaAddr: String, targetTopicName: String, kafkaProducersPoolSize: Int): FlinkKafkaProducer[String] = {
      val properties = new Properties()
      properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr)
      // 6000 * 6 = 36000 ms (36 s)
      properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 6000 * 6 + "")
      // With retries set, Flink does not restart when a Kafka partition leader changes; the producer retries up to 5 times instead:
      properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
      properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(1048576 * 5))
      val serial = new KeyedSerializationSchemaWrapper(new SimpleStringSchema())
      //val producer = new FlinkKafkaProducer011[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), Semantic.EXACTLY_ONCE, kafkaProducersPoolSize)
      val producer = new FlinkKafkaProducer[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducersPoolSize)
      producer.setWriteTimestampToKafka(true)
      producer
    }

The Flink environment is set up as follows:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60 * 1000 * 1, CheckpointingMode.EXACTLY_ONCE)
    val config = env.getCheckpointConfig
    // RETAIN_ON_CANCELLATION keeps the externalized checkpoint state when the job is canceled
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Minimum time the checkpoint coordinator waits after one checkpoint completes before triggering the next; when this is set, maxConcurrentCheckpoints is effectively 1
    config.setMinPauseBetweenCheckpoints(3000)
    // Maximum number of checkpoints that may be in progress at once; if minPauseBetweenCheckpoints is set, this parameter has no effect (values greater than 1 are ignored)
    config.setMaxConcurrentCheckpoints(1)
    // Checkpoint timeout in milliseconds; a checkpoint that does not finish in time is aborted
    config.setCheckpointTimeout(30000)
    // Whether a checkpoint error should fail the task. The default is true; if set to false, the task declines the checkpoint and keeps running
    // https://issues.apache.org/jira/browse/FLINK-11662
    config.setFailOnCheckpointingErrors(false)

With this setup, transactions frequently become invalid. The errors come in several forms, roughly as follows:

java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
... 5 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1002)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:619)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:278)
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:263)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:804)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:105)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:650)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Checkpoint failed: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Checkpoint failed: Could not complete snapshot 11 for operator Sink: data_Sink (2/2).

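These errors mostly involve two-phase commit, transactions, and checkpoints.

After reading the Kafka documentation and studying the ProducerConfig class, I found that the Kafka producer needs some additional configuration when EXACTLY_ONCE is used: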

the transaction timeout must be larger than the checkpoint interval, but smaller than the broker transaction.max.timeout.ms.
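The rule above can be checked with simple arithmetic before the job is deployed. The following is a minimal sketch, not part of Flink or Kafka: the object name `TransactionTimeoutCheck` and the method `violations` are hypothetical, and the broker limit of 900 s is assumed to be the unchanged default of `transaction.max.timeout.ms`. It compares the original setting from this post (`6000 * 6` ms) with a 3 minute timeout against the 60 s checkpoint interval.

```scala
object TransactionTimeoutCheck {
  // Checkpoint interval used in this post: 60 s.
  val checkpointIntervalMs: Long = 60L * 1000
  // Assumption: the broker keeps the default transaction.max.timeout.ms (900 s).
  val brokerMaxTimeoutMs: Long = 900L * 1000

  /** Returns the violated constraints; an empty list means the timeout fits the rule. */
  def violations(transactionTimeoutMs: Long,
                 checkpointIntervalMs: Long,
                 brokerMaxTimeoutMs: Long): List[String] = {
    val tooShort =
      if (transactionTimeoutMs <= checkpointIntervalMs)
        List(s"transaction.timeout.ms ($transactionTimeoutMs) must be larger than the checkpoint interval ($checkpointIntervalMs)")
      else Nil
    val tooLong =
      if (transactionTimeoutMs > brokerMaxTimeoutMs)
        List(s"transaction.timeout.ms ($transactionTimeoutMs) must not exceed transaction.max.timeout.ms ($brokerMaxTimeoutMs)")
      else Nil
    tooShort ++ tooLong
  }

  def main(args: Array[String]): Unit = {
    val original = 6000L * 6     // 36 s, shorter than the 60 s checkpoint interval
    val revised = 1000L * 60 * 3 // 180 s, between the interval and the broker limit
    println(violations(original, checkpointIntervalMs, brokerMaxTimeoutMs))
    println(violations(revised, checkpointIntervalMs, brokerMaxTimeoutMs))
  }
}
```

With the original 36 s timeout the broker may expire a transaction before the next checkpoint completes and commits it, which is consistent with the ProducerFencedException messages shown earlier.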

After adding the following configuration to getKafkaProducer, the original errors occurred less often:

    // checkpoint interval < TRANSACTION_TIMEOUT_CONFIG < Kafka broker transaction.max.timeout.ms (default 900 seconds)
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 3 + "")
    properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
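For reference, the transaction-related settings can be gathered in one place. This is only a sketch under stated assumptions: `ExactlyOnceProducerProps` and `build` are hypothetical names, `localhost:9092` is a placeholder address, and the plain string keys are the literal values behind the `ProducerConfig` constants used above, written out so the snippet runs without the Kafka client on the classpath.

```scala
import java.util.Properties

object ExactlyOnceProducerProps {
  /** Builds the producer properties discussed in this post for an EXACTLY_ONCE sink. */
  def build(bootstrapServers: String, transactionTimeoutMs: Long): Properties = {
    val props = new Properties()
    // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
    props.setProperty("bootstrap.servers", bootstrapServers)
    // ProducerConfig.TRANSACTION_TIMEOUT_CONFIG: larger than the checkpoint interval,
    // not larger than the broker's transaction.max.timeout.ms
    props.setProperty("transaction.timeout.ms", transactionTimeoutMs.toString)
    // ProducerConfig.RETRIES_CONFIG
    props.setProperty("retries", "5")
    // ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
    props.setProperty("max.in.flight.requests.per.connection", "1")
    // ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
    props.setProperty("enable.idempotence", "true")
    props
  }

  def main(args: Array[String]): Unit = {
    // 3 minutes, the value chosen in this post for a 60 s checkpoint interval
    val props = build("localhost:9092", 1000L * 60 * 3)
    props.stringPropertyNames().forEach(k => println(s"$k=${props.getProperty(k)}"))
  }
}
```

The resulting `Properties` object could be passed to the `FlinkKafkaProducer` constructor in place of the one built inside getKafkaProducer.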

The problem was mitigated.

References:

https://www.cnblogs.com/wangzhuxing/p/10111831.html

http://www.heartthinkdo.com/?p=2040

http://romanmarkunas.com/web/blog/kafka-transactions-in-practice-1-producer/
