KafkaStreams works quite different than other systems like Flink/Storm.
It is not a system but a library.

If you start a KafkaStreams application, it runs locally.
Scaling/Parallelism comes into place if you start the same application
on multiple nodes. For this, Kafka's parallelization model is used under
the hood to distribute the workload automatically to each running
instance of the application. (see "Consumer Groups":
http://docs.confluent.io/2.1.0-alpha1/clients/consumer.html#concepts)


You might want to have a look here:
http://docs.confluent.io/2.1.0-alpha1/streams/architecture.html#parallelism-model


-Matthias

On 04/17/2016 01:29 PM, rss rss wrote:
> Ok... Is it really perform distribution of the field to several instances
> of a cluster? I expected to see some way via ProcessorContext...
> But may be this is result of the Flink's experience...
> 
> Thanks
> 
> 2016-04-17 12:38 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> 
>> I guess the simplest way would be to use a constructor parameter:
>>
>>> public static class CampaignProcessor implements
>> ProcessorSupplier<Windowed<String>, List<String[]>>
>>> {
>>>         private final String jedis_server;
>>>
>>>         public CampaignProcessor(String jedisServer) {
>>>             this.jedis_server = jedisServer;
>>>         }
>>>
>>>         @Override
>>>         public Processor<Windowed<String>, List<String[]>> get() {
>>>             return new Processor<Windowed<String>, List<String[]>>() {
>>>                 CampaignProcessorCommon campaignProcessorCommon;
>>>
>>>                 @Override
>>>                 public void init(ProcessorContext context) {
>>> //                    // looks like Flink code over here ;)
>>> //                    ParameterTool parameterTool = (ParameterTool)
>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>>> //                    parameterTool.getRequired("jedis_server");
>>> //                    LOG.info("Opening connection with Jedis to {}",
>> parameterTool.getRequired("jedis_server"));
>>>
>>> //                    this.campaignProcessorCommon = new
>> CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
>>>                     //this.campaignProcessorCommon = new
>> CampaignProcessorCommon("localhost");
>>>                     this.campaignProcessorCommon = new
>> CampaignProcessorCommon(jedis_server);
>>>                     this.campaignProcessorCommon.prepare();
>>>                 }
>>>
>>>                 @Override
>>>                 public void process(Windowed<String> key, List<String[]>
>> tupleList) {
>>>                     tupleList.forEach(x -> {
>>>                         String[] tuple = x;
>>>                         String campaign_id = tuple[0];
>>>                         String event_time = tuple[2];
>>>
>>  this.campaignProcessorCommon.execute(campaign_id, event_time);
>>>                     });
>>>                 }
>>>
>>>                 @Override
>>>                 public void punctuate(long timestamp) {}
>>>
>>>                 @Override
>>>                 public void close() {}
>>>             };
>>>         }
>>>     }
>>
>>
>>
>>
>>
>>
>> On 04/16/2016 09:35 PM, rss rss wrote:
>>> Hello,
>>>
>>>   I'm implementing yahoo streaming benchmark with Kafka streaming
>>> processing. How to pass a string into Processor or ValueTransformer
>>> instance? Accordingly to yahoo benchmark logic I need to transfer an
>>> address of Redis server.
>>>
>> https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
>>>
>>> PS: the code above is runnable but not checked on real clusters.
>>>
>>> Best regards
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to