Hmm, could you please post the full stack trace that leads to the
TimeoutException?
Best,
Aljoscha
On 10.11.20 17:54, Tim Josefsson wrote:
Hey Aljoscha,
I'm setting the transaction.timeout.ms when I create the FlinkKafkaProducer:
I create a Properties object and then set the property and finally add
those properties when creating the producer.
Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", "900000");
If I don't set that property my I instead get the following config when
starting the job:
11:41:56,345 INFO org.apache.kafka.clients.producer.ProducerConfig
- ProducerConfig values:
acks = 1
[omitted for brevity]
transaction.timeout.ms = 60000
transactional.id = Source: Read player events from Kafka -> Map
Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
So I imagine the Producer is picking up the change but it still returns
errors when running the job.
Best regards,
Tim
On Tue, 10 Nov 2020 at 16:14, Aljoscha Krettek <aljos...@apache.org> wrote:
On 10.11.20 11:53, Tim Josefsson wrote:
Also when checking my logs I see the following message:
11:41:56,345 INFO org.apache.kafka.clients.producer.ProducerConfig
- ProducerConfig values:
acks = 1
[omitted for brevity]
transaction.timeout.ms = 900000
transactional.id = Source: Read player events from Kafka -> Map
Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
The interesting thing would be to figure out where that
`transaction.timeout.ms = 900000` is coming from. The default from Flink
would be 60000, if nothing is configured. Are you specifying that value,
maybe from the commandline or in code?
Maybe it's a funny coincidence, but our StreamingKafkaITCase also
specifies that timeout value.
Best,
Aljoscha