[ https://issues.apache.org/jira/browse/IGNITE-19459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vedran Ljubovic updated IGNITE-19459:
-------------------------------------
    Attachment: event_dropping.patch

> Kafka Connect IgniteSinkConnector drops messages in case of error
> -----------------------------------------------------------------
>
>                 Key: IGNITE-19459
>                 URL: https://issues.apache.org/jira/browse/IGNITE-19459
>             Project: Ignite
>          Issue Type: Bug
>          Components: extensions
>    Affects Versions: 2.15
>            Reporter: Vedran Ljubovic
>            Priority: Major
>         Attachments: event_dropping.patch
>
> We are using Kafka Connect (KC) to stream messages from Kafka to Ignite. Since the Kafka topic uses null keys, we created a custom SingleTupleExtractor to generate keys from the payload. This works very well when everything is OK. However, if there is any kind of problem starting a cache on Ignite (for example, the cluster state is inactive or the cache has lost partitions), we expect KC to fail to start. Instead, KC starts and appears to be running, and the messages are dropped, which means that once the problems are resolved, KC will not attempt to resend the messages even after a restart! This is unacceptable for us; we believe the system should be reliable and fault-tolerant.
> In the logs we notice errors such as:
> {code:java}
> Failed to stream a record with null key!
> {code}
> which is useless, since we have a SingleTupleExtractor for exactly this purpose and we can see that it is not being called at all!
> When the KC REST API [1] is used, the state is reported as RUNNING, which means we have no way to detect this error other than parsing the logs, which is unreliable.
> Upon investigating this issue, we found the following:
> * The Ignite connection and IgniteDataStreamer are declared as private static final fields of an inner class; they are initialized when the start() method of IgniteSinkConnector is called. From the KC docs [2], we conclude that the initialize() method should be overridden and the connections created there, and that appropriate exception types should be thrown so that KC knows the connection failed and terminates the task/connector.
> * When the start() method is called, StreamerContext.getStreamer() in line 72 fails with an exception. This exception is not handled by KC, so KC does not know that the task failed to start. In addition, the code never reaches line 91 where the SingleTupleExtractor is created, so there is no extractor. The solution would be to catch all exception types and rethrow them as exceptions that KC detects as critical errors. Alternatively, the put() method should throw an exception if stopped is true.
> * When the put() method is called, if there is no key and no extractor, in line 121 we see that the error is logged but no exception is thrown, so KC thinks the message was successfully streamed. Here, ConnectException should be thrown. If users want the current behavior (stream Kafka messages with a key and skip those without one), they can set errors.tolerance = all in the connector config. [3]
> [1] [https://docs.confluent.io/platform/current/connect/references/restapi.html]
> [2] [https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/sink/SinkTask.html]
> [3] [https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/]
-- This message was sent by Atlassian Jira (v8.20.10#820010)
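The start() fix suggested in the second bullet can be sketched as follows. This is an illustrative, self-contained sketch rather than the actual IgniteSinkTask source: ConnectException is a local stand-in for org.apache.kafka.connect.errors.ConnectException, and getStreamer() simulates StreamerContext.getStreamer() failing on an unhealthy cluster.

```java
// Stand-in for org.apache.kafka.connect.errors.ConnectException, so the
// sketch compiles without the connect-api dependency.
class ConnectException extends RuntimeException {
    ConnectException(String message, Throwable cause) { super(message, cause); }
}

class SinkTaskStartSketch {
    private volatile boolean stopped = true;

    /** Simulates StreamerContext.getStreamer(): throws when the cache cannot be started. */
    static Object getStreamer(boolean clusterHealthy) {
        if (!clusterHealthy)
            throw new IllegalStateException("Cluster inactive or cache has lost partitions");
        return new Object();
    }

    /**
     * Instead of letting the raw exception escape unrecognized, wrap any startup
     * failure in ConnectException so KC transitions the task to FAILED and the
     * REST API reports the error instead of RUNNING.
     */
    void start(boolean clusterHealthy) {
        try {
            Object streamer = getStreamer(clusterHealthy);
            // ... create SingleTupleExtractor, etc. (line 91 in the real task) ...
            stopped = false;
        }
        catch (RuntimeException e) {
            throw new ConnectException("Failed to start IgniteSinkTask", e);
        }
    }

    boolean isStopped() { return stopped; }
}
```

With this wrapping, a cache with lost partitions makes the task fail visibly at startup instead of silently dropping every record it receives.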
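Similarly, the null-key branch of put() described in the third bullet can throw instead of logging. Again a self-contained sketch: Record and TupleExtractor are simplified stand-ins for SinkRecord and the SingleTupleExtractor, and ConnectException stands in for the Kafka Connect class; the real task would call IgniteDataStreamer.addData() where this sketch records the pair.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.connect.errors.ConnectException.
class ConnectException extends RuntimeException {
    ConnectException(String message) { super(message); }
}

/** Minimal stand-in for org.apache.kafka.connect.sink.SinkRecord: just key and value. */
class Record {
    final Object key;
    final Object value;
    Record(Object key, Object value) { this.key = key; this.value = value; }
}

/** Stand-in for the configured SingleTupleExtractor; returns {key, value}. */
interface TupleExtractor {
    Object[] extract(Record rec);
}

class SinkTaskPutSketch {
    private final TupleExtractor extractor;                 // null when none is configured
    final List<Object[]> streamed = new ArrayList<>();      // captures what would be streamed

    SinkTaskPutSketch(TupleExtractor extractor) { this.extractor = extractor; }

    /** Throw on null-key records instead of logging and silently dropping them. */
    void put(Collection<Record> records) {
        for (Record rec : records) {
            if (extractor != null) {
                Object[] kv = extractor.extract(rec);
                stream(kv[0], kv[1]);
            }
            else if (rec.key != null) {
                stream(rec.key, rec.value);
            }
            else {
                // Previously only logged: "Failed to stream a record with null key!"
                // Throwing lets KC fail the task (or honor errors.tolerance) instead
                // of acknowledging a record that was never written to Ignite.
                throw new ConnectException("Failed to stream a record with null key!");
            }
        }
    }

    private void stream(Object key, Object val) {
        // The real task would call IgniteDataStreamer.addData(key, val) here.
        streamed.add(new Object[] {key, val});
    }
}
```

Note that when an extractor is configured, it handles null-key records itself, so the exception is only reached when there is genuinely no way to derive a key.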
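For users who do want the current skip-on-null-key behavior even once put() throws, Kafka Connect's error-tolerance settings [3] can express that explicitly in the connector configuration. A minimal illustrative fragment (the other connector properties are omitted):

```properties
# Tolerate record-level failures instead of failing the task,
# and log each skipped record so the drops are at least visible.
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
```

This keeps the decision to drop records in the hands of the operator, rather than being hard-coded into the sink.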