Vedran Ljubovic created IGNITE-19459:
----------------------------------------

             Summary: Kafka Connect IgniteSinkConnector drops messages in case 
of error
                 Key: IGNITE-19459
                 URL: https://issues.apache.org/jira/browse/IGNITE-19459
             Project: Ignite
          Issue Type: Bug
          Components: extensions
    Affects Versions: 2.15
            Reporter: Vedran Ljubovic


We are using Kafka Connect (KC) to stream messages from Kafka to Ignite. Since 
the Kafka topic uses null keys, we have created a custom 
SingleTupleExtractor to generate keys from the payload. This works very well when 
everything is ok. However, if there is any kind of issue with starting a 
cache on Ignite (such as the cluster state being inactive or the cache having 
lostParts), we expect KC to fail to start. Instead, KC starts and appears to 
be running, while the messages are silently dropped - which means that once the 
problems are resolved, KC will not attempt to resend the messages, even after a 
restart! This is unacceptable for us; we believe the system should be 
reliable and fault-tolerant.
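For illustration, the key-extraction approach can be sketched as below. The SingleTupleExtractor interface shown is a simplified stand-in for Ignite's real extractor interface (not reproduced here), and the "first comma-separated field is the key" payload format is purely an assumption:

{code:java}
import java.util.AbstractMap;
import java.util.Map;

// Simplified stand-in for Ignite's extractor interface (assumption; the real
// interface lives in the Ignite Kafka integration and receives the sink record).
interface SingleTupleExtractor<T, K, V> {
    Map.Entry<K, V> extract(T msg);
}

// Derives the cache key from the payload itself, since the topic has null keys.
public class PayloadKeyExtractor implements SingleTupleExtractor<String, String, String> {
    @Override
    public Map.Entry<String, String> extract(String payload) {
        // Illustrative format: key is the first comma-separated field.
        String key = payload.split(",", 2)[0];
        return new AbstractMap.SimpleEntry<>(key, payload);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> e = new PayloadKeyExtractor().extract("id42,some data");
        System.out.println(e.getKey() + " -> " + e.getValue());
        // prints: id42 -> id42,some data
    }
}
{code}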

In logs we notice errors such as:
{code:java}
Failed to stream a record with null key! {code}
which is misleading, since we do have a SingleTupleExtractor for exactly this 
purpose and we can see that it is never called!

When the KC REST API [1] is queried, the connector state is reported as RUNNING, 
which means we have no way to detect this error other than parsing the logs, 
which is unreliable.

Upon investigating this issue, we found the following:
 * The Ignite connection and IgniteDataStreamer are declared as private static 
final fields of an inner class and are initialized when the start() method of 
IgniteSinkConnector is called. From the KC docs [2], we conclude that the 
initialize() method should be overridden and the connections created there, 
and that appropriate exception types should be thrown so that KC knows the 
connection has failed and terminates the task/connector.
 * When the start() method is called, StreamerContext.getStreamer() in line 72 
fails with an exception. This exception is not handled by KC, so KC doesn't know 
that the task failed to start. In addition, the code never reaches line 91 where 
the SingleTupleExtractor is created, so there is no extractor. The solution 
would be to catch all exceptions and rethrow them as exception types that KC 
detects as critical errors. Alternatively, the put() method should throw 
an exception if stopped is true.
 * When the put() method is called with a record that has no key and no 
extractor is configured, line 121 shows that the error is only logged and no 
exception is thrown, so KC assumes the message was successfully streamed. A 
ConnectException should be thrown here. Users who want the current behavior 
(streaming Kafka messages with keys and silently skipping those without) can set 
errors.tolerance = all in the connector config. [3]
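The fixes proposed in the last two bullets can be sketched as follows. ConnectException here is a local stand-in for org.apache.kafka.connect.errors.ConnectException, and the method bodies are simplified assumptions, not the actual IgniteSinkTask code:

{code:java}
import java.util.Map;

// Local stand-in for org.apache.kafka.connect.errors.ConnectException.
class ConnectException extends RuntimeException {
    ConnectException(String msg, Throwable cause) { super(msg, cause); }
}

public class IgniteSinkTaskSketch {
    // Simulates StreamerContext.getStreamer() failing because the cluster
    // is inactive or the cache has lost partitions.
    static Object getStreamer() {
        throw new IllegalStateException("cluster state is inactive");
    }

    // Proposed: catch any startup failure and rethrow as ConnectException,
    // so KC marks the task FAILED instead of leaving it in state RUNNING.
    static void start(Map<String, String> props) {
        try {
            Object streamer = getStreamer();
        } catch (Exception e) {
            throw new ConnectException("Failed to start Ignite sink task", e);
        }
    }

    // Proposed: throw instead of only logging, so KC fails the task (or skips
    // the record, per errors.tolerance) rather than silently dropping data.
    static void put(Object key, Object extractor) {
        if (key == null && extractor == null) {
            throw new ConnectException("Failed to stream a record with null key!", null);
        }
    }

    public static void main(String[] args) {
        try {
            start(Map.of());
        } catch (ConnectException e) {
            System.out.println("start failed: " + e.getCause().getMessage());
        }
        try {
            put(null, null);
        } catch (ConnectException e) {
            System.out.println("put rejected: " + e.getMessage());
        }
    }
}
{code}

With the put() change in place, users who prefer the current skip-on-null-key behavior could still opt into it via errors.tolerance = all [3].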

[1] [https://docs.confluent.io/platform/current/connect/references/restapi.html]

[2] 
[https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/sink/SinkTask.html]

[3] 
[https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/]

 


