[ https://issues.apache.org/jira/browse/FLINK-18575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17156456#comment-17156456 ]
mzz commented on FLINK-18575: ----------------------------- supply: the application was running,and message was normally produce ,but log message always report this error. > Failed to send data to Kafka > ---------------------------- > > Key: FLINK-18575 > URL: https://issues.apache.org/jira/browse/FLINK-18575 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Affects Versions: 1.10.0 > Reporter: mzz > Priority: Major > > Flink version: 1.10.0 > Kafka version: 2.2 > *code:* > {code:java} > private def producerKafka(aggs_result: DataStream[String], topic: String, > parallelism: Int) = { > val kafkaPro = new Properties() > kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS) > kafkaPro.setProperty("zookeeper.connect", SINK_ZK) > kafkaPro.setProperty("request.timeout.ms", "10000") > kafkaPro.setProperty("compression.type", "snappy") > kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "") > // 设置了retries参数,可以在Kafka的Partition发生leader切换时,Flink不重启,而是做5次尝试: > kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5") > val kafka = new FlinkKafkaProducer[String](topic, new > ResultDtSerialization(topic), kafkaPro, > FlinkKafkaProducer.Semantic.AT_LEAST_ONCE) > aggs_result.addSink(kafka).setParallelism(parallelism) > } > {code} > *when i use this code to produce to kafka ,its report a Error : > *{code:java} > 2020-07-13 10:25:47,624 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask - Error during > disposal of stream operator. > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to > send data to Kafka: Pending record count must be zero at this point: 1 > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IllegalStateException: Pending record count must be zero > at this point: 1 > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834) > ... 8 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)