[ https://issues.apache.org/jira/browse/IGNITE-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071825#comment-15071825 ]

Denis Magda commented on IGNITE-2016:
-------------------------------------

Roman,

Agree, there is no need to set up auto-flushing on the IgniteDataStreamer side, since 
Kafka will call {{IgniteDataStreamer.flush()}} periodically.
Moreover, IgniteDataStreamer's buffers are flushed automatically when 
{{IgniteDataStreamer.perNodeBufferSize}} is reached, meaning that the streamer 
doesn't have to wait until flush is called and can push data to a cache 
asynchronously. The default value of {{IgniteDataStreamer.perNodeBufferSize}} is 
1024, and I think it's fine to leave it as is without exposing an additional 
configuration parameter to the user. It can be added in the future if needed.

So, summarizing, here is the pseudo-code I have in mind for how the {{put}} and 
{{flush}} methods of {{IgniteSinkTask}}, based on {{IgniteDataStreamer}}, should 
look, with details omitted.

{noformat}
public class IgniteSinkTask extends SinkTask {
    /** Cache data streamer. */
    private IgniteDataStreamer<Object, Object> ds;

    /**
     * Buffers records.
     *
     * @param records Records to inject into grid.
     */
    @Override public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            if (record.key() != null)
                // Data will be flushed asynchronously when
                // IgniteDataStreamer.perNodeBufferSize is reached.
                ds.addData(record.key(), record.value());
        }
    }

    /**
     * Pushes buffered data to grid. Flush interval is configured by worker configurations.
     *
     * @param offsets Offset information.
     */
    @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Flush any pending data in the streamer.
        ds.flush();
    }
}
{noformat}
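
To complete the picture, the streamer would also need to be created and closed in the 
task lifecycle. A minimal sketch, assuming the Ignite configuration path and cache name 
come from the connector properties (the keys {{ignite.cfg}} and {{cache.name}} below 
are placeholders, not an agreed-upon naming):

{noformat}
    /** {@inheritDoc} */
    @Override public void start(Map<String, String> props) {
        // Placeholder property keys; actual names are up to the connector design.
        Ignite ignite = Ignition.start(props.get("ignite.cfg"));

        ds = ignite.dataStreamer(props.get("cache.name"));
    }

    /** {@inheritDoc} */
    @Override public void stop() {
        // Closing the streamer flushes any remaining buffered data.
        if (ds != null)
            ds.close();
    }
{noformat}

With this in place, the Connect worker's periodic offset flush (e.g. 
{{offset.flush.interval.ms}}) drives {{flush()}}, while the streamer's own per-node 
buffer handles the asynchronous pushes in between.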

> Update KafkaStreamer to fit new features introduced in Kafka 0.9
> ----------------------------------------------------------------
>
>                 Key: IGNITE-2016
>                 URL: https://issues.apache.org/jira/browse/IGNITE-2016
>             Project: Ignite
>          Issue Type: New Feature
>          Components: streaming
>            Reporter: Roman Shtykh
>            Assignee: Roman Shtykh
>
> Particularly,
> - new consumer
> - Kafka Connect (Copycat)
> http://www.confluent.io/blog/apache-kafka-0.9-is-released
> This can be a different integration task or a complete re-write of the 
> current implementation, considering the fact that Kafka Connect is a new 
> standard way for "large-scale, real-time data import and export for Kafka."


