Hi Eno,

Sorry! That is a typo!

I have a docker-machine with several containers (set up as directed at
http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html):

docker ps --format "{{.Image}}: {{.Names}}"

confluentinc/cp-kafka-connect:3.2.0: kafka-connect
confluentinc/cp-enterprise-control-center:3.2.0: control-center
confluentinc/cp-kafka-rest:3.2.0: kafka-rest
confluentinc/cp-schema-registry:3.2.0: schema-registry
confluentinc/cp-kafka:3.2.0: kafka
confluentinc/cp-zookeeper:3.2.0: zookeeper

I used the example at
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
and followed the same steps. When I run the command below in the
docker-machine, I see the messages in TextLinesTopic:

docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-console-consumer --bootstrap-server localhost:29092 \
  --topic TextLinesTopic --new-consumer --from-beginning

hello kafka streams
all streams lead to kafka
join kafka summit
test1
test2
test3
test4

Running the same command for WordsWithCountsTopic returns nothing.

My program runs outside of the docker-machine, and it does not return
any error. I checked the kafka and kafka-connect logs; no information is
shown. I am wondering what the log level is in kafka/kafka-connect.

Best regards,
Mina

On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi there,
>
> I noticed in your example that you are using localhost:9092 to produce
> but localhost:29092 to consume? Or is that a typo? Are zookeeper, kafka,
> and the Kafka Streams app all running within one docker container, or in
> different containers?
>
> I just tested the WordCountLambdaExample and it works for me. This might
> not have anything to do with streams, but rather with the Kafka
> configuration and whether streams (that is just an app) can reach Kafka
> at all. If you provide the above information we can look further.
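One note on checking WordsWithCountsTopic: the word-count values are written as Longs, not Strings, so the plain console-consumer invocation above will not render them as text even when records are present. A sketch of the consume command with explicit deserializers (based on the WordCountLambdaExample docs; adjust host/port to your setup):

```shell
# Consume the word-count output topic; keys are Strings, values are Longs,
# so the console consumer needs explicit deserializers to print them.
docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-console-consumer --bootstrap-server localhost:29092 \
  --topic WordsWithCountsTopic --new-consumer --from-beginning \
  --property print.key=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```

This only changes how records are displayed; if the topic is genuinely empty it will still print nothing.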
>
> Thanks
> Eno
>
> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
> >
> > I reset and it is still not working!
> >
> > My env is set up using
> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> >
> > I just tried
> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> > with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic)
> > created from scratch, and went through the steps as directed.
> >
> > When I stopped the java program and checked the topics, below is the
> > data in each topic.
> >
> > docker run \
> >   --net=host \
> >   --rm \
> >   confluentinc/cp-kafka:3.2.0 \
> >   kafka-console-consumer --bootstrap-server localhost:29092 \
> >   --topic TextLinesTopic --new-consumer --from-beginning
> >
> > SHOWS
> >
> > hello kafka streams
> > all streams lead to kafka
> > join kafka summit
> > test1
> > test2
> > test3
> > test4
> >
> > FOR WordsWithCountsTopic nothing is shown.
> >
> > I am new to Kafka/Kafka Streams and still do not understand why a
> > simple example does not work!
> >
> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >>>> So, when I check the number of messages in wordCount-input I see the
> >>>> same messages. However, when I run below code I do not see any
> >>>> message/data in wordCount-output.
> >>
> >> Did you reset your application?
> >>
> >> Each time you run your app and restart it, it will resume processing
> >> where it left off. Thus, if something went wrong in your first run but
> >> you have committed offsets, the app will not re-read the whole topic.
> >>
> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
> >> application-id from StreamsConfig is used as the group.id.
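To make that offset check concrete: with the docker quickstart above, it might look like the sketch below (assuming the application-id "wordCount-streaming" from the code later in this thread; the --new-consumer flag matches the 0.10.2-era CLI and was later deprecated):

```shell
# Describe the consumer group that Kafka Streams creates for the app;
# the group name is the application.id from StreamsConfig.
docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-consumer-groups --bootstrap-server localhost:29092 \
  --new-consumer --describe --group wordCount-streaming
```

If the reported current offsets already equal the log-end offsets, the app has consumed everything and will not re-read the topic on restart.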
> >>
> >> Thus, resetting your app would be required to consume the input topic
> >> from scratch. Or you just write new data to your input topic.
> >>
> >>>> Can I connect to kafka in a VM/docker container using the code below,
> >>>> or do I need to change/add other parameters? How can I submit the
> >>>> code to kafka/kafka-connect? Do we have a similar concept as SPARK to
> >>>> submit the code (e.g. a jar file)?
> >>
> >> A Streams app is a regular Java application and can run anywhere --
> >> there is no notion of a processing cluster and you don't "submit" your
> >> code -- you just run your app.
> >>
> >> Thus, if your console consumer can connect to the cluster, your Streams
> >> app should also be able to connect to the cluster.
> >>
> >> Maybe the short runtime of 5 seconds could be a problem (even if it
> >> seems long enough to process just a few records). But you need to take
> >> startup delay into account. I would recommend registering a shutdown
> >> hook: see
> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> >>
> >> Hope this helps.
> >>
> >> -Matthias
> >>
> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
> >>> Hi Matthias,
> >>>
> >>> Thank you for the quick response, appreciate it!
> >>>
> >>> I created the topics wordCount-input and wordCount-output. Pushed some
> >>> data to wordCount-input using
> >>>
> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
> >>>   /bin/kafka-console-producer --broker-list localhost:9092 \
> >>>   --topic wordCount-input
> >>>
> >>> test
> >>> new
> >>> word
> >>> count
> >>> wordcount
> >>> word count
> >>>
> >>> So, when I check the number of messages in wordCount-input I see the
> >>> same messages. However, when I run the code below I do not see any
> >>> message/data in wordCount-output.
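The reset Matthias describes can be scripted with the application reset tool that ships with Kafka. A sketch, assuming the application-id "wordCount-streaming" and the quickstart's ports (on 0.10.2 the tool also takes --zookeeper in order to delete the app's internal topics; adjust the ZK port if your setup differs):

```shell
# Reset the Streams app: rewind its committed offsets on the input topic
# and delete its internal changelog/repartition topics.
docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-streams-application-reset \
  --bootstrap-servers localhost:29092 \
  --zookeeper localhost:32181 \
  --application-id wordCount-streaming \
  --input-topics wordCount-input
```

After running it (with the app stopped), also call KafkaStreams#cleanUp() on the next start so the local state matches the reset cluster state.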
> >>>
> >>> Can I connect to kafka in a VM/docker container using the code below,
> >>> or do I need to change/add other parameters? How can I submit the code
> >>> to kafka/kafka-connect? Do we have a similar concept as SPARK to
> >>> submit the code (e.g. a jar file)?
> >>>
> >>> I really appreciate your input as I am blocked and cannot run even the
> >>> simple example below.
> >>>
> >>> Best regards,
> >>> Mina
> >>>
> >>> I changed the code to be as below:
> >>>
> >>> Properties props = new Properties();
> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>     Serdes.String().getClass().getName());
> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>     Serdes.String().getClass().getName());
> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>
> >>> // Setting offset reset to earliest so that we can re-run the demo
> >>> // code with the same pre-loaded data.
> >>> // Note: To re-run the demo, you need to use the offset reset tool:
> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>
> >>> KStreamBuilder builder = new KStreamBuilder();
> >>>
> >>> KStream<String, String> source = builder.stream("wordCount-input");
> >>>
> >>> KTable<String, Long> counts = source
> >>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >>>         @Override
> >>>         public Iterable<String> apply(String value) {
> >>>             return Arrays.asList(
> >>>                 value.toLowerCase(Locale.getDefault()).split(" "));
> >>>         }
> >>>     })
> >>>     .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
> >>>         @Override
> >>>         public KeyValue<String, String> apply(String key, String value) {
> >>>             return new KeyValue<>(value, value);
> >>>         }
> >>>     })
> >>>     .groupByKey()
> >>>     .count("Counts");
> >>>
> >>> // Need to override the value serde to Long type.
> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >>>
> >>> KafkaStreams streams = new KafkaStreams(builder, props);
> >>> streams.start();
> >>>
> >>> // Usually the stream application would be running forever; in this
> >>> // example we just let it run for some time and stop, since the
> >>> // input data is finite.
> >>> Thread.sleep(5000L);
> >>>
> >>> streams.close();
> >>>
> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Maybe you need to reset your application using the reset tool:
> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
> >>>>
> >>>> Also keep in mind that KTables buffer internally, and thus you might
> >>>> only see data on commit.
> >>>>
> >>>> Try to reduce the commit interval, or disable caching by setting
> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>>>> Hi,
> >>>>>
> >>>>> This is the first time I am using Kafka Streams. I would like to
> >>>>> read from an input topic and write to an output topic. However, I do
> >>>>> not see the word count when I try to run the example below. It looks
> >>>>> like it does not connect to Kafka, though I do not see any error. I
> >>>>> tried my localhost kafka as well as the container in a VM; same
> >>>>> situation.
> >>>>>
> >>>>> There are over 200 messages in the input kafka topic.
> >>>>>
> >>>>> Your input is appreciated!
> >>>>>
> >>>>> Best regards,
> >>>>> Mina
> >>>>>
> >>>>> import org.apache.kafka.common.serialization.*;
> >>>>> import org.apache.kafka.streams.*;
> >>>>> import org.apache.kafka.streams.kstream.*;
> >>>>>
> >>>>> import java.util.*;
> >>>>> import java.util.regex.*;
> >>>>>
> >>>>> public class WordCountExample {
> >>>>>
> >>>>>     public static void main(String[] args) {
> >>>>>         final Properties streamsConfiguration = new Properties();
> >>>>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>>             "wordcount-streaming");
> >>>>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>>             "<IPADDRESS>:9092");
> >>>>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>             Serdes.String().getClass().getName());
> >>>>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>             Serdes.String().getClass().getName());
> >>>>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> >>>>>             10 * 1000);
> >>>>>
> >>>>>         final Serde<String> stringSerde = Serdes.String();
> >>>>>         final Serde<Long> longSerde = Serdes.Long();
> >>>>>
> >>>>>         final KStreamBuilder builder = new KStreamBuilder();
> >>>>>
> >>>>>         final KStream<String, String> textLines =
> >>>>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>>>
> >>>>>         final Pattern pattern = Pattern.compile("\\W+",
> >>>>>             Pattern.UNICODE_CHARACTER_CLASS);
> >>>>>
> >>>>>         final KStream<String, Long> wordCounts = textLines
> >>>>>             .flatMapValues(value ->
> >>>>>                 Arrays.asList(pattern.split(value.toLowerCase())))
> >>>>>             .groupBy((key, word) -> word)
> >>>>>             .count("Counts")
> >>>>>             .toStream();
> >>>>>
> >>>>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>>>
> >>>>>         final KafkaStreams streams = new KafkaStreams(builder,
> >>>>>             streamsConfiguration);
> >>>>>         streams.cleanUp();
> >>>>>         streams.start();
> >>>>>
> >>>>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >>>>>     }
> >>>>> }