Hello,

I think Kafka Streams is better treated as one approach in streaming
processing systems for a variety of customers. For example, say if you
already have a YARN cluster, and you have a dedicated team operating it and
many teams wants to use this for their various streaming jobs, then
submitting their jobs into a Samza framework running on YARN would make
lots of sense; on the other hand, if you only have one or two complex
streaming applications, and you do not have a YARN or Spark cluster set up
already, then a lighter approach that each application developer code their
logic using a Kafka Streams library would be better.

That being said, I am very interested in comparing Kafka Streams
performance with Flink and Storm in Yahoo streaming benchmarks. If you are
interested in doing so please let me know if there's anything I can help.

Guozhang


On Sun, Apr 17, 2016 at 11:41 AM, rss rss <rssde...@gmail.com> wrote:

> Thanks for the answer. But is it correct in this case to use yahoo
> streaming benchmark to compare Kafka, Flink and Storm? Or Kafka streaming
> processor is for other category of customers?
>
> Best regards
>
> 2016-04-17 16:48 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>
> > KafkaStreams works quite different than other systems like Flink/Storm.
> > It is not a system but a library.
> >
> > If you start a KafkaStreams application, it runs locally.
> > Scaling/Parallelism comes into place if you start the same application
> > on multiple nodes. For this, Kafka's parallelization model is used under
> > the hood to distribute the workload automatically to each running
> > instance of the application. (see "Consumer Groups":
> > http://docs.confluent.io/2.1.0-alpha1/clients/consumer.html#concepts)
> >
> >
> > You might want to have a look here:
> >
> >
> http://docs.confluent.io/2.1.0-alpha1/streams/architecture.html#parallelism-model
> >
> >
> > -Matthias
> >
> > On 04/17/2016 01:29 PM, rss rss wrote:
> > > Ok... Is it really perform distribution of the field to several
> instances
> > > of a cluster? I expected to see some way via ProcessorContext...
> > > But may be this is result of the Flink's experience...
> > >
> > > Thanks
> > >
> > > 2016-04-17 12:38 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> > >
> > >> I guess the simplest way would be to use a constructor parameter:
> > >>
> > >>> public static class CampaignProcessor implements
> > >> ProcessorSupplier<Windowed<String>, List<String[]>>
> > >>> {
> > >>>         private final String jedis_server;
> > >>>
> > >>>         public CampaignProcessor(String jedisServer) {
> > >>>             this.jedis_server = jedisServer;
> > >>>         }
> > >>>
> > >>>         @Override
> > >>>         public Processor<Windowed<String>, List<String[]>> get() {
> > >>>             return new Processor<Windowed<String>, List<String[]>>()
> {
> > >>>                 CampaignProcessorCommon campaignProcessorCommon;
> > >>>
> > >>>                 @Override
> > >>>                 public void init(ProcessorContext context) {
> > >>> //                    // looks like Flink code over here ;)
> > >>> //                    ParameterTool parameterTool = (ParameterTool)
> > >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> > >>> //                    parameterTool.getRequired("jedis_server");
> > >>> //                    LOG.info("Opening connection with Jedis to {}",
> > >> parameterTool.getRequired("jedis_server"));
> > >>>
> > >>> //                    this.campaignProcessorCommon = new
> > >> CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
> > >>>                     //this.campaignProcessorCommon = new
> > >> CampaignProcessorCommon("localhost");
> > >>>                     this.campaignProcessorCommon = new
> > >> CampaignProcessorCommon(jedis_server);
> > >>>                     this.campaignProcessorCommon.prepare();
> > >>>                 }
> > >>>
> > >>>                 @Override
> > >>>                 public void process(Windowed<String> key,
> > List<String[]>
> > >> tupleList) {
> > >>>                     tupleList.forEach(x -> {
> > >>>                         String[] tuple = x;
> > >>>                         String campaign_id = tuple[0];
> > >>>                         String event_time = tuple[2];
> > >>>
> > >>  this.campaignProcessorCommon.execute(campaign_id, event_time);
> > >>>                     });
> > >>>                 }
> > >>>
> > >>>                 @Override
> > >>>                 public void punctuate(long timestamp) {}
> > >>>
> > >>>                 @Override
> > >>>                 public void close() {}
> > >>>             };
> > >>>         }
> > >>>     }
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On 04/16/2016 09:35 PM, rss rss wrote:
> > >>> Hello,
> > >>>
> > >>>   I'm implementing yahoo streaming benchmark with Kafka streaming
> > >>> processing. How to pass a string into Processor or ValueTransformer
> > >>> instance? Accordingly to yahoo benchmark logic I need to transfer an
> > >>> address of Redis server.
> > >>>
> > >>
> >
> https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
> > >>>
> > >>> PS: the code above is runnable but not checked on real clusters.
> > >>>
> > >>> Best regards
> > >>>
> > >>
> > >>
> > >
> >
> >
>



-- 
-- Guozhang

Reply via email to