Vedran Ljubovic created IGNITE-19459:
----------------------------------------
Summary: Kafka Connect IgniteSinkConnector drops messages in case
of error
Key: IGNITE-19459
URL: https://issues.apache.org/jira/browse/IGNITE-19459
Project: Ignite
Issue Type: Bug
Components: extensions
Affects Versions: 2.15
Reporter: Vedran Ljubovic
We are using Kafka Connect (KC) to stream messages from Kafka to Ignite. Since
the Kafka topic uses null keys, we have created a custom SingleTupleExtractor
to generate keys from the payload. This works very well when everything is ok.
However, if there is any kind of issue with starting a cache on Ignite (such as
the cluster state being inactive or the cache having lostParts), we expect KC
to fail to start. Instead, KC starts and appears to be running, and the
messages are dropped - which means that once the problems are resolved, KC will
not attempt to resend the messages even after a restart! This is unacceptable
for us; we believe the system should be reliable and fault-tolerant.
In the logs we notice errors such as:
{code:java}
Failed to stream a record with null key! {code}
which is misleading, since we do have a SingleTupleExtractor for exactly this
purpose and we can see that it isn't being called at all!
When the KC REST API [1] is queried, the connector state is reported as
RUNNING, which means we have no way to detect this error other than parsing the
logs, which is unreliable.
Upon investigating this issue, we found the following:
* The Ignite connection and IgniteDataStreamer are declared as private static
final fields of an inner class, so they are initialized when the start() method
of IgniteSinkConnector is called. From the KC docs [2], we conclude that the
initialize() method should be overridden and the connections created there, and
that appropriate exception types should be thrown so that KC knows the
connection has failed and terminates the task/connector.
* When the start() method is called, StreamerContext.getStreamer() in line 72
fails with an exception. This exception is not of a type handled by KC, so KC
doesn't know that the task failed to start. In addition, the code never reaches
line 91 where the SingleTupleExtractor is created, so there is no extractor.
The solution would be to catch all types of exceptions and rethrow them as
exceptions that KC detects as critical errors. Alternatively, the put() method
should throw an exception if stopped is true.
* When the put() method is called, if there is no key and no extractor, line
121 shows that the error is only logged and no exception is thrown, so KC
thinks the message was successfully streamed. Here, a ConnectException should
be thrown. If users want the current behavior (stream Kafka messages with a key
and skip those without one), they can set the option errors.tolerance = all in
the connector config. [3]
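To illustrate the two proposed fixes above, here is a minimal sketch. It uses simplified stand-ins for the real classes (the actual types are org.apache.kafka.connect.errors.ConnectException, SinkRecord, and IgniteDataStreamer; the field and method names here are hypothetical, not taken from the actual IgniteSinkTask source):

```java
import java.util.Collection;
import java.util.Map;

// Stand-in for org.apache.kafka.connect.errors.ConnectException, which
// Kafka Connect treats as a fatal task error and surfaces as FAILED state.
class ConnectException extends RuntimeException {
    ConnectException(String msg) { super(msg); }
    ConnectException(String msg, Throwable cause) { super(msg, cause); }
}

// Hypothetical stand-in for Kafka Connect's SinkRecord.
record SinkRecord(Object key, Object value) {}

class IgniteSinkTaskSketch {
    private Object streamer;          // stands in for IgniteDataStreamer
    private Object extractor;         // stands in for SingleTupleExtractor
    private volatile boolean stopped = true;

    // Fix 1: fail fast in start() by rethrowing initialization errors as
    // ConnectException, so KC marks the task FAILED instead of RUNNING.
    public void start(Map<String, String> props) {
        try {
            streamer = initStreamer(props); // may fail: inactive cluster, lostParts, ...
            extractor = props.get("singleTupleExtractorCls");
            stopped = false;
        } catch (Exception e) {
            throw new ConnectException("Failed to initialize Ignite streamer", e);
        }
    }

    // Fix 2: in put(), a null key with no extractor is a hard error, not a
    // log line; a stopped task must also reject records instead of dropping them.
    public void put(Collection<SinkRecord> records) {
        if (stopped)
            throw new ConnectException("put() called on a stopped task");
        for (SinkRecord rec : records) {
            if (rec.key() == null && extractor == null)
                throw new ConnectException("Failed to stream a record with null key!");
            // ... otherwise stream via streamer.addData(key, value) ...
        }
    }

    // Placeholder: real code would obtain the streamer from StreamerContext.
    private Object initStreamer(Map<String, String> props) {
        if (!"ACTIVE".equals(props.get("clusterState")))
            throw new IllegalStateException("cluster inactive");
        return new Object();
    }
}
```

With this shape, records that cannot be streamed cause put() to throw, so KC retries or fails the task instead of silently acknowledging offsets; users who prefer the old skip-on-error behavior can opt in via errors.tolerance = all.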
[1] [https://docs.confluent.io/platform/current/connect/references/restapi.html]
[2] [https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/sink/SinkTask.html]
[3] [https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)