Hi,
I just checked the streams-wordcount-output topic using the command below:

docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:3.2.0 \
  kafka-console-consumer --bootstrap-server localhost:9092 \
          --topic streams-wordcount-output \
          --from-beginning \
          --formatter kafka.tools.DefaultMessageFormatter \
          --property print.key=true \
          --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
          --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer


and it returns

all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
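
A note on why the earlier check printed 8 blank messages while this one prints counts. The sketch below assumes the counts are stored as 8-byte big-endian longs (which is how I understand Serdes.Long() to write them); the class and method names are made up for illustration. Read as text, a small count is nothing but control characters, so it looks blank unless the consumer is given a long deserializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongValueDemo {

    // Encode a count as an 8-byte big-endian long
    // (assumption: this matches what Serdes.Long() writes to the topic).
    public static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode the 8 bytes back into a long, as a long deserializer would.
    public static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Interpret the same bytes as UTF-8 text, as a string-based consumer would.
    public static String decodeAsText(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = encodeLong(3L); // e.g. the count for "kafka"
        String asText = decodeAsText(raw);
        // Check whether every character is a control character (invisible in a terminal).
        boolean allControl = asText.chars().allMatch(Character::isISOControl);
        System.out.println("length as text: " + asText.length());
        System.out.println("only control characters: " + allControl);
        System.out.println("decoded as long: " + decodeLong(raw));
    }
}
```

Run as-is, this reports a text length of 8, that every character is a control character, and a decoded value of 3.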

Please note that the above result is from when I followed
http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
and, inside the docker-machine, ran
/usr/bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo.

Why does running the same program outside the docker-machine not write anything to the
output topic?
Should I package the program as a jar, deploy it to the docker-machine, and run it
using ./bin/kafka-run-class?
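
To narrow down whether the program can reach Kafka from outside the docker-machine, a plain TCP check is one starting point. This is only a sketch: the class name, the default host, and the 3-second timeout are assumptions, and 29092 is the Kafka port mentioned in this thread. A successful connection is necessary but not sufficient, since after bootstrapping a client connects to whatever address the broker advertises, and that address also has to be resolvable and reachable from where the program runs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults; pass the docker-machine IP and the broker port instead.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 29092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

It would be run from the same machine as the Streams program, with the same host and port that the program uses for its bootstrap servers setting.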

Best regards,
Mina



On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> I even tried
> http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
>
> and in docker-machine ran /usr/bin/kafka-run-class
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
>
> Running
>
> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> kafka-console-consumer --bootstrap-server localhost:9092 --topic
> streams-wordcount-output --new-consumer --from-beginning
>
> shows 8 blank messages
>
> Is there any setting/configuration that should be done? Neither running the class in
> the docker-machine nor running the program outside the docker-machine
> returns the expected result!
>
> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com> wrote:
>
>> And the port for kafka is 29092 and for zookeeper 32181.
>>
>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I forgot to add in my previous email 2 questions.
>>>
>>> To set up my env, shall I use
>>> https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
>>> instead, or is there any other docker-compose.yml (version 2 or 3) that is
>>> suggested for setting up the env?
>>>
>>> How can I check "whether streams (that is just an app) can reach Kafka"?
>>>
>>> Regards,
>>> Mina
>>>
>>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com>
>>> wrote:
>>>
>>>> Hi Eno,
>>>>
>>>> Sorry! That is a typo!
>>>>
>>>> I have a docker-machine with different containers (setup as directed @
>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>>>>
>>>> docker ps --format "{{.Image}}: {{.Names}}"
>>>>
>>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>>>
>>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>>>
>>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>>>
>>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>>>
>>>> confluentinc/cp-kafka:3.2.0: kafka
>>>>
>>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>>>
>>>> I used the example at
>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>> and followed the same steps.
>>>>
>>>> When I run below command in docker-machine, I see the messages in
>>>> TextLinesTopic.
>>>>
>>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer --from-beginning
>>>>
>>>> hello kafka streams
>>>>
>>>> all streams lead to kafka
>>>>
>>>> join kafka summit
>>>>
>>>> test1
>>>>
>>>> test2
>>>>
>>>> test3
>>>>
>>>> test4
>>>>
>>>> Running the above command for WordsWithCountsTopic returns nothing.
>>>>
>>>> My program runs outside the docker-machine, and it does not report any
>>>> error.
>>>>
>>>> I checked kafka logs and kafka-connect logs, no information is shown.
>>>> Wondering what is the log level in kafka/kafka-connect.
>>>>
>>>>
>>>> Best regards,
>>>> Mina
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I noticed in your example that you are using localhost:9092 to produce
>>>>> but localhost:29092 to consume? Or is that a typo? Are zookeeper, kafka,
>>>>> and the Kafka Streams app all running within one docker container, or in
>>>>> different containers?
>>>>>
>>>>> I just tested the WordCountLambdaExample and it works for me. This
>>>>> might not have anything to do with streams, but rather with the Kafka
>>>>> configuration and whether streams (that is just an app) can reach Kafka at
>>>>> all. If you provide the above information we can look further.
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
>>>>> >
>>>>> > I reset and still not working!
>>>>> >
>>>>> > My env is setup using
>>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>>>> >
>>>>> > I just tried using
>>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>>> > with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic) created
>>>>> > from scratch, going through the steps as directed.
>>>>> >
>>>>> > When I stopped the Java program and checked the topics, below is the data in
>>>>> > each topic.
>>>>> >
>>>>> > docker run \
>>>>> >  --net=host \
>>>>> >  --rm \
>>>>> >  confluentinc/cp-kafka:3.2.0 \
>>>>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer --from-beginning
>>>>> >
>>>>> >
>>>>> > SHOWS
>>>>> >
>>>>> > hello kafka streams
>>>>> >
>>>>> > all streams lead to kafka
>>>>> >
>>>>> > join kafka summit
>>>>> >
>>>>> > test1
>>>>> >
>>>>> > test2
>>>>> >
>>>>> > test3
>>>>> >
>>>>> > test4
>>>>> >
>>>>> > FOR WordsWithCountsTopic nothing is shown
>>>>> >
>>>>> >
>>>>> > I am new to Kafka/Kafka Streams and still do not understand why a simple
>>>>> > example does not work!
>>>>> >
>>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> > wrote:
>>>>> >
>>>>> >>>> So, when I check the number of messages in wordCount-input I see the same
>>>>> >>>> messages. However, when I run below code I do not see any message/data in
>>>>> >>>> wordCount-output.
>>>>> >>
>>>>> >> Did you reset your application?
>>>>> >>
>>>>> >> Each time you run your app and restart it, it will resume processing
>>>>> >> where it left off. Thus, if something went wrong in your first run but
>>>>> >> you got committed offsets, the app will not re-read the whole topic.
>>>>> >>
>>>>> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
>>>>> >> application.id from StreamsConfig is used as the group.id.
>>>>> >>
>>>>> >> Thus, resetting your app would be required to consume the input topic
>>>>> >> from scratch. Or you just write new data to your input topic.
>>>>> >>
>>>>> >>>> Can I connect to kafka in VM/docker container using below code or do I need
>>>>> >>>> to change/add other parameters? How can I submit the code to
>>>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>>>>> >>>> code(e.g. jar file)?
>>>>> >>
>>>>> >> A Streams app is a regular Java application and can run anywhere --
>>>>> >> there is no notion of a processing cluster and you don't "submit" your
>>>>> >> code -- you just run your app.
>>>>> >>
>>>>> >> Thus, if your console consumer can connect to the cluster, your Streams
>>>>> >> app should also be able to connect to the cluster.
>>>>> >>
>>>>> >>
>>>>> >> Maybe the short runtime of 5 seconds could be a problem (even if it
>>>>> >> seems long to process just a few records). But you might need to take
>>>>> >> startup delay into account. I would recommend registering a shutdown
>>>>> >> hook: see
>>>>> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>>> >>
>>>>> >>
>>>>> >> Hope this helps.
>>>>> >>
>>>>> >> -Matthias
>>>>> >>
>>>>> >>
>>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>>>> >>> Hi Matthias,
>>>>> >>>
>>>>> >>> Thank you for the quick response, appreciate it!
>>>>> >>>
>>>>> >>> I created the topics wordCount-input and wordCount-output. Pushed some data
>>>>> >>> to wordCount-input using
>>>>> >>>
>>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") /bin/kafka-console-producer --broker-list localhost:9092 --topic wordCount-input
>>>>> >>>
>>>>> >>> test
>>>>> >>>
>>>>> >>> new
>>>>> >>>
>>>>> >>> word
>>>>> >>>
>>>>> >>> count
>>>>> >>>
>>>>> >>> wordcount
>>>>> >>>
>>>>> >>> word count
>>>>> >>>
>>>>> >>> So, when I check the number of messages in wordCount-input I see the same
>>>>> >>> messages. However, when I run below code I do not see any message/data in
>>>>> >>> wordCount-output.
>>>>> >>>
>>>>> >>> Can I connect to kafka in VM/docker container using below code or do I need
>>>>> >>> to change/add other parameters? How can I submit the code to
>>>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>>>>> >>> code(e.g. jar file)?
>>>>> >>>
>>>>> >>> I really appreciate your input as I am blocked and cannot run even below
>>>>> >>> simple example.
>>>>> >>>
>>>>> >>> Best regards,
>>>>> >>> Mina
>>>>> >>>
>>>>> >>> I changed the code to be as below:
>>>>> >>>
>>>>> >>> Properties props = new Properties();
>>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>>> >>>
>>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>>> >>>
>>>>> >>> // setting offset reset to earliest so that we can re-run the demo
>>>>> >>> // code with the same pre-loaded data
>>>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>> >>>
>>>>> >>> KStreamBuilder builder = new KStreamBuilder();
>>>>> >>>
>>>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>>>>> >>>
>>>>> >>> KTable<String, Long> counts = source
>>>>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>>> >>>         @Override
>>>>> >>>         public Iterable<String> apply(String value) {
>>>>> >>>            return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>>>>> >>>         }
>>>>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>>>>> >>>         @Override
>>>>> >>>         public KeyValue<String, String> apply(String key, String value) {
>>>>> >>>            return new KeyValue<>(value, value);
>>>>> >>>         }
>>>>> >>>      })
>>>>> >>>      .groupByKey()
>>>>> >>>      .count("Counts");
>>>>> >>>
>>>>> >>> // need to override value serde to Long type
>>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>>>> >>>
>>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>> >>> streams.start();
>>>>> >>>
>>>>> >>> // usually the stream application would be running forever,
>>>>> >>> // in this example we just let it run for some time and stop since the
>>>>> >>> // input data is finite.
>>>>> >>> Thread.sleep(5000L);
>>>>> >>>
>>>>> >>> streams.close();
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> >>> wrote:
>>>>> >>>
>>>>> >>>> Maybe you need to reset your application using the reset tool:
>>>>> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>>>>> >>>>
>>>>> >>>> Also keep in mind that KTables buffer internally, and thus you might
>>>>> >>>> only see data on commit.
>>>>> >>>>
>>>>> >>>> Try to reduce the commit interval or disable caching by setting
>>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> -Matthias
>>>>> >>>>
>>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>>> >>>>> Hi,
>>>>> >>>>>
>>>>> >>>>> This is the first time that I am using Kafka Streams. I would like to read
>>>>> >>>>> from an input topic and write to an output topic. However, I do not see the
>>>>> >>>>> word count when I try to run the example below. It looks like it does not
>>>>> >>>>> connect to Kafka. I do not see any error though. I tried my localhost Kafka
>>>>> >>>>> as well as the container in a VM, same situation.
>>>>> >>>>>
>>>>> >>>>> There are over 200 messages in the input Kafka topic.
>>>>> >>>>>
>>>>> >>>>> Your input is appreciated!
>>>>> >>>>>
>>>>> >>>>> Best regards,
>>>>> >>>>> Mina
>>>>> >>>>>
>>>>> >>>>> import org.apache.kafka.common.serialization.*;
>>>>> >>>>> import org.apache.kafka.streams.*;
>>>>> >>>>> import org.apache.kafka.streams.kstream.*;
>>>>> >>>>>
>>>>> >>>>> import java.util.*;
>>>>> >>>>> import java.util.regex.*;
>>>>> >>>>>
>>>>> >>>>> public class WordCountExample {
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>>   public static void main(String[] args) {
>>>>> >>>>>      final Properties streamsConfiguration = new Properties();
>>>>> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-streaming");
>>>>> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<IPADDRESS>:9092");
>>>>> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>>> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>>>>> >>>>>
>>>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
>>>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
>>>>> >>>>>
>>>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
>>>>> >>>>>
>>>>> >>>>>      final KStream<String, String> textLines =
>>>>> >>>>>            builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>>> >>>>>
>>>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
>>>>> >>>>>
>>>>> >>>>>      final KStream<String, Long> wordCounts = textLines
>>>>> >>>>>            .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
>>>>> >>>>>            .groupBy((key, word) -> word)
>>>>> >>>>>            .count("Counts")
>>>>> >>>>>            .toStream();
>>>>> >>>>>
>>>>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>>> >>>>>
>>>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>>>>> >>>>>      streams.cleanUp();
>>>>> >>>>>      streams.start();
>>>>> >>>>>
>>>>> >>>>>      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>>> >>>>>   }
>>>>> >>>>> }
>>>>> >>>>>
>>>>> >>>>
>>>>> >>>>
>>>>> >>>
>>>>> >>
>>>>> >>
>>>>>
>>>>>
>>>>
>>>
>>
>
