Hello, I think Kafka Streams is best seen as one option among stream processing systems, suited to a particular set of users. For example, if you already have a YARN cluster with a dedicated team operating it, and many teams want to use it for their various streaming jobs, then submitting those jobs to a Samza framework running on YARN makes a lot of sense. On the other hand, if you only have one or two complex streaming applications and do not already have a YARN or Spark cluster set up, then a lighter approach, in which each application developer codes their logic against the Kafka Streams library, would be better.
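The "lighter approach" means there is no cluster to submit jobs to. You scale out by starting more copies of the same program, and Kafka splits the input topic's partitions among the running copies. The stdlib-only Java sketch below shows that split using a simple range-style assignment, similar in spirit to the range strategy Kafka consumer groups can use. The class and method names are hypothetical, and Kafka Streams itself uses its own, more involved assignor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration (NOT Kafka's actual assignor): a range-style split
// of topic partitions among the running instances of one application.
public class PartitionSpread {

    // Assign partitions 0..numPartitions-1 to instances 0..numInstances-1.
    // Each instance gets a contiguous range; the first
    // (numPartitions % numInstances) instances get one extra partition.
    static Map<Integer, List<Integer>> assign(int numPartitions, int numInstances) {
        if (numPartitions < 0 || numInstances <= 0) {
            throw new IllegalArgumentException("need numPartitions >= 0 and numInstances > 0");
        }
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        int base = numPartitions / numInstances;
        int extra = numPartitions % numInstances;
        int next = 0;
        for (int instance = 0; instance < numInstances; instance++) {
            int count = base + (instance < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                partitions.add(next++);
            }
            assignment.put(instance, partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // A single instance processes every partition locally.
        System.out.println(assign(6, 1)); // {0=[0, 1, 2, 3, 4, 5]}
        // Starting two more copies of the same program splits the work three ways.
        System.out.println(assign(6, 3)); // {0=[0, 1], 1=[2, 3], 2=[4, 5]}
    }
}
```

If you start more instances than there are partitions, the extra instances receive empty lists and sit idle. This is why the partition count caps useful parallelism.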
That being said, I am very interested in comparing Kafka Streams performance with Flink and Storm using the Yahoo streaming benchmark. If you are interested in doing so, please let me know if there's anything I can help with.

Guozhang

On Sun, Apr 17, 2016 at 11:41 AM, rss rss <rssde...@gmail.com> wrote:

> Thanks for the answer. But is it correct in this case to use the Yahoo
> streaming benchmark to compare Kafka, Flink and Storm? Or is the Kafka
> stream processor aimed at a different category of users?
>
> Best regards
>
> 2016-04-17 16:48 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>
> > KafkaStreams works quite differently from other systems like Flink/Storm.
> > It is not a system but a library.
> >
> > If you start a KafkaStreams application, it runs locally.
> > Scaling/parallelism comes into play if you start the same application
> > on multiple nodes. For this, Kafka's parallelization model is used under
> > the hood to distribute the workload automatically to each running
> > instance of the application (see "Consumer Groups":
> > http://docs.confluent.io/2.1.0-alpha1/clients/consumer.html#concepts)
> >
> > You might want to have a look here:
> > http://docs.confluent.io/2.1.0-alpha1/streams/architecture.html#parallelism-model
> >
> > -Matthias
> >
> > On 04/17/2016 01:29 PM, rss rss wrote:
> > > Ok... Does this really distribute the field to the several instances
> > > of a cluster? I expected to see some way to do it via ProcessorContext...
> > > But maybe that expectation comes from my Flink experience...
> > >
> > > Thanks
> > >
> > > 2016-04-17 12:38 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> > >
> > >> I guess the simplest way would be to use a constructor parameter:
> > >>
> > >>> public static class CampaignProcessor implements ProcessorSupplier<Windowed<String>, List<String[]>> {
> > >>>     private final String jedis_server;
> > >>>
> > >>>     public CampaignProcessor(String jedisServer) {
> > >>>         this.jedis_server = jedisServer;
> > >>>     }
> > >>>
> > >>>     @Override
> > >>>     public Processor<Windowed<String>, List<String[]>> get() {
> > >>>         return new Processor<Windowed<String>, List<String[]>>() {
> > >>>             CampaignProcessorCommon campaignProcessorCommon;
> > >>>
> > >>>             @Override
> > >>>             public void init(ProcessorContext context) {
> > >>>                 // // looks like Flink code over here ;)
> > >>>                 // ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> > >>>                 // parameterTool.getRequired("jedis_server");
> > >>>                 // LOG.info("Opening connection with Jedis to {}", parameterTool.getRequired("jedis_server"));
> > >>>
> > >>>                 // this.campaignProcessorCommon = new CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
> > >>>                 //this.campaignProcessorCommon = new CampaignProcessorCommon("localhost");
> > >>>                 this.campaignProcessorCommon = new CampaignProcessorCommon(jedis_server);
> > >>>                 this.campaignProcessorCommon.prepare();
> > >>>             }
> > >>>
> > >>>             @Override
> > >>>             public void process(Windowed<String> key, List<String[]> tupleList) {
> > >>>                 tupleList.forEach(x -> {
> > >>>                     String[] tuple = x;
> > >>>                     String campaign_id = tuple[0];
> > >>>                     String event_time = tuple[2];
> > >>>                     this.campaignProcessorCommon.execute(campaign_id, event_time);
> > >>>                 });
> > >>>             }
> > >>>
> > >>>             @Override
> > >>>             public void punctuate(long timestamp) {}
> > >>>
> > >>>             @Override
> > >>>             public void close() {}
> > >>>         };
> > >>>     }
> > >>> }
> > >>
> > >> On 04/16/2016 09:35 PM, rss rss wrote:
> > >>> Hello,
> > >>>
> > >>> I'm implementing the Yahoo streaming benchmark with Kafka stream
> > >>> processing. How do I pass a string into a Processor or ValueTransformer
> > >>> instance? According to the Yahoo benchmark logic, I need to pass in the
> > >>> address of the Redis server.
> > >>>
> > >>> https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
> > >>>
> > >>> PS: the code above is runnable but has not been tested on real clusters.
> > >>>
> > >>> Best regards

-- 
-- Guozhang
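Matthias's suggestion is to capture the Redis address in the supplier's constructor. Kafka Streams runs inside each application process, so nothing ships this field across a cluster. Every instance you start runs the same construction code with its own local config. The stdlib-only sketch below shows the pattern. The `SimpleProcessor` interface and the `CampaignProcessorSupplier`/`ConstructorParamDemo` classes are hypothetical stand-ins, and `java.util.function.Supplier` replaces Kafka's `ProcessorSupplier`/`Processor` types. This is not the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a processor; NOT the Kafka Streams Processor API.
interface SimpleProcessor {
    void process(String campaignId, String eventTime);
}

// The supplier receives the Redis address through its constructor and
// hands it to every processor it creates.
class CampaignProcessorSupplier implements Supplier<SimpleProcessor> {
    private final String jedisServer;            // captured once per application instance
    final List<String> log = new ArrayList<>();  // records what the processors saw (demo only)

    CampaignProcessorSupplier(String jedisServer) {
        this.jedisServer = jedisServer;
    }

    @Override
    public SimpleProcessor get() {
        // Each call returns a fresh processor; all of them see the same jedisServer.
        return (campaignId, eventTime) -> log.add(jedisServer + " <- " + campaignId + ":" + eventTime);
    }
}

public class ConstructorParamDemo {
    public static void main(String[] args) {
        // Each application instance reads the address from its own local config
        // (here, hypothetically, the first command-line argument, defaulting to "localhost").
        String jedisServer = args.length > 0 ? args[0] : "localhost";
        CampaignProcessorSupplier supplier = new CampaignProcessorSupplier(jedisServer);

        SimpleProcessor p1 = supplier.get();
        SimpleProcessor p2 = supplier.get();
        p1.process("campaign-1", "1460900000000");
        p2.process("campaign-2", "1460900000500");

        supplier.log.forEach(System.out::println);
    }
}
```

To give every instance the same Redis address, start each copy of the application with the same argument or config file. No framework-level parameter broadcast is involved.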