Hello,
While trying to reproduce a bug, we wrote a DeserializationSchema that
throws an exception when a malformed message comes in.
Then we sent a malformed message together with a number of well-formed
messages to see what happens.
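For reference, the schema looks roughly like this (a simplified sketch, not
the exact code; the message fields and the JSON handling are only
illustrative):

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import java.io.IOException

// Illustrative message type, not the real one.
case class OurMessage(deviceId: String, payload: String)

class BuggySchema(tolerateInvalidIncomingJSON: Boolean)
    extends DeserializationSchema[OurMessage] {

  @transient private lazy val mapper =
    new ObjectMapper().registerModule(DefaultScalaModule)

  override def deserialize(bytes: Array[Byte]): OurMessage =
    try mapper.readValue(bytes, classOf[OurMessage])
    catch {
      case e: Exception if !tolerateInvalidIncomingJSON =>
        // fail the task on purpose so we can observe the recovery behaviour
        throw new IOException("Malformed message", e)
      case _: Exception =>
        null // tolerant mode: a null return makes the source skip the record
    }

  override def isEndOfStream(nextElement: OurMessage): Boolean = false

  override def getProducedType: TypeInformation[OurMessage] =
    TypeInformation.of(classOf[OurMessage])
}

The source is then built like this: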
val source = KafkaSource.builder[OurMessage]()
  .setValueOnlyDeserializer(new BuggySchema(tolerateInvalidIncomingJSON))
  .setBootstrapServers(bootstrap_servers)
  .setTopics("mqtt")
  .setGroupId("flink")
  .setProperty("isolation.level", "read_committed")
  .setStartingOffsets(OffsetsInitializer.latest())
  .setProperty("transaction.timeout.ms", "900000")
  .setProperty("partition.discovery.interval.ms", "60000")
  .build()
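For completeness, the source is attached to the environment more or less like
this (the no-watermarks strategy and the source name here are assumptions;
"alarms" below is derived from this stream, possibly after some intermediate
steps):

import org.apache.flink.api.common.eventtime.WatermarkStrategy
// assuming the usual "import org.apache.flink.streaming.api.scala._" is in scope
val alarms = env.fromSource(source, WatermarkStrategy.noWatermarks[OurMessage](), "mqtt-source")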
To simulate our slow API, we did this:
val pusher = alarms.map(x => { Thread.sleep(8000); x.toString() })
pusher.sinkTo(buildAtLeastOnceStringSink(bootstrap_servers, p, "alm_log"))
  .uid("almlog").name("almlog")
Then we injected a lot of messages, plus one invalid message, and we
repeated this test multiple times. This is where things got weird: I
would expect the job to fail, restart and fail again (since, upon
restarting, it should reprocess the same invalid message).
Sometimes this indeed happens, but sometimes we get the exception only
once and then the application remains in the "running" state without
continuously crashing and recovering. I think this is somehow related
to the earlier issue we saw with the duplicated messages.
However, that is not always what happens: sometimes the job fails,
restarts and then keeps running, and sometimes it goes into my
(expected) restart loop. Until I understand what is going on, I have
disabled Flink task failure recovery and rely on the Flink Kubernetes
operator to restart the job on failure. But I'd like to understand
what is happening.
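For clarity, by "disabled Flink task failure recovery" I mean running the job
without a Flink-side restart strategy, along these lines (we could
equivalently set restart-strategy: none in the Flink configuration):

import org.apache.flink.api.common.restartstrategy.RestartStrategies

// no Flink restarts; a task failure fails the whole job,
// and the Kubernetes operator takes care of restarting it
env.setRestartStrategy(RestartStrategies.noRestart())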
As a side note: in our pipeline we switch a couple of times between the
Table API and the DataStream API. Could that influence how the failover
regions are determined?
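To illustrate what I mean by switching (a simplified sketch with made-up
view and column names, not our actual queries):

import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val tableEnv = StreamTableEnvironment.create(env)
// DataStream -> Table
tableEnv.createTemporaryView("alarms_view", alarms)
val filtered = tableEnv.sqlQuery("SELECT * FROM alarms_view WHERE payload IS NOT NULL")
// Table -> DataStream again
val backToStream = tableEnv.toDataStream(filtered)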
Thanks!
Greetings,
Frank