Hi Eno,

Great finding! You were right! I had to change KAFKA_ADVERTISED_LISTENERS
to be PLAINTEXT://$(docker-machine ip <docker-machine-name>):<kafka-port>
to make it work from IDE. Step 2 (pointing to
<docker-machine-name>:<kafka-port>
in my stream app) was already done.

Later, I'll try using CLI as mentioned here https://github.com/
confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62 and
pointed out by Michael.

Thank you very much for your time and your prompt responses,
really appreciate it!

Have a wonderful day.

Best regards,
Mina

On Wed, Mar 15, 2017 at 4:38 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Mina,
>
> It might be that you need to set this property on the Kafka broker config
> file (server.properties):
> advertised.listeners=PLAINTEXT://your.host.name:9092 <plaintext://
> your.host.name:9092>
>
>
> The problem might be this: within docker you run Kafka and Kafka’s address
> is localhost:9092. Great. Then say you have another container or are
> running the streams app on your local laptop. If you point streams to
> localhost:9092 that is “not” where Kafka is running. So you need to point
> your streams app at the address of the container. That’s the second step.
> The first step is to have Kafka advertise that address to the streams app
> and that you do by setting the address above. Example:
>
> advertised.listeners=PLAINTEXT://123.45.67:9092
> <plaintext://123.45.67:9092>
>
> Then when you run the streams app you pass in 123.45.67:9092
> <plaintext://123.45.67:9092>.
>
> Thanks
> Eno
>
> > On Mar 15, 2017, at 5:14 AM, Mina Aslani <aslanim...@gmail.com> wrote:
> >
> > Hi,
> > I just checked streams-wordcount-output topic using below command
> >
> > docker run \
> >
> >  --net=host \
> >
> >  --rm \
> >
> >  confluentinc/cp-kafka:3.2.0 \
> >
> >  kafka-console-consumer --bootstrap-server localhost:9092 \
> >
> >          --topic streams-wordcount-output \
> >
> >          --from-beginning \
> >
> >          --formatter kafka.tools.DefaultMessageFormatter \
> >
> >          --property print.key=true \
> >
> >          --property key.deserializer=org.apache.ka
> > fka.common.serialization.StringDeserializer \
> >
> >          --property value.deserializer=org.apache.
> > kafka.common.serialization.LongDeserializer
> >
> >
> > and it returns
> >
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
> >
> > Please note above result is when I tried  http://docs.confluent.i
> > o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> > docker-machine  ran /usr/bin/kafka-run-class
> org.apache.kafka.streams.examp
> > les.wordcount.WordCountDemo.
> >
> > How come running same program out of docker-machine does not output to
> the
> > output topic?
> > Should I make the program as jar and deploy to docker-machine and run it
> > using ./bin/kafka-run-class?
> >
> > Best regards,
> > Mina
> >
> >
> >
> > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <aslanim...@gmail.com>
> wrote:
> >
> >> I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> goal-
> >> of-this-quickstart
> >>
> >> and in docker-machine  ran /usr/bin/kafka-run-class
> >> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> >>
> >> Running
> >>
> >> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> >> kafka-console-consumer --bootstrap-server localhost:9092 --topic
> >> streams-wordcount-output --new-consumer --from-beginning
> >>
> >> shows 8 blank messages
> >>
> >> Is there any setting/configuration should be done as running the class
> in
> >> the docker-machine and running program outside the docker-machine does
> not
> >> return expected result!
> >>
> >> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com>
> wrote:
> >>
> >>> And the port for kafka is 29092 and for zookeeper 32181.
> >>>
> >>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I forgot to add in my previous email 2 questions.
> >>>>
> >>>> To setup my env, shall I use https://raw.githubusercont
> >>>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
> >>>> single-node/docker-compose.yml instead or is there any other
> >>>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
> >>>>
> >>>> How can I check "whether streams (that is just an app) can reach
> Kafka"?
> >>>>
> >>>> Regards,
> >>>> Mina
> >>>>
> >>>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Eno,
> >>>>>
> >>>>> Sorry! That is a typo!
> >>>>>
> >>>>> I have a docker-machine with different containers (setup as directed
> @
> >>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> )
> >>>>>
> >>>>> docker ps --format "{{.Image}}: {{.Names}}"
> >>>>>
> >>>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> >>>>>
> >>>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> >>>>>
> >>>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> >>>>>
> >>>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
> >>>>>
> >>>>> confluentinc/cp-kafka:3.2.0: kafka
> >>>>>
> >>>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
> >>>>>
> >>>>> I used example @ https://github.com/confluent
> >>>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
> >>>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
> >>>>> followed the same steps.
> >>>>>
> >>>>> When I run below command in docker-machine, I see the messages in
> >>>>> TextLinesTopic.
> >>>>>
> >>>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> kafka-console-consumer
> >>>>> --bootstrap-server localhost:29092 --topic TextLinesTopic
> --new-consumer
> >>>>> --from-beginning
> >>>>>
> >>>>> hello kafka streams
> >>>>>
> >>>>> all streams lead to kafka
> >>>>>
> >>>>> join kafka summit
> >>>>>
> >>>>> test1
> >>>>>
> >>>>> test2
> >>>>>
> >>>>> test3
> >>>>>
> >>>>> test4
> >>>>>
> >>>>> Running above command for WordsWithCountsTopic returns nothing*.*
> >>>>>
> >>>>> My program runs out of docker machine, and it does not return any
> >>>>> error.
> >>>>>
> >>>>> I checked kafka logs and kafka-connect logs, no information is shown.
> >>>>> Wondering what is the log level in kafka/kafka-connect.
> >>>>>
> >>>>>
> >>>>> Best regards,
> >>>>> Mina
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <
> eno.there...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi there,
> >>>>>>
> >>>>>> I noticed in your example that you are using localhost:9092 to
> produce
> >>>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper,
> kafka, and
> >>>>>> the Kafka Streams app all running within one docker container, or in
> >>>>>> different containers?
> >>>>>>
> >>>>>> I just tested the WordCountLambdaExample and it works for me. This
> >>>>>> might not have anything to do with streams, but rather with the
> Kafka
> >>>>>> configuration and whether streams (that is just an app) can reach
> Kafka at
> >>>>>> all. If you provide the above information we can look further.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Thanks
> >>>>>> Eno
> >>>>>>
> >>>>>>> On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com>
> wrote:
> >>>>>>>
> >>>>>>> I reset and still not working!
> >>>>>>>
> >>>>>>> My env is setup using
> >>>>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/
> quickstart.html
> >>>>>>>
> >>>>>>> I just tried using
> >>>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
> >>>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
> >>>>>> ambdaExample.java#L178-L181
> >>>>>>> with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
> >>>>>> created
> >>>>>>> from scratch as went through the steps as directed.
> >>>>>>>
> >>>>>>> When I stopped the java program and check the topics below are the
> >>>>>> data in
> >>>>>>> each topic.
> >>>>>>>
> >>>>>>> docker run \
> >>>>>>>
> >>>>>>> --net=host \
> >>>>>>>
> >>>>>>> --rm \
> >>>>>>>
> >>>>>>> confluentinc/cp-kafka:3.2.0 \
> >>>>>>>
> >>>>>>> kafka-console-consumer --bootstrap-server localhost:29092 --topic
> >>>>>>> TextLinesTopic --new-consumer --from-beginning
> >>>>>>>
> >>>>>>>
> >>>>>>> SHOWS
> >>>>>>>
> >>>>>>> hello kafka streams
> >>>>>>>
> >>>>>>> all streams lead to kafka
> >>>>>>>
> >>>>>>> join kafka summit
> >>>>>>>
> >>>>>>> test1
> >>>>>>>
> >>>>>>> test2
> >>>>>>>
> >>>>>>> test3
> >>>>>>>
> >>>>>>> test4
> >>>>>>>
> >>>>>>> FOR WordsWithCountsTopic nothing is shown
> >>>>>>>
> >>>>>>>
> >>>>>>> I am new to the Kafka/Kafka Stream and still do not understand why
> a
> >>>>>> simple
> >>>>>>> example does not work!
> >>>>>>>
> >>>>>>> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
> >>>>>> matth...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>>>> So, when I check the number of messages in wordCount-input I see
> >>>>>> the
> >>>>>>>> same
> >>>>>>>>>> messages. However, when I run below code I do not see any
> >>>>>> message/data
> >>>>>>>> in
> >>>>>>>>>> wordCount-output.
> >>>>>>>>
> >>>>>>>> Did you reset your application?
> >>>>>>>>
> >>>>>>>> Each time you run you app and restart it, it will resume
> processing
> >>>>>>>> where it left off. Thus, if something went wrong in you first run
> >>>>>> but
> >>>>>>>> you got committed offsets, the app will not re-read the whole
> topic.
> >>>>>>>>
> >>>>>>>> You can check committed offset via bin/kafka-consumer-groups.sh.
> The
> >>>>>>>> application-id from StreamConfig is used a group.id.
> >>>>>>>>
> >>>>>>>> Thus, resetting you app would be required to consumer the input
> >>>>>> topic
> >>>>>>>> from scratch. Of you just write new data to you input topic.
> >>>>>>>>
> >>>>>>>>>> Can I connect to kafka in VM/docker container using below code
> or
> >>>>>> do I
> >>>>>>>> need
> >>>>>>>>>> to change/add other parameters? How can I submit the code to
> >>>>>>>>>> kafka/kafka-connect? Do we have similar concept as SPARK to
> >>>>>> submit the
> >>>>>>>>>> code(e.g. jar file)?
> >>>>>>>>
> >>>>>>>> A Streams app is a regular Java application and can run anywhere
> --
> >>>>>>>> there is no notion of a processing cluster and you don't "submit"
> >>>>>> your
> >>>>>>>> code -- you just run your app.
> >>>>>>>>
> >>>>>>>> Thus, if your console consumer can connect to the cluster, your
> >>>>>> Streams
> >>>>>>>> app should also be able to connect to the cluster.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Maybe, the short runtime of 5 seconds could be a problem (even if
> it
> >>>>>>>> seems log to process just a few records). But you might need to
> put
> >>>>>>>> startup delay into account. I would recommend to register a
> shutdown
> >>>>>>>> hook: see
> >>>>>>>> https://github.com/confluentinc/examples/blob/3.
> >>>>>>>> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> >>>>>>>> WordCountLambdaExample.java#L178-L181
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Hope this helps.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 3/13/17 7:30 PM, Mina Aslani wrote:
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> Thank you for the quick response, appreciate it!
> >>>>>>>>>
> >>>>>>>>> I created the topics wordCount-input and wordCount-output. Pushed
> >>>>>> some
> >>>>>>>> data
> >>>>>>>>> to wordCount-input using
> >>>>>>>>>
> >>>>>>>>> docker exec -it $(docker ps -f "name=kafka\\." --format
> >>>>>> "{{.Names}}")
> >>>>>>>>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
> >>>>>>>>> wordCount-input
> >>>>>>>>>
> >>>>>>>>> test
> >>>>>>>>>
> >>>>>>>>> new
> >>>>>>>>>
> >>>>>>>>> word
> >>>>>>>>>
> >>>>>>>>> count
> >>>>>>>>>
> >>>>>>>>> wordcount
> >>>>>>>>>
> >>>>>>>>> word count
> >>>>>>>>>
> >>>>>>>>> So, when I check the number of messages in wordCount-input I see
> >>>>>> the same
> >>>>>>>>> messages. However, when I run below code I do not see any
> >>>>>> message/data in
> >>>>>>>>> wordCount-output.
> >>>>>>>>>
> >>>>>>>>> Can I connect to kafka in VM/docker container using below code or
> >>>>>> do I
> >>>>>>>> need
> >>>>>>>>> to change/add other parameters? How can I submit the code to
> >>>>>>>>> kafka/kafka-connect? Do we have similar concept as SPARK to
> submit
> >>>>>> the
> >>>>>>>>> code(e.g. jar file)?
> >>>>>>>>>
> >>>>>>>>> I really appreciate your input as I am blocked and cannot run
> even
> >>>>>> below
> >>>>>>>>> simple example.
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>> Mina
> >>>>>>>>>
> >>>>>>>>> I changed the code to be as below:
> >>>>>>>>>
> >>>>>>>>> Properties props = new Properties();
> >>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>>> "wordCount-streaming");
> >>>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>>> "<ipAddress>:9092");
> >>>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>>>>> Serdes.String().getClass().getName());
> >>>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>>>>> Serdes.String().getClass().getName());
> >>>>>>>>>
> >>>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> >>>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>>>>>>>
> >>>>>>>>> // setting offset reset to earliest so that we can re-run the
> demo
> >>>>>>>>> code with the same pre-loaded data
> >>>>>>>>> // Note: To re-run the demo, you need to use the offset reset
> tool:
> >>>>>>>>> // https://cwiki.apache.org/confluence/display/KAFKA/
> >>>>>>>> Kafka+Streams+Application+Reset+Tool
> >>>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>>>>>>
> >>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>>>
> >>>>>>>>> KStream<String, String> source = builder.stream("wordCount-inpu
> >>>>>> t");
> >>>>>>>>>
> >>>>>>>>> KTable<String, Long> counts = source
> >>>>>>>>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >>>>>>>>>        @Override
> >>>>>>>>>        public Iterable<String> apply(String value) {
> >>>>>>>>>           return
> >>>>>>>>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("
> "));
> >>>>>>>>>        }
> >>>>>>>>>     }).map(new KeyValueMapper<String, String, KeyValue<String,
> >>>>>>>> String>>() {
> >>>>>>>>>        @Override
> >>>>>>>>>        public KeyValue<String, String> apply(String key, String
> >>>>>> value)
> >>>>>>>> {
> >>>>>>>>>           return new KeyValue<>(value, value);
> >>>>>>>>>        }
> >>>>>>>>>     })
> >>>>>>>>>     .groupByKey()
> >>>>>>>>>     .count("Counts");
> >>>>>>>>>
> >>>>>>>>> // need to override value serde to Long type
> >>>>>>>>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >>>>>>>>>
> >>>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
> >>>>>>>>> streams.start();
> >>>>>>>>>
> >>>>>>>>> // usually the stream application would be running forever,
> >>>>>>>>> // in this example we just let it run for some time and stop
> since
> >>>>>> the
> >>>>>>>>> input data is finite.
> >>>>>>>>> Thread.sleep(5000L);
> >>>>>>>>>
> >>>>>>>>> streams.close();
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
> >>>>>> matth...@confluent.io>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Maybe you need to reset your application using the reset tool:
> >>>>>>>>>> http://docs.confluent.io/current/streams/developer-
> >>>>>>>>>> guide.html#application-reset-tool
> >>>>>>>>>>
> >>>>>>>>>> Also keep in mind, that KTables buffer internally, and thus, you
> >>>>>> might
> >>>>>>>>>> only see data on commit.
> >>>>>>>>>>
> >>>>>>>>>> Try to reduce commit interval or disable caching by setting
> >>>>>>>>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> This is the first time that am using Kafka Stream. I would like
> >>>>>> to read
> >>>>>>>>>>> from input topic and write to output topic. However, I do not
> >>>>>> see the
> >>>>>>>>>> word
> >>>>>>>>>>> count when I try to run below example. Looks like that it does
> >>>>>> not
> >>>>>>>>>> connect
> >>>>>>>>>>> to Kafka. I do not see any error though. I tried my localhost
> >>>>>> kafka as
> >>>>>>>>>> well
> >>>>>>>>>>> as the container in a VM, same situation.
> >>>>>>>>>>>
> >>>>>>>>>>> There are over 200 message in the input kafka topic.
> >>>>>>>>>>>
> >>>>>>>>>>> Your input is appreciated!
> >>>>>>>>>>>
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Mina
> >>>>>>>>>>>
> >>>>>>>>>>> import org.apache.kafka.common.serialization.*;
> >>>>>>>>>>> import org.apache.kafka.streams.*;
> >>>>>>>>>>> import org.apache.kafka.streams.kstream.*;
> >>>>>>>>>>>
> >>>>>>>>>>> import java.util.*;
> >>>>>>>>>>> import java.util.regex.*;
> >>>>>>>>>>>
> >>>>>>>>>>> public class WordCountExample {
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>  public static void main(String [] args)   {
> >>>>>>>>>>>     final Properties streamsConfiguration = new Properties();
> >>>>>>>>>>>     streamsConfiguration.put(Strea
> >>>>>> msConfig.APPLICATION_ID_CONFIG,
> >>>>>>>>>>> "wordcount-streaming");
> >>>>>>>>>>>     streamsConfiguration.put(Strea
> >>>>>> msConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>>>>>>>> "<IPADDRESS>:9092");
> >>>>>>>>>>>     streamsConfiguration.put(Strea
> >>>>>> msConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>>>>>>> Serdes.String().getClass().getName());
> >>>>>>>>>>>     streamsConfiguration.put(Strea
> >>>>>> msConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>>>>>>> Serdes.String().getClass().getName());
> >>>>>>>>>>>     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> >>>>>>>> MS_CONFIG,
> >>>>>>>>>>> 10 * 1000);
> >>>>>>>>>>>
> >>>>>>>>>>>     final Serde<String> stringSerde = Serdes.String();
> >>>>>>>>>>>     final Serde<Long> longSerde = Serdes.Long();
> >>>>>>>>>>>
> >>>>>>>>>>>     final KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>>>>>
> >>>>>>>>>>>     final KStream<String, String> textLines =
> >>>>>>>>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>>>>>>>>>
> >>>>>>>>>>>     final Pattern pattern = Pattern.compile("\\W+",
> >>>>>>>>>>> Pattern.UNICODE_CHARACTER_CLASS);
> >>>>>>>>>>>
> >>>>>>>>>>>     final KStream<String, Long> wordCounts = textLines
> >>>>>>>>>>>           .flatMapValues(value ->
> >>>>>>>>>>> Arrays.asList(pattern.split(value.toLowerCase())))
> >>>>>>>>>>>           .groupBy((key, word) -> word)
> >>>>>>>>>>>           .count("Counts")
> >>>>>>>>>>>           .toStream();
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>     wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>>>>>>>>>
> >>>>>>>>>>>     final KafkaStreams streams = new KafkaStreams(builder,
> >>>>>>>>>>> streamsConfiguration);
> >>>>>>>>>>>     streams.cleanUp();
> >>>>>>>>>>>     streams.start();
> >>>>>>>>>>>
> >>>>>>>>>>>     Runtime.getRuntime().addShutdownHook(new
> >>>>>>>>>> Thread(streams::close));  }
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>

Reply via email to