Hi Eno,

Sorry! That is a typo!

I have a docker-machine running separate containers (set up as directed at
http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html):

docker ps --format "{{.Image}}: {{.Names}}"

confluentinc/cp-kafka-connect:3.2.0: kafka-connect
confluentinc/cp-enterprise-control-center:3.2.0: control-center
confluentinc/cp-kafka-rest:3.2.0: kafka-rest
confluentinc/cp-schema-registry:3.2.0: schema-registry
confluentinc/cp-kafka:3.2.0: kafka
confluentinc/cp-zookeeper:3.2.0: zookeeper

I used the example at
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
and followed the same steps.

When I run the command below in the docker-machine, I see the messages in
TextLinesTopic.

docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-console-consumer --bootstrap-server localhost:29092 \
  --topic TextLinesTopic --new-consumer --from-beginning

hello kafka streams
all streams lead to kafka
join kafka summit
test1
test2
test3
test4

Running the same command for WordsWithCountsTopic returns nothing.
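
For reference, the counts the word-count topology would be expected to produce
for the TextLinesTopic input can be sketched in plain Java. This is only an
illustration with no Kafka dependency: the class name WordCountSketch and the
method count are hypothetical, the split pattern is borrowed from the
WordCountExample quoted further down in this thread, and empty tokens are
skipped here, which the original code does not do.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for the Streams topology: lower-case each line,
// split it into words, then count occurrences per word.
public class WordCountSketch {

    // Same pattern as in the quoted WordCountExample.
    private static final Pattern NON_WORD =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    public static Map<String, Long> count(List<String> lines) {
        // LinkedHashMap keeps words in first-seen order for readable output.
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : NON_WORD.split(line.toLowerCase(Locale.ROOT))) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "hello kafka streams",
                "all streams lead to kafka",
                "join kafka summit",
                "test1", "test2", "test3", "test4");
        // Print one "word<TAB>count" pair per line.
        count(lines).forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

With the seven input lines above, "kafka" should end up with a count of 3 and
"streams" with 2, and every other word with 1. Since the values written to
WordsWithCountsTopic are Longs, the console consumer would likely also need a
Long value deserializer to print them readably, but an empty result as seen
here suggests that no records reached the topic at all.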

My program runs outside the docker-machine, and it does not report any error.

I checked the kafka and kafka-connect logs, but no information is shown.
I am wondering what the log level is in kafka/kafka-connect.


Best regards,
Mina




On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi there,
>
> I noticed in your example that you are using localhost:9092 to produce but
> localhost:29092 to consume. Or is that a typo? Are ZooKeeper, Kafka, and the
> Kafka Streams app all running within one docker container, or in different
> containers?
>
> I just tested the WordCountLambdaExample and it works for me. This might
> not have anything to do with streams, but rather with the Kafka
> configuration and whether streams (that is just an app) can reach Kafka at
> all. If you provide the above information we can look further.
>
>
>
> Thanks
> Eno
>
> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
> >
> > I reset the application and it is still not working!
> >
> > My environment is set up using
> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> >
> > I just tried using
> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> > with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic) created
> > from scratch, following the steps as directed.
> >
> > When I stopped the Java program and checked the topics, this is the data in
> > each topic.
> >
> > docker run \
> >  --net=host \
> >  --rm \
> >  confluentinc/cp-kafka:3.2.0 \
> >  kafka-console-consumer --bootstrap-server localhost:29092 \
> >  --topic TextLinesTopic --new-consumer --from-beginning
> >
> >
> > SHOWS
> >
> > hello kafka streams
> > all streams lead to kafka
> > join kafka summit
> > test1
> > test2
> > test3
> > test4
> >
> > For WordsWithCountsTopic nothing is shown.
> >
> >
> > I am new to Kafka/Kafka Streams and still do not understand why a simple
> > example does not work!
> >
> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >>>> So, when I check the number of messages in wordCount-input I see the same
> >>>> messages. However, when I run below code I do not see any message/data in
> >>>> wordCount-output.
> >>
> >> Did you reset your application?
> >>
> >> Each time you run your app and restart it, it will resume processing
> >> where it left off. Thus, if something went wrong in your first run but
> >> you got committed offsets, the app will not re-read the whole topic.
> >>
> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
> >> application.id from StreamsConfig is used as the group.id.
> >>
> >> Thus, resetting your app would be required to consume the input topic
> >> from scratch. Or you just write new data to your input topic.
> >>
> >>>> Can I connect to kafka in VM/docker container using below code or do I need
> >>>> to change/add other parameters? How can I submit the code to
> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> >>>> code(e.g. jar file)?
> >>
> >> A Streams app is a regular Java application and can run anywhere --
> >> there is no notion of a processing cluster and you don't "submit" your
> >> code -- you just run your app.
> >>
> >> Thus, if your console consumer can connect to the cluster, your Streams
> >> app should also be able to connect to the cluster.
> >>
> >>
> >> Maybe the short runtime of 5 seconds could be a problem (even if it
> >> seems long to process just a few records). But you might need to take
> >> startup delay into account. I would recommend registering a shutdown
> >> hook: see
> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> >>
> >>
> >> Hope this helps.
> >>
> >> -Matthias
> >>
> >>
> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
> >>> Hi Matthias,
> >>>
> >>> Thank you for the quick response, appreciate it!
> >>>
> >>> I created the topics wordCount-input and wordCount-output. I pushed some
> >>> data to wordCount-input using
> >>>
> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
> >>>   /bin/kafka-console-producer --broker-list localhost:9092 \
> >>>   --topic wordCount-input
> >>>
> >>> test
> >>> new
> >>> word
> >>> count
> >>> wordcount
> >>> word count
> >>>
> >>> So, when I check the number of messages in wordCount-input I see the same
> >>> messages. However, when I run below code I do not see any message/data in
> >>> wordCount-output.
> >>>
> >>> Can I connect to kafka in VM/docker container using below code or do I need
> >>> to change/add other parameters? How can I submit the code to
> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> >>> code(e.g. jar file)?
> >>>
> >>> I really appreciate your input as I am blocked and cannot run even below
> >>> simple example.
> >>>
> >>> Best regards,
> >>> Mina
> >>>
> >>> I changed the code to be as below:
> >>>
> >>> Properties props = new Properties();
> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>     Serdes.String().getClass().getName());
> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>     Serdes.String().getClass().getName());
> >>>
> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>
> >>> // setting offset reset to earliest so that we can re-run the demo
> >>> // code with the same pre-loaded data
> >>> // Note: To re-run the demo, you need to use the offset reset tool:
> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>
> >>> KStreamBuilder builder = new KStreamBuilder();
> >>>
> >>> KStream<String, String> source = builder.stream("wordCount-input");
> >>>
> >>> KTable<String, Long> counts = source
> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >>>         @Override
> >>>         public Iterable<String> apply(String value) {
> >>>            return Arrays.asList(
> >>>               value.toLowerCase(Locale.getDefault()).split(" "));
> >>>         }
> >>>      })
> >>>      .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
> >>>         @Override
> >>>         public KeyValue<String, String> apply(String key, String value) {
> >>>            return new KeyValue<>(value, value);
> >>>         }
> >>>      })
> >>>      .groupByKey()
> >>>      .count("Counts");
> >>>
> >>> // need to override value serde to Long type
> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >>>
> >>> KafkaStreams streams = new KafkaStreams(builder, props);
> >>> streams.start();
> >>>
> >>> // usually the stream application would be running forever,
> >>> // in this example we just let it run for some time and stop since the
> >>> // input data is finite.
> >>> Thread.sleep(5000L);
> >>>
> >>> streams.close();
> >>>
> >>>
> >>>
> >>>
> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Maybe you need to reset your application using the reset tool:
> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
> >>>>
> >>>> Also keep in mind, that KTables buffer internally, and thus, you might
> >>>> only see data on commit.
> >>>>
> >>>> Try to reduce commit interval or disable caching by setting
> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>>>> Hi,
> >>>>>
> >>>>> This is the first time that I am using Kafka Streams. I would like to
> >>>>> read from an input topic and write to an output topic. However, I do not
> >>>>> see the word count when I try to run the example below. It looks like it
> >>>>> does not connect to Kafka. I do not see any error though. I tried my
> >>>>> localhost Kafka as well as the container in a VM, same situation.
> >>>>>
> >>>>> There are over 200 messages in the input Kafka topic.
> >>>>>
> >>>>> Your input is appreciated!
> >>>>>
> >>>>> Best regards,
> >>>>> Mina
> >>>>>
> >>>>> import org.apache.kafka.common.serialization.*;
> >>>>> import org.apache.kafka.streams.*;
> >>>>> import org.apache.kafka.streams.kstream.*;
> >>>>>
> >>>>> import java.util.*;
> >>>>> import java.util.regex.*;
> >>>>>
> >>>>> public class WordCountExample {
> >>>>>
> >>>>>   public static void main(String[] args) {
> >>>>>      final Properties streamsConfiguration = new Properties();
> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>>            "wordcount-streaming");
> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>>            "<IPADDRESS>:9092");
> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>            Serdes.String().getClass().getName());
> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>            Serdes.String().getClass().getName());
> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> >>>>>            10 * 1000);
> >>>>>
> >>>>>      final Serde<String> stringSerde = Serdes.String();
> >>>>>      final Serde<Long> longSerde = Serdes.Long();
> >>>>>
> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
> >>>>>
> >>>>>      final KStream<String, String> textLines =
> >>>>>            builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>>>
> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
> >>>>>            Pattern.UNICODE_CHARACTER_CLASS);
> >>>>>
> >>>>>      final KStream<String, Long> wordCounts = textLines
> >>>>>            .flatMapValues(value ->
> >>>>>                  Arrays.asList(pattern.split(value.toLowerCase())))
> >>>>>            .groupBy((key, word) -> word)
> >>>>>            .count("Counts")
> >>>>>            .toStream();
> >>>>>
> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>>>
> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
> >>>>>            streamsConfiguration);
> >>>>>      streams.cleanUp();
> >>>>>      streams.start();
> >>>>>
> >>>>>      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
>
>
