Hi,

I just checked the streams-wordcount-output topic using the below command:

docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:3.2.0 \
  kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic streams-wordcount-output \
  --from-beginning \
  --formatter kafka.tools.DefaultMessageFormatter \
  --property print.key=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

and it returns:

all     1
lead    1
to      1
hello   1
streams 2
join    1
kafka   3
summit  1

Please note the above result is from when I tried
http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
and, in the docker-machine, ran
/usr/bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo.

How come running the same program outside the docker-machine does not write to
the output topic? Should I package the program as a jar, deploy it to the
docker-machine, and run it using ./bin/kafka-run-class?

Best regards,
Mina

On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> I even tried
> http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> and, in the docker-machine, ran
> /usr/bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo
>
> Running
>
> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
>   kafka-console-consumer --bootstrap-server localhost:9092 --topic
>   streams-wordcount-output --new-consumer --from-beginning
>
> shows 8 blank messages.
>
> Is there any setting/configuration that should be done? Running the class
> inside the docker-machine and running the program outside the docker-machine
> do not return the expected result!
>
> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com> wrote:
>
>> And the port for kafka is 29092 and for zookeeper 32181.
>>
>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I forgot to add 2 questions in my previous email.
>>>
>>> To set up my env, shall I use
>>> https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
>>> instead, or is there any other docker-compose.yml (version 2 or 3) that is
>>> suggested for setting up the env?
>>>
>>> How can I check "whether streams (that is just an app) can reach Kafka"?
>>>
>>> Regards,
>>> Mina
>>>
>>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com>
>>> wrote:
>>>
>>>> Hi Eno,
>>>>
>>>> Sorry! That is a typo!
>>>>
>>>> I have a docker-machine with different containers (set up as directed at
>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html):
>>>>
>>>> docker ps --format "{{.Image}}: {{.Names}}"
>>>>
>>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>>> confluentinc/cp-kafka:3.2.0: kafka
>>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>>>
>>>> I used the example at
>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>> and followed the same steps.
>>>>
>>>> When I run the below command in the docker-machine, I see the messages
>>>> in TextLinesTopic:
>>>>
>>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
>>>>   kafka-console-consumer --bootstrap-server localhost:29092 --topic
>>>>   TextLinesTopic --new-consumer --from-beginning
>>>>
>>>> hello kafka streams
>>>> all streams lead to kafka
>>>> join kafka summit
>>>> test1
>>>> test2
>>>> test3
>>>> test4
>>>>
>>>> Running the above command for WordsWithCountsTopic returns nothing.
>>>>
>>>> My program runs outside the docker-machine, and it does not return any
>>>> error.
>>>>
>>>> I checked the kafka logs and kafka-connect logs; no information is shown.
>>>> Wondering what the log level is in kafka/kafka-connect.
>>>>
>>>> Best regards,
>>>> Mina
>>>>
>>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I noticed in your example that you are using localhost:9092 to produce
>>>>> but localhost:29092 to consume? Or is that a typo? Are zookeeper, kafka,
>>>>> and the Kafka Streams app all running within one docker container, or in
>>>>> different containers?
>>>>>
>>>>> I just tested the WordCountLambdaExample and it works for me. This
>>>>> might not have anything to do with streams, but rather with the Kafka
>>>>> configuration and whether streams (that is just an app) can reach Kafka at
>>>>> all. If you provide the above information we can look further.
>>>>>
>>>>> Thanks,
>>>>> Eno
>>>>>
>>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
>>>>> >
>>>>> > I reset and it is still not working!
>>>>> >
>>>>> > My env is set up using
>>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>>>> >
>>>>> > I just tried using
>>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>>> > with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic)
>>>>> > created from scratch, and went through the steps as directed.
>>>>> >
>>>>> > When I stopped the java program and checked the topics, below is the
>>>>> > data in each topic.
>>>>> >
>>>>> > docker run \
>>>>> >   --net=host \
>>>>> >   --rm \
>>>>> >   confluentinc/cp-kafka:3.2.0 \
>>>>> >   kafka-console-consumer --bootstrap-server localhost:29092 --topic
>>>>> >   TextLinesTopic --new-consumer --from-beginning
>>>>> >
>>>>> > SHOWS
>>>>> >
>>>>> > hello kafka streams
>>>>> > all streams lead to kafka
>>>>> > join kafka summit
>>>>> > test1
>>>>> > test2
>>>>> > test3
>>>>> > test4
>>>>> >
>>>>> > FOR WordsWithCountsTopic nothing is shown.
>>>>> >
>>>>> > I am new to Kafka/Kafka Streams and still do not understand why a
>>>>> > simple example does not work!
>>>>> >
>>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> > wrote:
>>>>> >
>>>>> >>>> So, when I check the number of messages in wordCount-input I see
>>>>> >>>> the same messages. However, when I run the below code I do not see
>>>>> >>>> any message/data in wordCount-output.
>>>>> >>
>>>>> >> Did you reset your application?
>>>>> >>
>>>>> >> Each time you run your app and restart it, it will resume processing
>>>>> >> where it left off. Thus, if something went wrong in your first run but
>>>>> >> you got committed offsets, the app will not re-read the whole topic.
>>>>> >>
>>>>> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
>>>>> >> application-id from StreamsConfig is used as the group.id.
>>>>> >>
>>>>> >> Thus, resetting your app would be required to consume the input topic
>>>>> >> from scratch. Or you can just write new data to your input topic.
>>>>> >>
>>>>> >>>> Can I connect to kafka in a VM/docker container using the below
>>>>> >>>> code, or do I need to change/add other parameters? How can I submit
>>>>> >>>> the code to kafka/kafka-connect? Do we have a similar concept as
>>>>> >>>> SPARK to submit the code (e.g. a jar file)?
>>>>> >>
>>>>> >> A Streams app is a regular Java application and can run anywhere --
>>>>> >> there is no notion of a processing cluster and you don't "submit" your
>>>>> >> code -- you just run your app.
>>>>> >>
>>>>> >> Thus, if your console consumer can connect to the cluster, your
>>>>> >> Streams app should also be able to connect to the cluster.
>>>>> >>
>>>>> >> Maybe the short runtime of 5 seconds could be a problem (even if it
>>>>> >> seems long enough to process just a few records). But you need to take
>>>>> >> startup delay into account. I would recommend registering a shutdown
>>>>> >> hook: see
>>>>> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>>> >>
>>>>> >> Hope this helps.
>>>>> >>
>>>>> >> -Matthias
>>>>> >>
>>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>>>> >>> Hi Matthias,
>>>>> >>>
>>>>> >>> Thank you for the quick response, appreciate it!
>>>>> >>>
>>>>> >>> I created the topics wordCount-input and wordCount-output. Pushed
>>>>> >>> some data to wordCount-input using
>>>>> >>>
>>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
>>>>> >>>   /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>>>> >>>   wordCount-input
>>>>> >>>
>>>>> >>> test
>>>>> >>> new
>>>>> >>> word
>>>>> >>> count
>>>>> >>> wordcount
>>>>> >>> word count
>>>>> >>>
>>>>> >>> So, when I check the number of messages in wordCount-input I see the
>>>>> >>> same messages. However, when I run the below code I do not see any
>>>>> >>> message/data in wordCount-output.
>>>>> >>>
>>>>> >>> Can I connect to kafka in a VM/docker container using the below code,
>>>>> >>> or do I need to change/add other parameters? How can I submit the
>>>>> >>> code to kafka/kafka-connect? Do we have a similar concept as SPARK to
>>>>> >>> submit the code (e.g. a jar file)?
>>>>> >>>
>>>>> >>> I really appreciate your input as I am blocked and cannot run even
>>>>> >>> the below simple example.
>>>>> >>>
>>>>> >>> Best regards,
>>>>> >>> Mina
>>>>> >>>
>>>>> >>> I changed the code to be as below:
>>>>> >>>
>>>>> >>> Properties props = new Properties();
>>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>> >>>     Serdes.String().getClass().getName());
>>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>> >>>     Serdes.String().getClass().getName());
>>>>> >>>
>>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>>> >>>
>>>>> >>> // setting offset reset to earliest so that we can re-run the demo
>>>>> >>> // code with the same pre-loaded data
>>>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>> >>>
>>>>> >>> KStreamBuilder builder = new KStreamBuilder();
>>>>> >>>
>>>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>>>>> >>>
>>>>> >>> KTable<String, Long> counts = source
>>>>> >>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>>> >>>         @Override
>>>>> >>>         public Iterable<String> apply(String value) {
>>>>> >>>             return Arrays.asList(
>>>>> >>>                 value.toLowerCase(Locale.getDefault()).split(" "));
>>>>> >>>         }
>>>>> >>>     })
>>>>> >>>     .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>>>>> >>>         @Override
>>>>> >>>         public KeyValue<String, String> apply(String key, String value) {
>>>>> >>>             return new KeyValue<>(value, value);
>>>>> >>>         }
>>>>> >>>     })
>>>>> >>>     .groupByKey()
>>>>> >>>     .count("Counts");
>>>>> >>>
>>>>> >>> // need to override value serde to Long type
>>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>>>> >>>
>>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>> >>> streams.start();
>>>>> >>>
>>>>> >>> // usually the stream application would be running forever,
>>>>> >>> // in this example we just let it run for some time and stop since
>>>>> >>> // the input data is finite.
>>>>> >>> Thread.sleep(5000L);
>>>>> >>>
>>>>> >>> streams.close();
>>>>> >>>
>>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> >>> wrote:
>>>>> >>>
>>>>> >>>> Maybe you need to reset your application using the reset tool:
>>>>> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>>>>> >>>>
>>>>> >>>> Also keep in mind that KTables buffer internally, and thus, you
>>>>> >>>> might only see data on commit.
>>>>> >>>>
>>>>> >>>> Try to reduce the commit interval or disable caching by setting
>>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>>> >>>>
>>>>> >>>> -Matthias
>>>>> >>>>
>>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>>> >>>>> Hi,
>>>>> >>>>>
>>>>> >>>>> This is the first time that I am using Kafka Streams. I would like
>>>>> >>>>> to read from an input topic and write to an output topic. However,
>>>>> >>>>> I do not see the word count when I try to run the below example.
>>>>> >>>>> Looks like it does not connect to Kafka. I do not see any error
>>>>> >>>>> though. I tried my localhost kafka as well as the container in a
>>>>> >>>>> VM, same situation.
>>>>> >>>>>
>>>>> >>>>> There are over 200 messages in the input kafka topic.
>>>>> >>>>>
>>>>> >>>>> Your input is appreciated!
>>>>> >>>>>
>>>>> >>>>> Best regards,
>>>>> >>>>> Mina
>>>>> >>>>>
>>>>> >>>>> import org.apache.kafka.common.serialization.*;
>>>>> >>>>> import org.apache.kafka.streams.*;
>>>>> >>>>> import org.apache.kafka.streams.kstream.*;
>>>>> >>>>>
>>>>> >>>>> import java.util.*;
>>>>> >>>>> import java.util.regex.*;
>>>>> >>>>>
>>>>> >>>>> public class WordCountExample {
>>>>> >>>>>
>>>>> >>>>>     public static void main(String[] args) {
>>>>> >>>>>         final Properties streamsConfiguration = new Properties();
>>>>> >>>>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>>> >>>>>             "wordcount-streaming");
>>>>> >>>>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>> >>>>>             "<IPADDRESS>:9092");
>>>>> >>>>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>> >>>>>             Serdes.String().getClass().getName());
>>>>> >>>>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>> >>>>>             Serdes.String().getClass().getName());
>>>>> >>>>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>>> >>>>>             10 * 1000);
>>>>> >>>>>
>>>>> >>>>>         final Serde<String> stringSerde = Serdes.String();
>>>>> >>>>>         final Serde<Long> longSerde = Serdes.Long();
>>>>> >>>>>
>>>>> >>>>>         final KStreamBuilder builder = new KStreamBuilder();
>>>>> >>>>>
>>>>> >>>>>         final KStream<String, String> textLines =
>>>>> >>>>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>>> >>>>>
>>>>> >>>>>         final Pattern pattern = Pattern.compile("\\W+",
>>>>> >>>>>             Pattern.UNICODE_CHARACTER_CLASS);
>>>>> >>>>>
>>>>> >>>>>         final KStream<String, Long> wordCounts = textLines
>>>>> >>>>>             .flatMapValues(value ->
>>>>> >>>>>                 Arrays.asList(pattern.split(value.toLowerCase())))
>>>>> >>>>>             .groupBy((key, word) -> word)
>>>>> >>>>>             .count("Counts")
>>>>> >>>>>             .toStream();
>>>>> >>>>>
>>>>> >>>>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>>> >>>>>
>>>>> >>>>>         final KafkaStreams streams = new KafkaStreams(builder,
>>>>> >>>>>             streamsConfiguration);
>>>>> >>>>>         streams.cleanUp();
>>>>> >>>>>         streams.start();
>>>>> >>>>>
>>>>> >>>>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>>> >>>>>     }
>>>>> >>>>> }
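[Editor's note] To sanity-check what the word-count topology should produce, here is a pure-Java sketch of the same counting logic with no Kafka involved (the class name WordCountCheck and the helper countWords are made up for illustration). If the Streams app is wired up correctly, the counts landing in streams-wordcount-output should match what this computes for the quickstart input:

```java
import java.util.*;
import java.util.regex.*;
import java.util.stream.*;

public class WordCountCheck {

    // Mirrors the topology: split each line on non-word characters,
    // lowercase, then count occurrences per word.
    static Map<String, Long> countWords(List<String> lines) {
        Pattern pattern = Pattern.compile("\\W+");
        return lines.stream()
                .flatMap(line -> Arrays.stream(pattern.split(line.toLowerCase())))
                .filter(w -> !w.isEmpty())
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }

    public static void main(String[] args) {
        // The three quickstart input lines from the thread above.
        List<String> input = Arrays.asList(
                "hello kafka streams",
                "all streams lead to kafka",
                "join kafka summit");
        Map<String, Long> counts = countWords(input);
        System.out.println(counts.get("kafka"));   // 3
        System.out.println(counts.get("streams")); // 2
        System.out.println(counts.get("hello"));   // 1
    }
}
```

These are exactly the final counts Mina's first message reports (kafka 3, streams 2, everything else 1), so the output topic contents in that message are consistent with a working app.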
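[Editor's note] Matthias's advice to replace the fixed Thread.sleep(5000L) with a shutdown hook can be sketched with plain JDK classes. This is only the latch pattern, not a full Streams app: the Runnable stands in for a real KafkaStreams instance's close() call, and the immediate countDown() is there only so the demo exits on its own.

```java
import java.util.concurrent.CountDownLatch;

public class ShutdownHookSketch {
    public static void main(String[] args) throws InterruptedException {
        // Placeholder for streams.close() on a real KafkaStreams instance.
        Runnable closeApp = () -> System.out.println("closed");

        CountDownLatch latch = new CountDownLatch(1);

        // On Ctrl-C / SIGTERM the hook closes the app, then releases main().
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            closeApp.run();
            latch.countDown();
        }));

        // In a real app: streams.start(); then block here indefinitely
        // instead of sleeping a fixed 5 seconds.
        latch.countDown(); // demo only: let the program finish by itself
        latch.await();
        System.out.println("main exiting");
    }
}
```

The point of the pattern is that the app keeps running until it is asked to stop, so a slow startup (broker discovery, rebalancing) no longer eats into an arbitrary sleep window before streams.close() is called.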
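[Editor's note] The offset check and application reset that Matthias mentions would look roughly like this with the CP 3.2 / Kafka 0.10.2 CLI tools (tool names and flags as shipped in that release; the ports and the app/topic names are taken from the thread and should be adjusted to your setup). The Streams app must be stopped before running the reset tool.

```shell
# Inspect committed offsets for the Streams app.
# The application.id from StreamsConfig is used as the group.id.
kafka-consumer-groups --new-consumer --bootstrap-server localhost:29092 \
  --describe --group wordcount-streaming

# Reset the app so it re-reads the input topic from scratch
# (0.10.2 also needs --zookeeper to clean up internal topics).
kafka-streams-application-reset --application-id wordcount-streaming \
  --bootstrap-servers localhost:29092 \
  --zookeeper localhost:32181 \
  --input-topics wordcount-input
```

If the describe output shows the group at the log-end offsets, the app has already consumed the input; with the finite demo data it will then produce nothing new until the app is reset or fresh records are written.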