Hello, thanks. I plan to run some tests, and I will keep the sources up to date at https://github.com/rssdev10/streaming-benchmarks/tree/master/kafka-benchmarks. Since Kafka 0.10 is not yet available on the Apache mirrors or in Maven Central, Yahoo's default start script cannot be used as is. See the workaround diff below.
Regarding the possibility of running the Kafka streaming benchmark in several instances: I think it is possible, because the Yahoo streaming benchmark does not use any complex aggregation. The only question is how to run it on several nodes. To me, though, the current implementation of the Kafka streaming processor really looks like https://projectreactor.io/docs/ ....

@Matthias, I'm a visiting researcher at DIMA, TU Berlin.

Best regards,
Roman Samarev

-------------------------------------------------
--- a/stream-bench.sh
+++ b/stream-bench.sh
@@ -11,7 +11,7 @@ MVN=${MVN:-mvn}
 GIT=${GIT:-git}
 MAKE=${MAKE:-make}
-KAFKA_VERSION=${KAFKA_VERSION:-"0.9.0.1"}
+KAFKA_VERSION=${KAFKA_VERSION:-"0.10.0.0-SNAPSHOT"}
 REDIS_VERSION=${REDIS_VERSION:-"3.0.5"}
 SCALA_BIN_VERSION=${SCALA_BIN_VERSION:-"2.10"}
 SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"4"}
@@ -146,8 +146,12 @@ run() {
     cd ..
     #Fetch Kafka
-    KAFKA_FILE="$KAFKA_DIR.tgz"
-    fetch_untar_file "$KAFKA_FILE" "$APACHE_MIRROR/kafka/$KAFKA_VERSION/$KAFKA_FILE"
+#    KAFKA_FILE="$KAFKA_DIR.tgz"
+#    fetch_untar_file "$KAFKA_FILE" "$APACHE_MIRROR/kafka/$KAFKA_VERSION/$KAFKA_FILE"
+    git clone https://github.com/apache/kafka.git -b 0.10.0 --depth=1 $KAFKA_DIR
+    cd $KAFKA_DIR
+    /opt/gradle/bin/gradle -PscalaVersion=$SCALA_BIN_VERSION installAll
+    cd ..

2016-04-18 7:08 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:

> Hello,
>
> I think Kafka Streams is better treated as one approach in streaming
> processing systems for a variety of customers. For example, say if you
> already have a YARN cluster, and you have a dedicated team operating it and
> many teams wants to use this for their various streaming jobs, then
> submitting their jobs into a Samza framework running on YARN would make
> lots of sense; on the other hand, if you only have one or two complex
> streaming applications, and you do not have a YARN or Spark cluster set up
> already, then a lighter approach that each application developer code their
> logic using a Kafka Streams library would be better.
>
> That being said, I am very interested in comparing Kafka Streams
> performance with Flink and Storm in Yahoo streaming benchmarks. If you are
> interested in doing so please let me know if there's anything I can help.
>
> Guozhang
>
> On Sun, Apr 17, 2016 at 11:41 AM, rss rss <rssde...@gmail.com> wrote:
>
> > Thanks for the answer. But is it correct in this case to use yahoo
> > streaming benchmark to compare Kafka, Flink and Storm? Or Kafka streaming
> > processor is for other category of customers?
> >
> > Best regards
> >
> > 2016-04-17 16:48 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> >
> > > KafkaStreams works quite different than other systems like Flink/Storm.
> > > It is not a system but a library.
> > >
> > > If you start a KafkaStreams application, it runs locally.
> > > Scaling/Parallelism comes into place if you start the same application
> > > on multiple nodes. For this, Kafka's parallelization model is used under
> > > the hood to distribute the workload automatically to each running
> > > instance of the application. (see "Consumer Groups":
> > > http://docs.confluent.io/2.1.0-alpha1/clients/consumer.html#concepts)
> > >
> > > You might want to have a look here:
> > >
> > > http://docs.confluent.io/2.1.0-alpha1/streams/architecture.html#parallelism-model
> > >
> > > -Matthias
> > >
> > > On 04/17/2016 01:29 PM, rss rss wrote:
> > > > Ok... Is it really perform distribution of the field to several instances
> > > > of a cluster? I expected to see some way via ProcessorContext...
> > > > But may be this is result of the Flink's experience...
> > > >
> > > > Thanks
> > > >
> > > > 2016-04-17 12:38 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> > > >
> > > >> I guess the simplest way would be to use a constructor parameter:
> > > >>
> > > >>> public static class CampaignProcessor implements ProcessorSupplier<Windowed<String>, List<String[]>>
> > > >>> {
> > > >>>     private final String jedis_server;
> > > >>>
> > > >>>     public CampaignProcessor(String jedisServer) {
> > > >>>         this.jedis_server = jedisServer;
> > > >>>     }
> > > >>>
> > > >>>     @Override
> > > >>>     public Processor<Windowed<String>, List<String[]>> get() {
> > > >>>         return new Processor<Windowed<String>, List<String[]>>() {
> > > >>>             CampaignProcessorCommon campaignProcessorCommon;
> > > >>>
> > > >>>             @Override
> > > >>>             public void init(ProcessorContext context) {
> > > >>>                 // // looks like Flink code over here ;)
> > > >>>                 // ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> > > >>>                 // parameterTool.getRequired("jedis_server");
> > > >>>                 // LOG.info("Opening connection with Jedis to {}", parameterTool.getRequired("jedis_server"));
> > > >>>
> > > >>>                 // this.campaignProcessorCommon = new CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
> > > >>>                 //this.campaignProcessorCommon = new CampaignProcessorCommon("localhost");
> > > >>>                 this.campaignProcessorCommon = new CampaignProcessorCommon(jedis_server);
> > > >>>                 this.campaignProcessorCommon.prepare();
> > > >>>             }
> > > >>>
> > > >>>             @Override
> > > >>>             public void process(Windowed<String> key, List<String[]> tupleList) {
> > > >>>                 tupleList.forEach(x -> {
> > > >>>                     String[] tuple = x;
> > > >>>                     String campaign_id = tuple[0];
> > > >>>                     String event_time = tuple[2];
> > > >>>                     this.campaignProcessorCommon.execute(campaign_id, event_time);
> > > >>>                 });
> > > >>>             }
> > > >>>
> > > >>>             @Override
> > > >>>             public void punctuate(long timestamp) {}
> > > >>>
> > > >>>             @Override
> > > >>>             public void close() {}
> > > >>>         };
> > > >>>     }
> > > >>> }
> > > >>
> > > >> On 04/16/2016 09:35 PM, rss rss wrote:
> > > >>> Hello,
> > > >>>
> > > >>> I'm implementing yahoo streaming benchmark with Kafka streaming
> > > >>> processing. How to pass a string into Processor or ValueTransformer
> > > >>> instance? Accordingly to yahoo benchmark logic I need to transfer an
> > > >>> address of Redis server.
> > > >>>
> > > >>> https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
> > > >>>
> > > >>> PS: the code above is runnable but not checked on real clusters.
> > > >>>
> > > >>> Best regards
> > > >>>
> > > >>
> > > >
>
> --
> -- Guozhang
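
For reference, the constructor-parameter approach quoted above can be illustrated without any Kafka dependency. The following is only a minimal sketch under assumptions: `SupplierParameterSketch`, `RecordingProcessor` and the `java.util.function.Supplier`-based `CampaignProcessorSupplier` are hypothetical stand-ins, not the Kafka Streams `ProcessorSupplier`/`Processor` interfaces and not the benchmark's classes, and the Redis "connection" is merely recorded in a list rather than opened.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierParameterSketch {

    // Hypothetical stand-in for a stream processor; NOT the Kafka Streams Processor interface.
    static class RecordingProcessor {
        private final String jedisServer;
        final List<String> log = new ArrayList<>();

        RecordingProcessor(String jedisServer) {
            this.jedisServer = jedisServer;
        }

        // Stands in for init(): a real processor would open its Redis connection here.
        void init() {
            log.add("connect " + jedisServer);
        }

        // Stands in for process(): records the pair instead of writing it to Redis.
        void process(String campaignId, String eventTime) {
            log.add(campaignId + "@" + eventTime);
        }
    }

    // The supplier receives the server address once, through its constructor,
    // and hands it to every processor instance it creates.
    static class CampaignProcessorSupplier implements Supplier<RecordingProcessor> {
        private final String jedisServer;

        CampaignProcessorSupplier(String jedisServer) {
            this.jedisServer = jedisServer;
        }

        @Override
        public RecordingProcessor get() {
            return new RecordingProcessor(jedisServer);
        }
    }

    public static void main(String[] args) {
        // The address comes from the command line here; "localhost" is an assumed default.
        String host = args.length > 0 ? args[0] : "localhost";
        CampaignProcessorSupplier supplier = new CampaignProcessorSupplier(host);

        RecordingProcessor processor = supplier.get();
        processor.init();
        processor.process("campaign-1", "1460900000000");
        System.out.println(processor.log);
    }
}
```

The point of the pattern is that the address is fixed when the supplier is constructed, so each call to `get()` returns a fresh processor that already knows which server to use, with no need to look the value up from a runtime context.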