To add to this, setting FlinkKafkaProducer.Semantic.AT_LEAST_ONCE instead of EXACTLY_ONCE makes the problem go away, so I imagine there is something wrong with my setup. I'm using Kafka 2.2, and I have the following settings on the cluster:
transaction.max.timeout.ms=3600000
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

On Mon, 16 Nov 2020 at 14:05, Tim Josefsson <tim.josefs...@webstep.se> wrote:

> Hello!
>
> I'm having some problems with my KafkaProducer that I've been unable to
> find a solution to.
>
> I've set up a simple Flink job that reads from one Kafka topic, using
> *kafkaProps.setProperty("isolation.level", "read_committed")*
> since I want to support exactly-once data in my application.
>
> After doing some enriching of the data I read from Kafka, I have the
> following producer set up:
>
> FlinkKafkaProducer<PlayerSessionEnriched> kafkaSinkProducer = new FlinkKafkaProducer<>(
>         "enrichedPlayerSessionsTest",
>         new KafkaStringSerializationSchema("enrichedPlayerSessionsTest"),
>         producerProps,
>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE
> );
>
> The producer above is then added as a sink at the end of my Flink job.
>
> Now when I run this application I get the following message:
>
> 13:44:40,758 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with epoch 4
> 13:44:40,759 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>
> Sometimes I also see the following:
>
> 13:44:43,740 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
> 13:44:44,136 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with epoch 11
> 13:44:44,147 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 has no restore state.
>
> Since this isn't an error, the job doesn't crash, and data does get
> written to Kafka even with this message. However, it seems wrong to me,
> and I'm wondering if anyone has any insight into why this is happening.
>
> I'm attaching a gist with the complete log from the application. I ran the
> job with *env.setParallelism(1)* but still get around 26 producers created,
> which seems odd to me. Running without any parallelism set creates about
> 300-400 producers (based on the reported clientIds).
>
> Thankful for any insight into this!
>
> Best regards,
>
> Tim
>
> --
> *Tim Josefsson*
> Webstep <http://www.webstep.se/>
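For reference, a minimal sketch of the client-side properties involved in the EXACTLY_ONCE setup described above, using only java.util.Properties. As far as I know, FlinkKafkaProducer defaults transaction.timeout.ms to one hour when it is not set, while the broker default for transaction.max.timeout.ms is 15 minutes; a producer requesting a larger timeout than the broker allows is rejected, which is why the cluster setting of 3600000 above matters. The class name, helper methods and the "localhost:9092" address are hypothetical placeholders, not part of the original job.

```java
import java.util.Properties;

public class ExactlyOnceKafkaProps {

    // Broker-side cap quoted in the thread (transaction.max.timeout.ms=3600000).
    public static final long BROKER_MAX_TXN_TIMEOUT_MS = 3_600_000L;

    // Hypothetical producer properties for an EXACTLY_ONCE sink.
    // "localhost:9092" is a placeholder address.
    public static Properties producerProps(long txnTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Must not exceed the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", Long.toString(txnTimeoutMs));
        return props;
    }

    // Hypothetical consumer properties: read_committed hides records
    // from transactions that are still open or were aborted.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    // Checks the requested producer timeout against the broker cap.
    public static boolean timeoutAllowedByBroker(Properties producer, long brokerMaxMs) {
        long requested = Long.parseLong(producer.getProperty("transaction.timeout.ms"));
        return requested <= brokerMaxMs;
    }

    public static void main(String[] args) {
        Properties producer = producerProps(900_000L); // 15 minutes
        System.out.println(timeoutAllowedByBroker(producer, BROKER_MAX_TXN_TIMEOUT_MS));
        System.out.println(consumerProps().getProperty("isolation.level"));
    }
}
```

Keeping the timeout check in one place makes it easy to see that the producer side and the broker side have to agree; in the real job these properties would be passed as producerProps and kafkaProps.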
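A rough sketch of where a count like "around 26 producers" could come from, under stated assumptions. My understanding (to be checked against the Flink version in use) is that an EXACTLY_ONCE FlinkKafkaProducer subtask starting without restore state tries to abort lingering transactions for a range of transactional IDs, briefly creating and closing one producer per ID, and that this range is pool size × safe scale-down factor per subtask, with both defaults assumed to be 5 here. The numeric IDs, class and method names below are hypothetical models of that arithmetic, not Flink's API.

```java
import java.util.HashSet;
import java.util.Set;

public class TransactionalIdCountSketch {

    // Assumed defaults; verify against your Flink version.
    public static final int POOL_SIZE = 5;
    public static final int SAFE_SCALE_DOWN_FACTOR = 5;

    // IDs one subtask would use, starting at a given offset.
    public static Set<Long> idsToUse(long start, int subtaskIndex, int poolSize) {
        Set<Long> ids = new HashSet<>();
        long first = start + (long) subtaskIndex * poolSize;
        for (int i = 0; i < poolSize; i++) {
            ids.add(first + i);
        }
        return ids;
    }

    // IDs a fresh subtask (no restore state) would try to abort,
    // covering jobs that previously ran with a larger parallelism.
    public static Set<Long> idsToAbort(int subtaskIndex, int parallelism,
                                       int poolSize, int scaleDownFactor) {
        Set<Long> ids = new HashSet<>();
        for (int i = 0; i < scaleDownFactor; i++) {
            ids.addAll(idsToUse((long) i * poolSize * parallelism, subtaskIndex, poolSize));
        }
        return ids;
    }

    public static void main(String[] args) {
        // 12 is a hypothetical parallelism, for comparison only.
        for (int parallelism : new int[] {1, 12}) {
            int total = 0;
            for (int subtask = 0; subtask < parallelism; subtask++) {
                total += idsToAbort(subtask, parallelism, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR).size();
            }
            System.out.println("parallelism " + parallelism + ": " + total + " short-lived producers");
        }
    }
}
```

Under these assumptions parallelism 1 gives 25 short-lived producers plus the one actually writing, and each extra subtask adds another 25, which would scale into the hundreds at a typical default parallelism. The timeoutMillis = 9223372036854775807 in the log is Long.MAX_VALUE, i.e. an unbounded close timeout.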