Hello,

When trying to reproduce a bug, we made a DeserializationSchema that throws an exception when a malformed message comes in. Then, we sent a malformed message together with a number of well-formed messages to see what happens.

val source = KafkaSource.builder[OurMessage]()
  .setValueOnlyDeserializer(new BuggySchema(tolerateInvalidIncomingJSON))
  .setBootstrapServers(bootstrap_servers)
  .setTopics("mqtt")
  .setGroupId("flink")
  .setProperty("isolation.level", "read_committed")
  .setStartingOffsets(OffsetsInitializer.latest())
  .setProperty("transaction.timeout.ms", "900000")
  .setProperty("partition.discovery.interval.ms", "60000")
  .build()
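
For reference, BuggySchema looks roughly like this (a simplified sketch; parseOurMessage stands in for our actual JSON parsing and is assumed to return Either[String, OurMessage]):

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.createTypeInformation

class BuggySchema(tolerateInvalidIncomingJSON: Boolean) extends DeserializationSchema[OurMessage] {

  override def deserialize(bytes: Array[Byte]): OurMessage = {
    val raw = new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
    // parseOurMessage is our JSON parsing helper (not shown here)
    parseOurMessage(raw) match {
      case Right(msg)                             => msg
      case Left(_) if tolerateInvalidIncomingJSON => null // null records are skipped by the source
      case Left(err)                              => throw new RuntimeException(s"Malformed message: $err")
    }
  }

  override def isEndOfStream(nextElement: OurMessage): Boolean = false

  override def getProducedType: TypeInformation[OurMessage] = createTypeInformation[OurMessage]
}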

To simulate our slow API, we did this:

val pusher = alarms.map(x => { Thread.sleep(8000); x.toString() })
pusher.sinkTo(buildAtLeastOnceStringSink(bootstrap_servers, p, "alm_log")).uid("almlog").name("almlog")
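
For completeness, buildAtLeastOnceStringSink is roughly this kind of helper (a sketch; p is assumed to hold our extra producer properties):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

def buildAtLeastOnceStringSink(bootstrapServers: String, p: Properties, topic: String): KafkaSink[String] =
  KafkaSink.builder[String]()
    .setBootstrapServers(bootstrapServers)
    .setKafkaProducerConfig(p)
    .setRecordSerializer(
      KafkaRecordSerializationSchema.builder[String]()
        .setTopic(topic)
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()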

Then, we injected a lot of messages plus one invalid message, and repeated this test multiple times. This is where things got weird: I would expect the job to fail, restart, and fail again (since, upon restarting, it should reprocess the same invalid message). Sometimes this indeed happens, but sometimes we get the exception only once and then the application stays in the "running" state without the continuous crash-and-recover cycle. I think this is somehow related to the earlier issue we saw with the duplicated messages.

However, that is not always what happens: sometimes the job fails, restarts and then keeps running, and sometimes it goes into the restart loop I expected. Until I understand what's going on, I have disabled Flink's task failure recovery and rely on the Flink Kubernetes operator to restart the job on failure. But I'd like to understand what is happening.
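
Concretely, by "disabled Flink's task failure recovery" I mean setting the restart strategy to none, along these lines (the programmatic equivalent of restart-strategy: none in the configuration):

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// No Flink-internal restarts: any task failure fails the whole job,
// and the Kubernetes operator restarts it.
env.setRestartStrategy(RestartStrategies.noRestart())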

As a side note: in our pipeline we switch a couple of times between the Table API and the DataStream API. Could that influence how the failover regions are determined?
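
By "switch" I mean the usual bridge round trip, roughly like this (view name is just illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env  = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// DataStream -> Table, some SQL, and back to a DataStream
// (alarms is the DataStream[OurMessage] coming from the Kafka source above)
val alarmsTable = tEnv.fromDataStream(alarms)
tEnv.createTemporaryView("alarms_view", alarmsTable)
val filtered = tEnv.toDataStream(tEnv.sqlQuery("SELECT * FROM alarms_view"))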

Thanks!

Greetings,
Frank
