I even tried
http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart

and in docker-machine  ran /usr/bin/kafka-run-class
org.apache.kafka.streams.examples.wordcount.WordCountDemo

Running

docker run --net=host --rm confluentinc/cp-kafka:3.2.0
kafka-console-consumer --bootstrap-server localhost:9092 --topic
streams-wordcount-output --new-consumer --from-beginning

shows 8 blank messages

Is there any setting/configuration should be done as running the class in
the docker-machine and running program outside the docker-machine does not
return expected result!

On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> And the port for kafka is 29092 and for zookeeper 32181.
>
> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com> wrote:
>
>> Hi,
>>
>> I forgot to add in my previous email 2 questions.
>>
>> To setup my env, shall I use https://raw.githubusercont
>> ent.com/confluentinc/cp-docker-images/master/examples/
>> kafka-single-node/docker-compose.yml instead or is there any other
>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
>>
>> How can I check "whether streams (that is just an app) can reach Kafka"?
>>
>> Regards,
>> Mina
>>
>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com>
>> wrote:
>>
>>> Hi Eno,
>>>
>>> Sorry! That is a typo!
>>>
>>> I have a docker-machine with different containers (setup as directed @
>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>>>
>>> docker ps --format "{{.Image}}: {{.Names}}"
>>>
>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>>
>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>>
>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>>
>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>>
>>> confluentinc/cp-kafka:3.2.0: kafka
>>>
>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>>
>>> I used example @ https://github.com/confluent
>>> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
>>> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
>>> followed the same steps.
>>>
>>> When I run below command in docker-machine, I see the messages in
>>> TextLinesTopic.
>>>
>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 
>>> kafka-console-consumer
>>> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
>>> --from-beginning
>>>
>>> hello kafka streams
>>>
>>> all streams lead to kafka
>>>
>>> join kafka summit
>>>
>>> test1
>>>
>>> test2
>>>
>>> test3
>>>
>>> test4
>>>
>>> Running above command for WordsWithCountsTopic returns nothing*.*
>>>
>>> My program runs out of docker machine, and it does not return any error.
>>>
>>> I checked kafka logs and kafka-connect logs, no information is shown.
>>> Wondering what is the log level in kafka/kafka-connect.
>>>
>>>
>>> Best regards,
>>> Mina
>>>
>>>
>>>
>>>
>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com>
>>> wrote:
>>>
>>>> Hi there,
>>>>
>>>> I noticed in your example that you are using localhost:9092 to produce
>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and
>>>> the Kafka Streams app all running within one docker container, or in
>>>> different containers?
>>>>
>>>> I just tested the WordCountLambdaExample and it works for me. This
>>>> might not have anything to do with streams, but rather with the Kafka
>>>> configuration and whether streams (that is just an app) can reach Kafka at
>>>> all. If you provide the above information we can look further.
>>>>
>>>>
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
>>>> >
>>>> > I reset and still not working!
>>>> >
>>>> > My env is setup using
>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>>> >
>>>> > I just tried using
>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-st
>>>> reams/src/main/java/io/confluent/examples/streams/WordCountL
>>>> ambdaExample.java#L178-L181
>>>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
>>>> created
>>>> > from scratch as went through the steps as directed.
>>>> >
>>>> > When I stopped the java program and check the topics below are the
>>>> data in
>>>> > each topic.
>>>> >
>>>> > docker run \
>>>> >
>>>> >  --net=host \
>>>> >
>>>> >  --rm \
>>>> >
>>>> >  confluentinc/cp-kafka:3.2.0 \
>>>> >
>>>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic
>>>> > TextLinesTopic --new-consumer --from-beginning
>>>> >
>>>> >
>>>> > SHOWS
>>>> >
>>>> > hello kafka streams
>>>> >
>>>> > all streams lead to kafka
>>>> >
>>>> > join kafka summit
>>>> >
>>>> > test1
>>>> >
>>>> > test2
>>>> >
>>>> > test3
>>>> >
>>>> > test4
>>>> >
>>>> > FOR WordsWithCountsTopic nothing is shown
>>>> >
>>>> >
>>>> > I am new to the Kafka/Kafka Stream and still do not understand why a
>>>> simple
>>>> > example does not work!
>>>> >
>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <
>>>> matth...@confluent.io>
>>>> > wrote:
>>>> >
>>>> >>>> So, when I check the number of messages in wordCount-input I see
>>>> the
>>>> >> same
>>>> >>>> messages. However, when I run below code I do not see any
>>>> message/data
>>>> >> in
>>>> >>>> wordCount-output.
>>>> >>
>>>> >> Did you reset your application?
>>>> >>
>>>> >> Each time you run you app and restart it, it will resume processing
>>>> >> where it left off. Thus, if something went wrong in you first run but
>>>> >> you got committed offsets, the app will not re-read the whole topic.
>>>> >>
>>>> >> You can check committed offset via bin/kafka-consumer-groups.sh. The
>>>> >> application-id from StreamConfig is used a group.id.
>>>> >>
>>>> >> Thus, resetting you app would be required to consumer the input topic
>>>> >> from scratch. Of you just write new data to you input topic.
>>>> >>
>>>> >>>> Can I connect to kafka in VM/docker container using below code or
>>>> do I
>>>> >> need
>>>> >>>> to change/add other parameters? How can I submit the code to
>>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>>>> the
>>>> >>>> code(e.g. jar file)?
>>>> >>
>>>> >> A Streams app is a regular Java application and can run anywhere --
>>>> >> there is no notion of a processing cluster and you don't "submit"
>>>> your
>>>> >> code -- you just run your app.
>>>> >>
>>>> >> Thus, if your console consumer can connect to the cluster, your
>>>> Streams
>>>> >> app should also be able to connect to the cluster.
>>>> >>
>>>> >>
>>>> >> Maybe, the short runtime of 5 seconds could be a problem (even if it
>>>> >> seems log to process just a few records). But you might need to put
>>>> >> startup delay into account. I would recommend to register a shutdown
>>>> >> hook: see
>>>> >> https://github.com/confluentinc/examples/blob/3.
>>>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>>>> >> WordCountLambdaExample.java#L178-L181
>>>> >>
>>>> >>
>>>> >> Hope this helps.
>>>> >>
>>>> >> -Matthias
>>>> >>
>>>> >>
>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>>> >>> Hi Matthias,
>>>> >>>
>>>> >>> Thank you for the quick response, appreciate it!
>>>> >>>
>>>> >>> I created the topics wordCount-input and wordCount-output. Pushed
>>>> some
>>>> >> data
>>>> >>> to wordCount-input using
>>>> >>>
>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format
>>>> "{{.Names}}")
>>>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>>> >>> wordCount-input
>>>> >>>
>>>> >>> test
>>>> >>>
>>>> >>> new
>>>> >>>
>>>> >>> word
>>>> >>>
>>>> >>> count
>>>> >>>
>>>> >>> wordcount
>>>> >>>
>>>> >>> word count
>>>> >>>
>>>> >>> So, when I check the number of messages in wordCount-input I see
>>>> the same
>>>> >>> messages. However, when I run below code I do not see any
>>>> message/data in
>>>> >>> wordCount-output.
>>>> >>>
>>>> >>> Can I connect to kafka in VM/docker container using below code or
>>>> do I
>>>> >> need
>>>> >>> to change/add other parameters? How can I submit the code to
>>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>>>> the
>>>> >>> code(e.g. jar file)?
>>>> >>>
>>>> >>> I really appreciate your input as I am blocked and cannot run even
>>>> below
>>>> >>> simple example.
>>>> >>>
>>>> >>> Best regards,
>>>> >>> Mina
>>>> >>>
>>>> >>> I changed the code to be as below:
>>>> >>>
>>>> >>> Properties props = new Properties();
>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>> "wordCount-streaming");
>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>> "<ipAddress>:9092");
>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>> >>> Serdes.String().getClass().getName());
>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>> >>> Serdes.String().getClass().getName());
>>>> >>>
>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>> >>>
>>>> >>> // setting offset reset to earliest so that we can re-run the demo
>>>> >>> code with the same pre-loaded data
>>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
>>>> >> Kafka+Streams+Application+Reset+Tool
>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>> >>>
>>>> >>> KStreamBuilder builder = new KStreamBuilder();
>>>> >>>
>>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>>>> >>>
>>>> >>> KTable<String, Long> counts = source
>>>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>> >>>         @Override
>>>> >>>         public Iterable<String> apply(String value) {
>>>> >>>            return
>>>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>>>> >>>         }
>>>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
>>>> >> String>>() {
>>>> >>>         @Override
>>>> >>>         public KeyValue<String, String> apply(String key, String
>>>> value)
>>>> >> {
>>>> >>>            return new KeyValue<>(value, value);
>>>> >>>         }
>>>> >>>      })
>>>> >>>      .groupByKey()
>>>> >>>      .count("Counts");
>>>> >>>
>>>> >>> // need to override value serde to Long type
>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>>> >>>
>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>> >>> streams.start();
>>>> >>>
>>>> >>> // usually the stream application would be running forever,
>>>> >>> // in this example we just let it run for some time and stop since
>>>> the
>>>> >>> input data is finite.
>>>> >>> Thread.sleep(5000L);
>>>> >>>
>>>> >>> streams.close();
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
>>>> matth...@confluent.io>
>>>> >>> wrote:
>>>> >>>
>>>> >>>> Maybe you need to reset your application using the reset tool:
>>>> >>>> http://docs.confluent.io/current/streams/developer-
>>>> >>>> guide.html#application-reset-tool
>>>> >>>>
>>>> >>>> Also keep in mind, that KTables buffer internally, and thus, you
>>>> might
>>>> >>>> only see data on commit.
>>>> >>>>
>>>> >>>> Try to reduce commit interval or disable caching by setting
>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>> >>>>
>>>> >>>>
>>>> >>>> -Matthias
>>>> >>>>
>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>> >>>>> Hi,
>>>> >>>>>
>>>> >>>>> This is the first time that am using Kafka Stream. I would like
>>>> to read
>>>> >>>>> from input topic and write to output topic. However, I do not see
>>>> the
>>>> >>>> word
>>>> >>>>> count when I try to run below example. Looks like that it does not
>>>> >>>> connect
>>>> >>>>> to Kafka. I do not see any error though. I tried my localhost
>>>> kafka as
>>>> >>>> well
>>>> >>>>> as the container in a VM, same situation.
>>>> >>>>>
>>>> >>>>> There are over 200 message in the input kafka topic.
>>>> >>>>>
>>>> >>>>> Your input is appreciated!
>>>> >>>>>
>>>> >>>>> Best regards,
>>>> >>>>> Mina
>>>> >>>>>
>>>> >>>>> import org.apache.kafka.common.serialization.*;
>>>> >>>>> import org.apache.kafka.streams.*;
>>>> >>>>> import org.apache.kafka.streams.kstream.*;
>>>> >>>>>
>>>> >>>>> import java.util.*;
>>>> >>>>> import java.util.regex.*;
>>>> >>>>>
>>>> >>>>> public class WordCountExample {
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>   public static void main(String [] args)   {
>>>> >>>>>      final Properties streamsConfiguration = new Properties();
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG
>>>> ,
>>>> >>>>> "wordcount-streaming");
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CON
>>>> FIG,
>>>> >>>>> "<IPADDRESS>:9092");
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFI
>>>> G,
>>>> >>>>> Serdes.String().getClass().getName());
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CON
>>>> FIG,
>>>> >>>>> Serdes.String().getClass().getName());
>>>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>>>> >> MS_CONFIG,
>>>> >>>>> 10 * 1000);
>>>> >>>>>
>>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
>>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
>>>> >>>>>
>>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
>>>> >>>>>
>>>> >>>>>      final KStream<String, String> textLines =
>>>> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>> >>>>>
>>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
>>>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
>>>> >>>>>
>>>> >>>>>      final KStream<String, Long> wordCounts = textLines
>>>> >>>>>            .flatMapValues(value ->
>>>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>>> >>>>>            .groupBy((key, word) -> word)
>>>> >>>>>            .count("Counts")
>>>> >>>>>            .toStream();
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>> >>>>>
>>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
>>>> >>>>> streamsConfiguration);
>>>> >>>>>      streams.cleanUp();
>>>> >>>>>      streams.start();
>>>> >>>>>
>>>> >>>>>      Runtime.getRuntime().addShutdownHook(new
>>>> >>>> Thread(streams::close));  }
>>>> >>>>> }
>>>> >>>>>
>>>> >>>>
>>>> >>>>
>>>> >>>
>>>> >>
>>>> >>
>>>>
>>>>
>>>
>>
>

Reply via email to