I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
and in docker-machine ran:

/usr/bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

Running:

docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer --bootstrap-server localhost:9092 --topic streams-wordcount-output --new-consumer --from-beginning

shows 8 blank messages. Is there any setting/configuration that should be done? Running the class in the docker-machine and running the program outside the docker-machine does not return the expected result!

On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> And the port for kafka is 29092 and for zookeeper 32181.
>
> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com> wrote:
>
>> Hi,
>>
>> I forgot to add 2 questions in my previous email.
>>
>> To set up my env, shall I use
>> https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
>> instead, or is there any other docker-compose.yml (version 2 or 3) that
>> is suggested for setting up the env?
>>
>> How can I check "whether streams (that is just an app) can reach Kafka"?
>>
>> Regards,
>> Mina
>>
>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com> wrote:
>>
>>> Hi Eno,
>>>
>>> Sorry! That is a typo!
>>>
>>> I have a docker-machine with different containers (set up as directed at
>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html):
>>>
>>> docker ps --format "{{.Image}}: {{.Names}}"
>>>
>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>> confluentinc/cp-kafka:3.2.0: kafka
>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>>
>>> I used the example at
>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>> and followed the same steps.
>>>
>>> When I run the command below in the docker-machine, I see the messages in
>>> TextLinesTopic:
>>>
>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
>>> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
>>> --from-beginning
>>>
>>> hello kafka streams
>>> all streams lead to kafka
>>> join kafka summit
>>> test1
>>> test2
>>> test3
>>> test4
>>>
>>> Running the same command for WordsWithCountsTopic returns nothing.
>>>
>>> My program runs outside the docker-machine, and it does not return any
>>> error.
>>>
>>> I checked the kafka and kafka-connect logs; no information is shown.
>>> Wondering what the log level is in kafka/kafka-connect.
>>>
>>> Best regards,
>>> Mina
>>>
>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>
>>>> Hi there,
>>>>
>>>> I noticed in your example that you are using localhost:9092 to produce
>>>> but localhost:29092 to consume? Or is that a typo? Are zookeeper, kafka,
>>>> and the Kafka Streams app all running within one docker container, or in
>>>> different containers?
>>>>
>>>> I just tested the WordCountLambdaExample and it works for me.
>>>> This might not have anything to do with streams, but rather with the
>>>> Kafka configuration and whether streams (which is just an app) can reach
>>>> Kafka at all. If you provide the above information, we can look further.
>>>>
>>>> Thanks,
>>>> Eno
>>>>
>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
>>>> >
>>>> > I reset and it is still not working!
>>>> >
>>>> > My env is set up using
>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>>> >
>>>> > I just tried
>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>> > with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic)
>>>> > created from scratch, and went through the steps as directed.
>>>> >
>>>> > When I stopped the java program and checked the topics, below is the
>>>> > data in each topic.
>>>> >
>>>> > docker run \
>>>> >   --net=host \
>>>> >   --rm \
>>>> >   confluentinc/cp-kafka:3.2.0 \
>>>> >   kafka-console-consumer --bootstrap-server localhost:29092 --topic
>>>> >   TextLinesTopic --new-consumer --from-beginning
>>>> >
>>>> > shows:
>>>> >
>>>> > hello kafka streams
>>>> > all streams lead to kafka
>>>> > join kafka summit
>>>> > test1
>>>> > test2
>>>> > test3
>>>> > test4
>>>> >
>>>> > For WordsWithCountsTopic nothing is shown.
>>>> >
>>>> > I am new to Kafka/Kafka Streams and still do not understand why a
>>>> > simple example does not work!
>>>> >
>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>> >
>>>> >>>> So, when I check the number of messages in wordCount-input I see the
>>>> >>>> same messages. However, when I run the code below I do not see any
>>>> >>>> message/data in wordCount-output.
>>>> >>
>>>> >> Did you reset your application?
>>>> >>
>>>> >> Each time you run your app and restart it, it will resume processing
>>>> >> where it left off. Thus, if something went wrong in your first run but
>>>> >> you got committed offsets, the app will not re-read the whole topic.
>>>> >>
>>>> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
>>>> >> application-id from StreamsConfig is used as group.id.
>>>> >>
>>>> >> Thus, resetting your app would be required to consume the input topic
>>>> >> from scratch. Or you can just write new data to your input topic.
>>>> >>
>>>> >>>> Can I connect to kafka in a VM/docker container using the code below,
>>>> >>>> or do I need to change/add other parameters? How can I submit the
>>>> >>>> code to kafka/kafka-connect? Do we have a similar concept as Spark
>>>> >>>> to submit the code (e.g. a jar file)?
>>>> >>
>>>> >> A Streams app is a regular Java application and can run anywhere --
>>>> >> there is no notion of a processing cluster and you don't "submit" your
>>>> >> code -- you just run your app.
>>>> >>
>>>> >> Thus, if your console consumer can connect to the cluster, your Streams
>>>> >> app should also be able to connect to the cluster.
>>>> >>
>>>> >> Maybe the short runtime of 5 seconds could be a problem (even if it
>>>> >> seems long enough to process just a few records), but you might need
>>>> >> to take startup delay into account. I would recommend registering a
>>>> >> shutdown hook: see
>>>> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>> >>
>>>> >> Hope this helps.
>>>> >>
>>>> >> -Matthias
>>>> >>
>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>>> >>> Hi Matthias,
>>>> >>>
>>>> >>> Thank you for the quick response, appreciate it!
>>>> >>>
>>>> >>> I created the topics wordCount-input and wordCount-output.
>>>> >>> Pushed some data to wordCount-input using
>>>> >>>
>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
>>>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>>> >>> wordCount-input
>>>> >>>
>>>> >>> test
>>>> >>> new
>>>> >>> word
>>>> >>> count
>>>> >>> wordcount
>>>> >>> word count
>>>> >>>
>>>> >>> So, when I check the number of messages in wordCount-input I see the
>>>> >>> same messages. However, when I run the code below I do not see any
>>>> >>> message/data in wordCount-output.
>>>> >>>
>>>> >>> Can I connect to kafka in a VM/docker container using the code below,
>>>> >>> or do I need to change/add other parameters? How can I submit the
>>>> >>> code to kafka/kafka-connect? Do we have a similar concept as Spark to
>>>> >>> submit the code (e.g. a jar file)?
>>>> >>>
>>>> >>> I really appreciate your input as I am blocked and cannot run even the
>>>> >>> simple example below.
>>>> >>>
>>>> >>> Best regards,
>>>> >>> Mina
>>>> >>>
>>>> >>> I changed the code to be as below:
>>>> >>>
>>>> >>> Properties props = new Properties();
>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>> >>>     Serdes.String().getClass().getName());
>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>> >>>     Serdes.String().getClass().getName());
>>>> >>>
>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>> >>>
>>>> >>> // setting offset reset to earliest so that we can re-run the demo
>>>> >>> // code with the same pre-loaded data
>>>> >>> // Note: to re-run the demo, you need to use the offset reset tool:
>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>> >>>
>>>> >>> KStreamBuilder builder = new KStreamBuilder();
>>>> >>>
>>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>>>> >>>
>>>> >>> KTable<String, Long> counts = source
>>>> >>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>> >>>         @Override
>>>> >>>         public Iterable<String> apply(String value) {
>>>> >>>             return Arrays.asList(
>>>> >>>                 value.toLowerCase(Locale.getDefault()).split(" "));
>>>> >>>         }
>>>> >>>     })
>>>> >>>     .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>>>> >>>         @Override
>>>> >>>         public KeyValue<String, String> apply(String key, String value) {
>>>> >>>             return new KeyValue<>(value, value);
>>>> >>>         }
>>>> >>>     })
>>>> >>>     .groupByKey()
>>>> >>>     .count("Counts");
>>>> >>>
>>>> >>> // need to override value serde to Long type
>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>>> >>>
>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>> >>> streams.start();
>>>> >>>
>>>> >>> // usually the stream application would be running forever;
>>>> >>> // in this example we just let it run for some time and stop, since
>>>> >>> // the input data is finite.
>>>> >>> Thread.sleep(5000L);
>>>> >>>
>>>> >>> streams.close();
>>>> >>>
>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>> >>>
>>>> >>>> Maybe you need to reset your application using the reset tool:
>>>> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>>>> >>>>
>>>> >>>> Also keep in mind that KTables buffer internally, and thus you might
>>>> >>>> only see data on commit.
>>>> >>>>
>>>> >>>> Try to reduce the commit interval or disable caching by setting
>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>> >>>>
>>>> >>>> -Matthias
>>>> >>>>
>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>> >>>>> Hi,
>>>> >>>>>
>>>> >>>>> This is the first time I am using Kafka Streams. I would like to
>>>> >>>>> read from an input topic and write to an output topic. However, I
>>>> >>>>> do not see the word count when I try to run the example below. It
>>>> >>>>> looks like it does not connect to Kafka, though I do not see any
>>>> >>>>> error. I tried my localhost kafka as well as the container in a VM;
>>>> >>>>> same situation.
>>>> >>>>>
>>>> >>>>> There are over 200 messages in the input kafka topic.
>>>> >>>>>
>>>> >>>>> Your input is appreciated!
>>>> >>>>>
>>>> >>>>> Best regards,
>>>> >>>>> Mina
>>>> >>>>>
>>>> >>>>> import org.apache.kafka.common.serialization.*;
>>>> >>>>> import org.apache.kafka.streams.*;
>>>> >>>>> import org.apache.kafka.streams.kstream.*;
>>>> >>>>>
>>>> >>>>> import java.util.*;
>>>> >>>>> import java.util.regex.*;
>>>> >>>>>
>>>> >>>>> public class WordCountExample {
>>>> >>>>>
>>>> >>>>>     public static void main(String[] args) {
>>>> >>>>>         final Properties streamsConfiguration = new Properties();
>>>> >>>>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>> >>>>>             "wordcount-streaming");
>>>> >>>>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>> >>>>>             "<IPADDRESS>:9092");
>>>> >>>>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>> >>>>>             Serdes.String().getClass().getName());
>>>> >>>>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>> >>>>>             Serdes.String().getClass().getName());
>>>> >>>>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>> >>>>>             10 * 1000);
>>>> >>>>>
>>>> >>>>>         final Serde<String> stringSerde = Serdes.String();
>>>> >>>>>         final Serde<Long> longSerde = Serdes.Long();
>>>> >>>>>
>>>> >>>>>         final KStreamBuilder builder = new KStreamBuilder();
>>>> >>>>>
>>>> >>>>>         final KStream<String, String> textLines =
>>>> >>>>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>> >>>>>
>>>> >>>>>         final Pattern pattern = Pattern.compile("\\W+",
>>>> >>>>>             Pattern.UNICODE_CHARACTER_CLASS);
>>>> >>>>>
>>>> >>>>>         final KStream<String, Long> wordCounts = textLines
>>>> >>>>>             .flatMapValues(value ->
>>>> >>>>>                 Arrays.asList(pattern.split(value.toLowerCase())))
>>>> >>>>>             .groupBy((key, word) -> word)
>>>> >>>>>             .count("Counts")
>>>> >>>>>             .toStream();
>>>> >>>>>
>>>> >>>>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>> >>>>>
>>>> >>>>>         final KafkaStreams streams = new KafkaStreams(builder,
>>>> >>>>>             streamsConfiguration);
>>>> >>>>>         streams.cleanUp();
>>>> >>>>>         streams.start();
>>>> >>>>>
>>>> >>>>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>> >>>>>     }
>>>> >>>>> }
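A side note on the two tokenizers that appear in this thread: the earlier snippet splits on a single space (`value.split(" ")`), while the later one splits on the regex `\W+`. The difference matters for word counting, and can be seen with plain Java, no Kafka involved (a standalone sketch; the input line here is made up for illustration):

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class TokenizerComparison {
    public static void main(String[] args) {
        // Hypothetical input line with a double space and punctuation.
        String line = "join  kafka summit!";

        // Splitting on a single space keeps punctuation attached to words
        // and produces an empty token for the doubled space.
        String[] bySpace = line.toLowerCase().split(" ");
        System.out.println(Arrays.toString(bySpace));
        // [join, , kafka, summit!]

        // Splitting on runs of non-word characters drops punctuation and
        // collapses repeated separators, as in the quickstart-style example.
        String[] byNonWord = Pattern.compile("\\W+").split(line.toLowerCase());
        System.out.println(Arrays.toString(byNonWord));
        // [join, kafka, summit]
    }
}
```

With `split(" ")`, "summit!" and "summit" would be counted as different words, which is one reason the examples in the thread moved to the `\W+` pattern.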
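For reference, the per-word counts the topology should eventually emit for the quickstart's sample lines can be checked with a plain `HashMap`, mirroring the flatMapValues/groupBy/count steps (a sketch only; the real output topic additionally depends on commit interval and caching, as Matthias notes above, and its values are Long-serialized):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpectedWordCounts {
    public static void main(String[] args) {
        // The sample lines produced to the input topic in the thread.
        List<String> lines = Arrays.asList(
                "hello kafka streams",
                "all streams lead to kafka",
                "join kafka summit");

        // Mirror of: flatMapValues(split) -> groupBy(word) -> count.
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                counts.merge(word, 1L, Long::sum);
            }
        }

        System.out.println(counts.get("kafka"));   // 3
        System.out.println(counts.get("streams")); // 2
        System.out.println(counts.get("summit"));  // 1
    }
}
```

If the consumer on WordsWithCountsTopic shows blank values instead of these numbers, a common cause is printing Long-encoded values as strings; the console consumer needs a LongDeserializer configured for values.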