Hi,这个事务id自己指定即可,如果指定了之后还是报错,方便给下用到的flink和kafka-connector版本吗,目前在使用的版本没有看到相关的api


| |
selves_nan
|
|
[email protected]
|
签名由网易邮箱大师定制


在2022年01月21日 13:00,潘明文<[email protected]> 写道:
HI,
"生产者的事务id"  怎么获取呀?

















在 2022-01-21 10:41:37,"selves_nan" <[email protected]> 写道:
Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id");
//开启幂等性
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);


| |
selves_nan
|
|
[email protected]
|
签名由网易邮箱大师定制


在2022年01月20日 14:39,潘明文<[email protected]> 写道:
hi,
我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。

FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);


org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.NullPointerException

回复