Hi, how do I obtain the "producer's transactional id"?

On 2022-01-21 10:41:37, "selves_nan" <[email protected]> wrote:
>Hi, I think this is caused by prop missing some of the configuration items that a Kafka transactional producer needs. You could try adding the settings below.
>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "the producer's transactional id");
>// enable idempotence
>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>
>selves_nan
>
>On 2022-01-20 14:39, 潘明文 <[email protected]> wrote:
>Hi,
>When I create a FlinkKafkaProducer as shown here, the following error sometimes occurs at runtime, and I don't know the cause.
>
>FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC,
>        new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop,
>        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>
>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>    at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>    at java.lang.Thread.run(Thread.java:748)
>    Suppressed: java.lang.NullPointerException
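
On the question of where the transactional id comes from: for the plain Kafka client, `transactional.id` is not looked up anywhere. It is a string the application chooses itself, and it should stay the same across restarts of the same logical producer. Below is a minimal sketch of the two settings suggested in the thread. It uses only `java.util.Properties` and the literal key names behind `ProducerConfig.TRANSACTIONAL_ID_CONFIG` and `ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG`. The class name, the method name, the broker address, and the id value `my-flink-sink-txn` are hypothetical. This thread does not confirm whether FlinkKafkaProducer in EXACTLY_ONCE mode keeps a user-supplied id or assigns its own, so the sketch only illustrates the configuration and is not a confirmed fix for the error.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka or Flink.
class TransactionalProps {
    // Literal key names behind ProducerConfig.TRANSACTIONAL_ID_CONFIG
    // and ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG.
    static final String TRANSACTIONAL_ID = "transactional.id";
    static final String ENABLE_IDEMPOTENCE = "enable.idempotence";

    // Returns a copy of the base properties with the two transactional
    // settings added; the caller's Properties object is left unchanged.
    static Properties withTransactionSettings(Properties base, String transactionalId) {
        Properties prop = new Properties();
        prop.putAll(base);
        // The id is an arbitrary, application-chosen string.
        prop.setProperty(TRANSACTIONAL_ID, transactionalId);
        // Transactions require an idempotent producer.
        prop.setProperty(ENABLE_IDEMPOTENCE, "true");
        return prop;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        Properties prop = withTransactionSettings(base, "my-flink-sink-txn"); // hypothetical id
        System.out.println(TRANSACTIONAL_ID + "=" + prop.getProperty(TRANSACTIONAL_ID));
        System.out.println(ENABLE_IDEMPOTENCE + "=" + prop.getProperty(ENABLE_IDEMPOTENCE));
    }
}
```

The resulting `prop` object would be passed to the `FlinkKafkaProducer` constructor in place of the original one. String values are used so that `getProperty` can read them back; Kafka also accepts `"true"` as a string for boolean settings.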
