To add to this: setting FlinkKafkaProducer.Semantic.AT_LEAST_ONCE instead
of EXACTLY_ONCE makes the problem go away, so I suspect something is
wrong with my setup.
I'm using Kafka 2.2 and have the following settings on the cluster:

transaction.max.timeout.ms=3600000
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
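For reference: when transaction.timeout.ms is not set, FlinkKafkaProducer in EXACTLY_ONCE mode requests a one-hour transaction timeout. Kafka brokers default transaction.max.timeout.ms to 900000 ms (15 minutes) and reject a transactional producer that asks for more than that maximum, so the broker value above (3600000 ms) matches Flink's default exactly. A minimal sketch of that sanity check using only java.util.Properties; the class and method names are hypothetical, and the localhost address is a placeholder.

```java
import java.util.Properties;

final class TxnTimeoutCheck {
    // Broker-side cap as configured on the cluster above (Kafka's own default is 900000 ms).
    static final long BROKER_TRANSACTION_MAX_TIMEOUT_MS = 3_600_000L;

    // FlinkKafkaProducer falls back to 1 hour when transaction.timeout.ms is unset.
    static final long FLINK_DEFAULT_TRANSACTION_TIMEOUT_MS = 3_600_000L;

    /** Returns the transaction timeout the producer would request from the broker. */
    static long effectiveTimeoutMs(Properties producerProps) {
        String value = producerProps.getProperty("transaction.timeout.ms");
        return value == null ? FLINK_DEFAULT_TRANSACTION_TIMEOUT_MS : Long.parseLong(value.trim());
    }

    /** True if the requested timeout does not exceed the broker's transaction.max.timeout.ms. */
    static boolean acceptedByBroker(Properties producerProps, long brokerMaxMs) {
        return effectiveTimeoutMs(producerProps) <= brokerMaxMs;
    }

    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        System.out.println(acceptedByBroker(producerProps, BROKER_TRANSACTION_MAX_TIMEOUT_MS)); // true
        System.out.println(acceptedByBroker(producerProps, 900_000L)); // false with Kafka's default cap
    }
}
```

If the broker cap were left at its default, setting transaction.timeout.ms in producerProps to 900000 or less would be the usual way to satisfy this check.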


On Mon, 16 Nov 2020 at 14:05, Tim Josefsson <tim.josefs...@webstep.se>
wrote:

> Hello!
>
> I'm having some problems with my KafkaProducer that I've been unable to
> find a solution to.
>
> I've set up a simple Flink job that reads from one Kafka topic, using
>    *kafkaProps.setProperty("isolation.level", "read_committed")*
> since I want exactly-once semantics in my application.
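A minimal sketch of consumer properties of this kind, assuming plain java.util.Properties; the bootstrap address and group id are placeholders, not values from the original job. With isolation.level set to read_committed, the consumer only returns records from committed transactions (plus non-transactional records); Kafka's default is read_uncommitted.

```java
import java.util.Properties;

final class ConsumerProps {
    /** Builds consumer properties that only read committed transactional data. */
    static Properties readCommittedConsumerProps(String bootstrapServers, String groupId) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", bootstrapServers);
        kafkaProps.setProperty("group.id", groupId);
        // Hide records from aborted or still-open transactions.
        kafkaProps.setProperty("isolation.level", "read_committed");
        return kafkaProps;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = readCommittedConsumerProps("localhost:9092", "player-session-enricher");
        System.out.println(props.getProperty("isolation.level")); // read_committed
    }
}
```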
>
> After enriching the data read from Kafka, I have the following producer
> set up:
>
> FlinkKafkaProducer<PlayerSessionEnriched> kafkaSinkProducer =
>         new FlinkKafkaProducer<>(
>                 "enrichedPlayerSessionsTest",
>                 new KafkaStringSerializationSchema("enrichedPlayerSessionsTest"),
>                 producerProps,
>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>
> The producer above is then added as a sink at the end of my Flink job.
>
> Now, when I run this application, I get the following messages:
>
>     13:44:40,758 INFO  
> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
> clientId=producer-6, transactionalId=Source: playerSession and 
> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with epoch 
> 4
>     13:44:40,759 INFO  org.apache.kafka.clients.producer.KafkaProducer        
>        - [Producer clientId=producer-6, transactionalId=Source: playerSession 
> and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
>
> Sometimes I also see the following:
>
>     13:44:43,740 INFO  
> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
> clientId=producer-26, transactionalId=Source: playerSession and 
> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
>     13:44:44,136 INFO  
> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
> clientId=producer-26, transactionalId=Source: playerSession and 
> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with epoch 
> 11
>     13:44:44,147 INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> Consumer subtask 0 has no restore state.
>
> Since this is only logged at INFO level and not as an error, the job
> doesn't crash, and data does get written to Kafka despite these messages.
> However, it seems wrong to me, and I'm wondering if anyone has any insight
> into why this is happening.
>
> I'm attaching a Gist with the complete log from the application. I ran the
> job with *env.setParallelism(1)*, but I still get around 26 producers
> created, which seems odd to me. Running without any parallelism set creates
> about 300-400 producers (based on the reported clientIds).
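One plausible explanation for these counts, assuming FlinkKafkaProducer defaults as recalled from its source (a producer pool size of 5, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE, and a SAFE_SCALE_DOWN_FACTOR of 5; please verify against the Flink version in use): when an EXACTLY_ONCE sink subtask starts without restored state, it creates a short-lived producer for every transactional ID a previous run might have left open, calls initTransactions() to fence and abort it, and closes it again. That would match the "ProducerId set to ... / Closing the Kafka producer" pairs in the log. The sketch below models only that ID arithmetic; the class and method names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class TransactionalIdModel {
    // Assumed FlinkKafkaProducer defaults (verify for your Flink version).
    static final int POOL_SIZE = 5;
    static final int SAFE_SCALE_DOWN_FACTOR = 5;

    /** Numeric transactional IDs one subtask would use, starting at the given offset. */
    static Set<Long> idsToUse(long nextFreeId, int subtaskIndex, int poolSize) {
        Set<Long> ids = new LinkedHashSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(nextFreeId + (long) subtaskIndex * poolSize + i);
        }
        return ids;
    }

    /** IDs one subtask would abort on a fresh start, covering possible larger earlier parallelism. */
    static Set<Long> idsToAbort(int subtaskIndex, int parallelism, int poolSize, int scaleDownFactor) {
        Set<Long> ids = new LinkedHashSet<>();
        for (int i = 0; i < scaleDownFactor; i++) {
            ids.addAll(idsToUse((long) i * poolSize * parallelism, subtaskIndex, poolSize));
        }
        return ids;
    }

    /** Producers created at startup: one per ID to abort, plus one for the first transaction, per subtask. */
    static long producersAtStartup(int parallelism) {
        long total = 0;
        for (int s = 0; s < parallelism; s++) {
            total += idsToAbort(s, parallelism, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR).size() + 1;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(producersAtStartup(1));  // 26
        System.out.println(producersAtStartup(12)); // 312
        System.out.println(producersAtStartup(16)); // 416
    }
}
```

Under these assumptions, parallelism 1 gives 26 producers, and a local environment whose default parallelism equals a machine's core count (for example 12 or 16) gives 312 to 416, which would be consistent with the numbers reported above.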
>
> Thankful for any insight into this!
>
> Best regards,
>
> Tim
>
>

-- 

*Tim Josefsson*
mobile  +46 (0) 707 81 91 12
phone   +46 (0) 8 21 40 70

tim.josefs...@webstep.se
*webstep.se <http://www.webstep.se/>*
Suttungs gränd 2
753 19 Uppsala
Stockholm | Uppsala | Malmö | Sundsvall | Oslo
Bergen | Stavanger | Trondheim | Kristiansand
