Hmm, could you please post the full stack trace that leads to the TimeoutException?

Best,
Aljoscha

On 10.11.20 17:54, Tim Josefsson wrote:
Hey Aljoscha,

I'm setting transaction.timeout.ms when I create the FlinkKafkaProducer.
I create a Properties object, set the property on it, and then pass
those properties in when creating the producer:

Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", "900000");
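
For reference, a minimal sketch of the setup described above, using only java.util.Properties. The class and method names are hypothetical. The commented-out constructor call assumes the Flink Kafka connector is on the classpath, with `topic` and `schema` standing in for values not shown in this thread.

```java
import java.util.Properties;

class TransactionTimeoutProps {

    // Builds producer properties with the transaction timeout set to
    // 900000 ms (15 minutes). Other settings, such as bootstrap.servers,
    // are omitted here because they do not appear in the thread.
    static Properties buildProducerProps() {
        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", "900000");
        return producerProps;
    }

    public static void main(String[] args) {
        Properties producerProps = buildProducerProps();
        System.out.println("transaction.timeout.ms = "
                + producerProps.getProperty("transaction.timeout.ms"));

        // Assumption: with the Flink Kafka connector available, the
        // properties would be handed to the producer roughly like this
        // (topic and schema are hypothetical placeholders):
        // new FlinkKafkaProducer<>(topic, schema, producerProps,
        //         FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}
```

Checking the value on the Properties object before building the producer is a cheap way to confirm that the setting is really the one that shows up in the ProducerConfig log output.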

If I don't set that property, I instead get the following config when
starting the job:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    acks = 1
    [omitted for brevity]
    transaction.timeout.ms = 60000
    transactional.id = Source: Read player events from Kafka -> Map Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not needed for backfill -> Sink: Post events to playerEvents Kafka-a15b4dd4812495cebdc94e33125ef858-1
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

So I imagine the Producer is picking up the change but it still returns
errors when running the job.

Best regards,
Tim


On Tue, 10 Nov 2020 at 16:14, Aljoscha Krettek <aljos...@apache.org> wrote:

On 10.11.20 11:53, Tim Josefsson wrote:
Also when checking my logs I see the following message:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
     acks = 1
     [omitted for brevity]
     transaction.timeout.ms = 900000
     transactional.id = Source: Read player events from Kafka -> Map Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not needed for backfill -> Sink: Post events to playerEvents Kafka-a15b4dd4812495cebdc94e33125ef858-1
     value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

The interesting thing would be to figure out where that
`transaction.timeout.ms = 900000` is coming from. The default from Flink
would be 60000, if nothing is configured. Are you specifying that value,
maybe from the commandline or in code?

Maybe it's a funny coincidence, but our StreamingKafkaITCase also
specifies that timeout value.

Best,
Aljoscha



