Hi Till,
Having been unaware of this mail thread, I created the Jira bug
https://issues.apache.org/jira/browse/FLINK-23509, which also proposes a
simple solution.
Regards
Matthias
Hi,
Thanks Tianxin and 周瑞 for reporting and tracking down the problem. Indeed,
that could be the reason behind it. Have either of you already created a
JIRA ticket for this bug?
> Concerning the required change of an operator's UID: Piotr, is this
a known issue, and is it documented somewhere? I f
Thanks for this insight. So the problem might be that Flink uses an internal
Kafka API (the connector uses reflection to get hold of the
TransactionManager), which changed between versions 2.4.1 and 2.5. I think
this is a serious problem because it breaks our end-to-end exactly-once
story when using new
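To illustrate why reflective access to Kafka internals is fragile, here is a minimal sketch. The class and field names below are hypothetical stand-ins for two layouts of Kafka's internal TransactionManager, not the real classes: a reflective lookup that works against one layout simply stops finding the field once the internals are restructured.

```java
import java.lang.reflect.Field;

// Hypothetical stand-ins for two versions of an internal Kafka class;
// the real TransactionManager differs, this only shows the failure mode.
class TransactionManagerV24 {
    private long producerId = -1L; // field read reflectively by the connector
}

class TransactionManagerV25 {
    private Object producerIdAndEpoch; // same state, restructured field
}

public class ReflectionProbe {
    // Mimics the connector's reflective lookup of a private Kafka field.
    static boolean hasField(Class<?> cls, String name) {
        try {
            Field f = cls.getDeclaredField(name);
            f.setAccessible(true);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasField(TransactionManagerV24.class, "producerId")); // true
        System.out.println(hasField(TransactionManagerV25.class, "producerId")); // false
    }
}
```

Because the dependency is on private fields rather than a public API, no compile error warns about the mismatch; the breakage only surfaces at runtime.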
I encountered the exact same issue before when experimenting in a testing
environment. I was not able to spot the bug mentioned in this thread; my
workaround was to downgrade my own kafka-clients version from 2.5 to
2.4.1, matching the version used by flink-connector-kafka.
In 2.4.1 Kafka, Transa
Thanks for the update. Skimming over the code, it indeed looks as if we are
overwriting the fields of the static value ProducerIdAndEpoch.NONE. I am
not 100% sure how this causes the observed problem, though. I am also not a
Flink Kafka connector and Kafka expert, so I would appreciate it if someone
mo
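For intuition, here is a toy mirror of the suspected failure mode; the real ProducerIdAndEpoch lives in kafka-clients and differs in detail, so the class below is illustrative only. If restored values are written through a reference that aliases the shared NONE sentinel, the "no transaction" constant itself gets corrupted for every later check.

```java
// Toy mirror of Kafka's ProducerIdAndEpoch (illustrative only): a shared
// NONE sentinel whose fields can be written through an alias.
class ProducerIdAndEpoch {
    static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(-1L, (short) -1);
    long producerId;
    short epoch;
    ProducerIdAndEpoch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }
    boolean isValid() { return producerId > -1; }
}

public class SharedSentinel {
    public static void main(String[] args) {
        ProducerIdAndEpoch current = ProducerIdAndEpoch.NONE; // alias, not a copy
        // Resuming a transaction writes the restored id/epoch through the
        // alias, silently mutating the shared NONE constant.
        current.producerId = 4097L;
        current.epoch = 7;
        System.out.println(ProducerIdAndEpoch.NONE.isValid());   // true -- NONE now looks valid
        System.out.println(ProducerIdAndEpoch.NONE.producerId);  // 4097
    }
}
```

Any code that later compares against NONE, or treats it as "uninitialized", now misbehaves, which matches the kind of hard-to-trace symptom described in this thread.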
Forwarding 周瑞's message to a duplicate thread:
After our analysis, we found a bug in the
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
method.
The analysis process is as follows:
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkF
Hi,
I think there is no generic way. If this error has indeed happened after
starting a second job from the same savepoint, or something like that, the
user can change the Sink operator's UID.
If this is an issue of intentional recovery from an earlier
checkpoint/savepoint, maybe `FlinkKafkaProducer#setL
The error message says that we are trying to reuse a transactional id that
is currently in use or has expired.
I am not 100% sure how this can happen. My suspicion is that you have
resumed a job multiple times from the same savepoint. Have you checked that
there is no other job which has been re
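A toy model of the fencing behaviour behind that error message may help; this is illustrative Java, not the actual broker logic. Every initTransactions() for the same transactional.id bumps an epoch, so a second job resumed from the same savepoint derives the same id and fences the first.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of Kafka transactional-id fencing (not the real broker code):
// the highest epoch registered for a transactional.id wins, and producers
// holding an older epoch are rejected on their next transactional call.
public class FencingModel {
    static final Map<String, Short> epochs = new HashMap<>();

    static short initTransactions(String txnId) {
        short epoch = (short) (epochs.getOrDefault(txnId, (short) -1) + 1);
        epochs.put(txnId, epoch);
        return epoch;
    }

    static boolean beginTransaction(String txnId, short epoch) {
        return epochs.get(txnId) == epoch; // stale epoch => producer is fenced
    }

    public static void main(String[] args) {
        // Two jobs resumed from the same savepoint derive the same id.
        short jobA = initTransactions("sink-0-3");
        short jobB = initTransactions("sink-0-3"); // fences job A
        System.out.println(beginTransaction("sink-0-3", jobB)); // true
        System.out.println(beginTransaction("sink-0-3", jobA)); // false -- fenced
    }
}
```

This is why changing the sink operator's UID helps: the restored job then derives different transactional ids and no longer collides with the old ones.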
Hi,
When "sink.semantic = exactly-once" is set, the following
exception is thrown when recovering from a savepoint:
public static final String KAFKA_TABLE_FORMAT =
    "CREATE TABLE " + TABLE_NAME + " (\n" +
    "    " + COLUMN_NAME + " STRING\n" +
    ") WITH
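For reference, a completed DDL of this shape typically looks like the following sketch; the table name, column, topic, broker address, and option values here are placeholders, not taken from the original message.

```sql
-- Illustrative exactly-once Kafka sink table for Flink SQL;
-- all names and addresses below are placeholders.
CREATE TABLE my_table (
    my_column STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'sink.semantic' = 'exactly-once',
    'properties.transaction.timeout.ms' = '900000'
);
```

With 'sink.semantic' = 'exactly-once', the connector opens Kafka transactions per checkpoint, which is exactly the code path where the resumeTransaction bug discussed above can surface on recovery.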