(possible dupe; I wasn't subscribed before and the previous message didn't seem to go through)
I'm on Flink v1.9 with the Kafka connector and a standalone JM.

If FlinkKafkaProducer fails while checkpointing, it throws a KafkaException, which gets wrapped in a CheckpointException, which is sent to the JM as a DeclineCheckpoint. KafkaException isn't on the JM default classpath, so the JM throws a fairly cryptic ClassNotFoundException. The details of the KafkaException wind up suppressed, so it's impossible to figure out what actually went wrong.

I can think of two fixes that would prevent this from occurring in the Kafka or other connectors in the future:

1. DeclineCheckpoint should always send a SerializedThrowable to the JM rather than allowing CheckpointExceptions with non-deserializable root causes to slip through.
2. CheckpointException should always capture its wrapped exception as a SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))' rather than 'super(cause)').

Thoughts?
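
To make option 2 concrete, here is a minimal, self-contained sketch of the idea in plain Java. It does not use Flink's classes: StringifiedThrowable, DeclineException, ConnectorException and roundTrip are hypothetical stand-ins I made up for SerializedThrowable, CheckpointException, KafkaException and the RPC hop to the JM, and Flink's real SerializedThrowable may well work differently. The point is only that the wrapper stores the cause as plain Strings, so the serialized form carries no reference to the connector's exception class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;

final class SerializedCauseSketch {

    /** Hypothetical stand-in for a connector exception (e.g. KafkaException) that the receiver may lack. */
    static class ConnectorException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        ConnectorException(String message) {
            super(message);
        }
    }

    /** Hypothetical, simplified stand-in for SerializedThrowable: keeps only Strings, no class reference. */
    static class StringifiedThrowable extends Exception {
        private static final long serialVersionUID = 1L;

        private final String originalClassName;
        private final String originalStackTrace;

        StringifiedThrowable(Throwable original) {
            // Keep the original class name and message readable on the receiving side.
            super(original.getClass().getName() + ": " + original.getMessage());
            this.originalClassName = original.getClass().getName();
            StringWriter buffer = new StringWriter();
            original.printStackTrace(new PrintWriter(buffer, true));
            this.originalStackTrace = buffer.toString();
        }

        String getOriginalClassName() {
            return originalClassName;
        }

        String getOriginalStackTrace() {
            return originalStackTrace;
        }
    }

    /** Hypothetical stand-in for CheckpointException that always stringifies its cause (option 2). */
    static class DeclineException extends Exception {
        private static final long serialVersionUID = 1L;

        DeclineException(String message, Throwable cause) {
            super(message, new StringifiedThrowable(cause));
        }
    }

    /** Simulates sending an object to another process via Java serialization. */
    static Object roundTrip(Object value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable sent = new DeclineException(
                "checkpoint declined", new ConnectorException("broker unavailable"));
        Throwable received = (Throwable) roundTrip(sent);

        // The cause arrives as a StringifiedThrowable whose message names the original failure.
        System.out.println(received.getCause().getMessage());
    }
}
```

Note that the round trip here runs inside one JVM, so it only shows that the failure details survive serialization. It does not reproduce the ClassNotFoundException itself, which would need a receiver whose classpath lacks the connector class.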