[ https://issues.apache.org/jira/browse/IGNITE-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071825#comment-15071825 ]
Denis Magda commented on IGNITE-2016:
-------------------------------------

Roman,

Agree, there is no need to set up auto-flushing on the IgniteDataStreamer side, since Kafka will call {{IgniteDataStreamer.flush()}} periodically. Moreover, IgniteDataStreamer's buffers are flushed automatically whenever {{IgniteDataStreamer.perNodeBufferSize}} is reached, meaning the streamer doesn't need to wait until flush is called and can push data to a cache asynchronously.

The default value of {{IgniteDataStreamer.perNodeBufferSize}} is 1024, and I think it's fine to leave it as is rather than expose an additional configuration parameter to the user. It can be added in the future if needed.

So, summarizing, here is the pseudo-code I have in mind for how the {{put}} and {{flush}} implementations of {{IgniteSinkTask}}, based on {{IgniteDataStreamer}}, should look; details such as the {{start()}} and {{stop()}} lifecycle methods are omitted.

{noformat}
import java.util.Collection;
import java.util.Map;

import org.apache.ignite.IgniteDataStreamer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class IgniteSinkTask extends SinkTask {
    /** Cache data streamer (created in the omitted start() method). */
    private IgniteDataStreamer<Object, Object> ds;

    /**
     * Buffers records.
     *
     * @param records Records to inject into grid.
     */
    @Override public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Records without a key cannot be mapped to a cache entry and are skipped.
            if (record.key() != null)
                // Data is flushed asynchronously once IgniteDataStreamer.perNodeBufferSize is reached.
                ds.addData(record.key(), record.value());
        }
    }

    /**
     * Pushes buffered data to grid. Flush interval is configured by worker configurations.
     *
     * @param offsets Offset information.
     */
    @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Flush any data still pending in the streamer.
        ds.flush();
    }
}
{noformat}

> Update KafkaStreamer to fit new features introduced in Kafka 0.9
> ----------------------------------------------------------------
>
>                 Key: IGNITE-2016
>                 URL: https://issues.apache.org/jira/browse/IGNITE-2016
>             Project: Ignite
>          Issue Type: New Feature
>          Components: streaming
>            Reporter: Roman Shtykh
>            Assignee: Roman Shtykh
>
> Particularly,
> - new consumer
> - Kafka Connect (Copycat)
> http://www.confluent.io/blog/apache-kafka-0.9-is-released
> This can be a different integration task or a complete re-write of the
> current implementation, considering the fact that Kafka Connect is a new
> standard way for "large-scale, real-time data import and export for Kafka."
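
To make the buffering behavior discussed in the comment above concrete, here is a minimal standalone sketch of {{IgniteDataStreamer}} flushing on its own once {{perNodeBufferSize}} is reached; the node startup, the cache name, and the sample entries are illustrative assumptions, not details from this ticket.

{noformat}
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

public class StreamerFlushSketch {
    public static void main(String[] args) {
        // Start a local node with the default configuration (illustrative setup).
        try (Ignite ignite = Ignition.start()) {
            // The target cache must exist before a streamer is created for it;
            // the cache name here is an arbitrary example.
            ignite.getOrCreateCache("sketchCache");

            try (IgniteDataStreamer<Integer, String> ds = ignite.dataStreamer("sketchCache")) {
                // perNodeBufferSize defaults to 1024: once that many entries are
                // buffered for a node, they are shipped asynchronously without an
                // explicit flush() call.
                for (int i = 0; i < 5000; i++)
                    ds.addData(i, "value-" + i);

                // Drain whatever is still buffered, just as IgniteSinkTask.flush()
                // would on Kafka's periodic flush callback.
                ds.flush();
            }
        }
    }
}
{noformat}

With the default {{perNodeBufferSize}} of 1024, most of the 5000 entries above reach the cache before the explicit {{flush()}} runs; the final call only drains the remainder.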