上封邮件发错了,重新发一下。项目中使用精准一次语义写入kafka,代码和配置如下:
写入代码如下:
Properties producerProperties = MyKafkaUtil.getProducerProperties();
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(Event2Kafka.parameterTool.get("feature.topic.name"))
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setKafkaProducerConfig(producerProperties)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("streamx_flow_1261")
.build();
eventJsonStream.sinkTo(kafkaSink).setParallelism(14)
.name("event2kafka").uid("kafkasink");
kafka配置如下:
public static Properties getProducerProperties(){
Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers",
parameterTool.get(bootstrap.server"));
kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000");
kafkaProducerProps.setProperty("auto.offset.reset", "latest");
kafkaProducerProps.setProperty("session.timeout.ms", "5000");
kafkaProducerProps.setProperty("transaction.timeout.ms",12*60000 +"");
kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT");
kafkaProducerProps.put("sasl.kerberos.service.name","kafka");
return kafkaProducerProps;
}
项目运行很久都没啥问题,最近突然报了以下的错误
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka
xxxx-topic-2@-1 with
FlinkKafkaInternalProducer{transactionalId='streamx_flow_1261-8-5',
inTransaction=true, closed=false}
at
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
at
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The
broker received an out of order sequence number.
参考了stackoverflow上面的回答:https://stackoverflow.com/questions/55192852/transactional-producer-vs-just-idempotent-producer-java-exception-outoforderseq
但是里面涉及到的参数我都没有设置,都是使用默认的配置。照理来说应该不会有这样的问题。想请问下各位有没有什么看法。还是我的配置有啥错误和缺少的地方。