Hi Eno,

Great finding! You were right! I had to change KAFKA_ADVERTISED_LISTENERS to
PLAINTEXT://$(docker-machine ip <docker-machine-name>):<kafka-port> to make it
work from the IDE. Step 2 (pointing to <docker-machine-name>:<kafka-port> in my
streams app) was already done.
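For anyone who lands on this thread with the same problem, the fix boils down to one broker setting. This is a sketch only; the IP and port are placeholders for your own docker-machine address and Kafka port:

```properties
# server.properties -- advertise an address reachable from OUTSIDE the
# Docker network (placeholder IP/port; get yours via `docker-machine ip`):
advertised.listeners=PLAINTEXT://192.168.99.100:29092

# With the confluentinc/cp-kafka image, the equivalent environment variable:
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.99.100:29092
```

The Streams app (or any client outside Docker) then uses that same address as its bootstrap.servers.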
Later, I'll try using the CLI as mentioned here
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62
and pointed out by Michael.

Thank you very much for your time and your prompt responses, really
appreciate it! Have a wonderful day.

Best regards,
Mina

On Wed, Mar 15, 2017 at 4:38 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi Mina,
>
> It might be that you need to set this property in the Kafka broker config
> file (server.properties):
>
> advertised.listeners=PLAINTEXT://your.host.name:9092
>
> The problem might be this: within Docker you run Kafka, and Kafka's
> address is localhost:9092. Great. Then say you have another container, or
> are running the Streams app on your local laptop. If you point Streams to
> localhost:9092, that is "not" where Kafka is running. So you need to point
> your Streams app at the address of the container. That's the second step.
> The first step is to have Kafka advertise that address to the Streams app,
> and that you do by setting the property above. Example:
>
> advertised.listeners=PLAINTEXT://123.45.67.89:9092
>
> Then when you run the Streams app you pass in 123.45.67.89:9092.
>
> Thanks
> Eno
>
> > On Mar 15, 2017, at 5:14 AM, Mina Aslani <aslanim...@gmail.com> wrote:
> >
> > Hi,
> >
> > I just checked the streams-wordcount-output topic using the below command:
> >
> > docker run \
> >   --net=host \
> >   --rm \
> >   confluentinc/cp-kafka:3.2.0 \
> >   kafka-console-consumer --bootstrap-server localhost:9092 \
> >   --topic streams-wordcount-output \
> >   --from-beginning \
> >   --formatter kafka.tools.DefaultMessageFormatter \
> >   --property print.key=true \
> >   --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> >   --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
> >
> > and it returns:
> >
> > all     1
> > lead    1
> > to      1
> > hello   1
> > streams 2
> > join    1
> > kafka   3
> > summit  1
> >
> > Please note the above result is from when I tried
> > http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> > and, in the docker-machine, ran /usr/bin/kafka-run-class
> > org.apache.kafka.streams.examples.wordcount.WordCountDemo.
> >
> > How come running the same program outside the docker-machine does not
> > output to the output topic?
> > Should I package the program as a jar, deploy it to the docker-machine,
> > and run it using ./bin/kafka-run-class?
> >
> > Best regards,
> > Mina
> >
> > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> >
> >> I even tried
> >> http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> >> and, in the docker-machine, ran /usr/bin/kafka-run-class
> >> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> >>
> >> Running
> >>
> >> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
> >>   kafka-console-consumer --bootstrap-server localhost:9092 --topic \
> >>   streams-wordcount-output --new-consumer --from-beginning
> >>
> >> shows 8 blank messages.
> >>
> >> Is there any setting/configuration that should be done? Running the
> >> class in the docker-machine and running the program outside the
> >> docker-machine do not return the expected result!
> >>
> >> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> >>
> >>> And the port for Kafka is 29092 and for ZooKeeper 32181.
> >>>
> >>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I forgot to add 2 questions in my previous email.
> >>>> To set up my env, shall I use
> >>>> https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
> >>>> instead, or is there any other docker-compose.yml (version 2 or 3)
> >>>> that is suggested for setting up the env?
> >>>>
> >>>> How can I check "whether streams (that is just an app) can reach Kafka"?
> >>>>
> >>>> Regards,
> >>>> Mina
> >>>>
> >>>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> >>>>
> >>>>> Hi Eno,
> >>>>>
> >>>>> Sorry! That is a typo!
> >>>>>
> >>>>> I have a docker-machine with different containers (set up as directed at
> >>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
> >>>>>
> >>>>> docker ps --format "{{.Image}}: {{.Names}}"
> >>>>>
> >>>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> >>>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> >>>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> >>>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
> >>>>> confluentinc/cp-kafka:3.2.0: kafka
> >>>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
> >>>>>
> >>>>> I used the example at
> >>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> >>>>> and followed the same steps.
> >>>>>
> >>>>> When I run the below command in the docker-machine, I see the messages
> >>>>> in TextLinesTopic.
> >>>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
> >>>>>   kafka-console-consumer --bootstrap-server localhost:29092 \
> >>>>>   --topic TextLinesTopic --new-consumer --from-beginning
> >>>>>
> >>>>> hello kafka streams
> >>>>> all streams lead to kafka
> >>>>> join kafka summit
> >>>>> test1
> >>>>> test2
> >>>>> test3
> >>>>> test4
> >>>>>
> >>>>> Running the above command for WordsWithCountsTopic returns nothing.
> >>>>>
> >>>>> My program runs outside the docker-machine, and it does not return
> >>>>> any error.
> >>>>>
> >>>>> I checked the kafka logs and kafka-connect logs; no information is
> >>>>> shown. Wondering what the log level is in kafka/kafka-connect.
> >>>>>
> >>>>> Best regards,
> >>>>> Mina
> >>>>>
> >>>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi there,
> >>>>>>
> >>>>>> I noticed in your example that you are using localhost:9092 to
> >>>>>> produce but localhost:29092 to consume? Or is that a typo? Are
> >>>>>> ZooKeeper, Kafka, and the Kafka Streams app all running within one
> >>>>>> Docker container, or in different containers?
> >>>>>>
> >>>>>> I just tested the WordCountLambdaExample and it works for me. This
> >>>>>> might not have anything to do with Streams, but rather with the Kafka
> >>>>>> configuration and whether Streams (that is just an app) can reach
> >>>>>> Kafka at all. If you provide the above information we can look further.
> >>>>>>
> >>>>>> Thanks
> >>>>>> Eno
> >>>>>>
> >>>>>>> On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> I reset and it is still not working!
> >>>>>>> My env is set up using
> >>>>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> >>>>>>>
> >>>>>>> I just tried using
> >>>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> >>>>>>> with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic)
> >>>>>>> created from scratch, as I went through the steps as directed.
> >>>>>>>
> >>>>>>> When I stopped the Java program and checked the topics, below is the
> >>>>>>> data in each topic.
> >>>>>>>
> >>>>>>> docker run \
> >>>>>>>   --net=host \
> >>>>>>>   --rm \
> >>>>>>>   confluentinc/cp-kafka:3.2.0 \
> >>>>>>>   kafka-console-consumer --bootstrap-server localhost:29092 --topic \
> >>>>>>>   TextLinesTopic --new-consumer --from-beginning
> >>>>>>>
> >>>>>>> SHOWS
> >>>>>>>
> >>>>>>> hello kafka streams
> >>>>>>> all streams lead to kafka
> >>>>>>> join kafka summit
> >>>>>>> test1
> >>>>>>> test2
> >>>>>>> test3
> >>>>>>> test4
> >>>>>>>
> >>>>>>> For WordsWithCountsTopic nothing is shown.
> >>>>>>>
> >>>>>>> I am new to Kafka/Kafka Streams and still do not understand why a
> >>>>>>> simple example does not work!
> >>>>>>>
> >>>>>>> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>>>> So, when I check the number of messages in wordCount-input I see
> >>>>>>>>>> the same messages. However, when I run the below code I do not see
> >>>>>>>>>> any message/data in wordCount-output.
> >>>>>>>>
> >>>>>>>> Did you reset your application?
> >>>>>>>>
> >>>>>>>> Each time you run your app and restart it, it will resume processing
> >>>>>>>> where it left off.
> >>>>>>>> Thus, if something went wrong in your first run but you got
> >>>>>>>> committed offsets, the app will not re-read the whole topic.
> >>>>>>>>
> >>>>>>>> You can check committed offsets via bin/kafka-consumer-groups.sh.
> >>>>>>>> The application-id from StreamsConfig is used as the group.id.
> >>>>>>>>
> >>>>>>>> Thus, resetting your app would be required to consume the input
> >>>>>>>> topic from scratch. Or you just write new data to your input topic.
> >>>>>>>>
> >>>>>>>>>> Can I connect to kafka in a VM/docker container using the below
> >>>>>>>>>> code, or do I need to change/add other parameters? How can I submit
> >>>>>>>>>> the code to kafka/kafka-connect? Do we have a similar concept as
> >>>>>>>>>> Spark to submit the code (e.g. a jar file)?
> >>>>>>>>
> >>>>>>>> A Streams app is a regular Java application and can run anywhere --
> >>>>>>>> there is no notion of a processing cluster and you don't "submit"
> >>>>>>>> your code -- you just run your app.
> >>>>>>>>
> >>>>>>>> Thus, if your console consumer can connect to the cluster, your
> >>>>>>>> Streams app should also be able to connect to the cluster.
> >>>>>>>>
> >>>>>>>> Maybe the short runtime of 5 seconds could be a problem (even if it
> >>>>>>>> seems long enough to process just a few records). But you need to
> >>>>>>>> take startup delay into account. I would recommend registering a
> >>>>>>>> shutdown hook: see
> >>>>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> >>>>>>>>
> >>>>>>>> Hope this helps.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 3/13/17 7:30 PM, Mina Aslani wrote:
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> Thank you for the quick response, appreciate it!
> >>>>>>>>>
> >>>>>>>>> I created the topics wordCount-input and wordCount-output.
> >>>>>>>>> Pushed some data to wordCount-input using
> >>>>>>>>>
> >>>>>>>>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
> >>>>>>>>>   /bin/kafka-console-producer --broker-list localhost:9092 --topic \
> >>>>>>>>>   wordCount-input
> >>>>>>>>>
> >>>>>>>>> test
> >>>>>>>>> new
> >>>>>>>>> word
> >>>>>>>>> count
> >>>>>>>>> wordcount
> >>>>>>>>> word count
> >>>>>>>>>
> >>>>>>>>> So, when I check the number of messages in wordCount-input I see
> >>>>>>>>> the same messages. However, when I run the below code I do not see
> >>>>>>>>> any message/data in wordCount-output.
> >>>>>>>>>
> >>>>>>>>> Can I connect to kafka in a VM/docker container using the below
> >>>>>>>>> code, or do I need to change/add other parameters? How can I submit
> >>>>>>>>> the code to kafka/kafka-connect? Do we have a similar concept as
> >>>>>>>>> Spark to submit the code (e.g. a jar file)?
> >>>>>>>>>
> >>>>>>>>> I really appreciate your input as I am blocked and cannot run even
> >>>>>>>>> the below simple example.
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>> Mina
> >>>>>>>>>
> >>>>>>>>> I changed the code to be as below:
> >>>>>>>>>
> >>>>>>>>> Properties props = new Properties();
> >>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> >>>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> >>>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>>>>>     Serdes.String().getClass().getName());
> >>>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>>>>>     Serdes.String().getClass().getName());
> >>>>>>>>>
> >>>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> >>>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>>>>>>>
> >>>>>>>>> // setting offset reset to earliest so that we can re-run the demo
> >>>>>>>>> // code with the same pre-loaded data
> >>>>>>>>> // Note: To re-run the demo, you need to use the offset reset tool:
> >>>>>>>>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> >>>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>>>>>>
> >>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>>>
> >>>>>>>>> KStream<String, String> source = builder.stream("wordCount-input");
> >>>>>>>>>
> >>>>>>>>> KTable<String, Long> counts = source
> >>>>>>>>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >>>>>>>>>         @Override
> >>>>>>>>>         public Iterable<String> apply(String value) {
> >>>>>>>>>             return Arrays.asList(
> >>>>>>>>>                 value.toLowerCase(Locale.getDefault()).split(" "));
> >>>>>>>>>         }
> >>>>>>>>>     }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
> >>>>>>>>>         @Override
> >>>>>>>>>         public KeyValue<String, String> apply(String key, String value) {
> >>>>>>>>>             return new KeyValue<>(value, value);
> >>>>>>>>>         }
> >>>>>>>>>     })
> >>>>>>>>>     .groupByKey()
> >>>>>>>>>     .count("Counts");
> >>>>>>>>>
> >>>>>>>>> // need to override value serde to Long type
> >>>>>>>>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >>>>>>>>>
> >>>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
> >>>>>>>>> streams.start();
> >>>>>>>>>
> >>>>>>>>> // usually the stream application would be running forever,
> >>>>>>>>> // in this example we just let it run for some time and stop since
> >>>>>>>>> // the input data is finite.
> >>>>>>>>> Thread.sleep(5000L);
> >>>>>>>>>
> >>>>>>>>> streams.close();
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> Maybe you need to reset your application using the reset tool:
> >>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
> >>>>>>>>>>
> >>>>>>>>>> Also keep in mind that KTables buffer internally, and thus, you
> >>>>>>>>>> might only see data on commit.
> >>>>>>>>>>
> >>>>>>>>>> Try to reduce the commit interval or disable caching by setting
> >>>>>>>>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> This is the first time I am using Kafka Streams. I would like to
> >>>>>>>>>>> read from an input topic and write to an output topic. However,
> >>>>>>>>>>> I do not see the word count when I try to run the below example.
> >>>>>>>>>>> It looks like it does not connect to Kafka. I do not see any
> >>>>>>>>>>> error though. I tried my localhost kafka as well as the container
> >>>>>>>>>>> in a VM; same situation.
> >>>>>>>>>>>
> >>>>>>>>>>> There are over 200 messages in the input kafka topic.
> >>>>>>>>>>>
> >>>>>>>>>>> Your input is appreciated!
> >>>>>>>>>>>
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Mina
> >>>>>>>>>>>
> >>>>>>>>>>> import org.apache.kafka.common.serialization.*;
> >>>>>>>>>>> import org.apache.kafka.streams.*;
> >>>>>>>>>>> import org.apache.kafka.streams.kstream.*;
> >>>>>>>>>>>
> >>>>>>>>>>> import java.util.*;
> >>>>>>>>>>> import java.util.regex.*;
> >>>>>>>>>>>
> >>>>>>>>>>> public class WordCountExample {
> >>>>>>>>>>>
> >>>>>>>>>>>     public static void main(String[] args) {
> >>>>>>>>>>>         final Properties streamsConfiguration = new Properties();
> >>>>>>>>>>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>>>>>>>>             "wordcount-streaming");
> >>>>>>>>>>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>>>>>>>>             "<IPADDRESS>:9092");
> >>>>>>>>>>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>>>>>>>             Serdes.String().getClass().getName());
> >>>>>>>>>>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>>>>>>>             Serdes.String().getClass().getName());
> >>>>>>>>>>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> >>>>>>>>>>>             10 * 1000);
> >>>>>>>>>>>
> >>>>>>>>>>>         final Serde<String> stringSerde = Serdes.String();
> >>>>>>>>>>>         final Serde<Long> longSerde = Serdes.Long();
> >>>>>>>>>>>
> >>>>>>>>>>>         final KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>>>>>
> >>>>>>>>>>>         final KStream<String, String> textLines =
> >>>>>>>>>>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>>>>>>>>>
> >>>>>>>>>>>         final Pattern pattern = Pattern.compile("\\W+",
> >>>>>>>>>>>             Pattern.UNICODE_CHARACTER_CLASS);
> >>>>>>>>>>>
> >>>>>>>>>>>         final KStream<String, Long> wordCounts = textLines
> >>>>>>>>>>>             .flatMapValues(value ->
> >>>>>>>>>>>                 Arrays.asList(pattern.split(value.toLowerCase())))
> >>>>>>>>>>>             .groupBy((key, word) -> word)
> >>>>>>>>>>>             .count("Counts")
> >>>>>>>>>>>             .toStream();
> >>>>>>>>>>>
> >>>>>>>>>>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>>>>>>>>>
> >>>>>>>>>>>         final KafkaStreams streams = new KafkaStreams(builder,
> >>>>>>>>>>>             streamsConfiguration);
> >>>>>>>>>>>         streams.cleanUp();
> >>>>>>>>>>>         streams.start();
> >>>>>>>>>>>
> >>>>>>>>>>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >>>>>>>>>>>     }
> >>>>>>>>>>> }
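For what it's worth, the per-record transformation in the topology above can be checked without a broker at all, which helps separate "my topology logic is wrong" from "my app cannot reach Kafka". The following plain-Java sketch (my own, not from the thread) applies the same lowercase/split/count steps to the sample lines from earlier in the thread:

```java
import java.util.*;

// Plain-Java sketch of the word-count logic used in the Streams topology
// (lowercase, split on non-word characters, count per word). No Kafka needed.
public class WordCountLogicCheck {
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase(Locale.getDefault()).split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "all streams lead to kafka",
            "hello kafka streams",
            "join kafka summit");
        // Matches the console-consumer output earlier in the thread:
        // kafka=3, streams=2, and 1 for each remaining word.
        System.out.println(countWords(lines));
    }
}
```

If this local check passes but the output topic stays empty, the problem is almost certainly connectivity (the advertised.listeners issue resolved at the top of the thread) or committed offsets (the reset tool Matthias mentions), not the topology.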