Hi,

I forgot to add two questions in my previous email.
To set up my env, should I use
https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
instead, or is there another docker-compose.yml (version 2 or 3) that is
recommended for setting up the env?

How can I check "whether streams (that is just an app) can reach Kafka"?

Regards,
Mina

On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> Hi Eno,
>
> Sorry! That is a typo!
>
> I have a docker-machine with different containers (set up as directed at
> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html):
>
> docker ps --format "{{.Image}}: {{.Names}}"
>
> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> confluentinc/cp-schema-registry:3.2.0: schema-registry
> confluentinc/cp-kafka:3.2.0: kafka
> confluentinc/cp-zookeeper:3.2.0: zookeeper
>
> I used the example at
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> and followed the same steps.
>
> When I run the command below in the docker-machine, I see the messages in
> TextLinesTopic:
>
> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
>   kafka-console-consumer --bootstrap-server localhost:29092 \
>   --topic TextLinesTopic --new-consumer --from-beginning
>
> hello kafka streams
> all streams lead to kafka
> join kafka summit
> test1
> test2
> test3
> test4
>
> Running the same command for WordsWithCountsTopic returns nothing.
>
> My program runs outside the docker-machine, and it does not return any
> error.
>
> I checked the kafka and kafka-connect logs; no information is shown.
> Wondering what the log level is in kafka/kafka-connect.
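On the "how can I check whether the Streams app can reach Kafka" question: a first sanity check is whether the machine running the Streams app can even open a TCP connection to the broker port. Below is a minimal sketch; `localhost:29092` is taken from the console-consumer command in this thread and may differ in your setup, and note that a successful connect only proves the port is open, not that the broker's advertised listeners are configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port are assumptions based on the quickstart setup above.
        System.out.println(canConnect("localhost", 29092, 2000)
                ? "broker port reachable"
                : "broker port NOT reachable");
    }
}
```

If this prints "NOT reachable" from the host where the Streams app runs, the problem is networking (docker-machine port mapping), not Streams itself.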
>
> Best regards,
> Mina
>
> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi there,
>>
>> I noticed in your example that you are using localhost:9092 to produce
>> but localhost:29092 to consume? Or is that a typo? Are zookeeper, kafka,
>> and the Kafka Streams app all running within one docker container, or in
>> different containers?
>>
>> I just tested the WordCountLambdaExample and it works for me. This might
>> not have anything to do with streams, but rather with the Kafka
>> configuration and whether streams (that is just an app) can reach Kafka
>> at all. If you provide the above information we can look further.
>>
>> Thanks
>> Eno
>>
>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
>> >
>> > I reset and it is still not working!
>> >
>> > My env is set up using
>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>> >
>> > I just tried
>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>> > with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic)
>> > created from scratch, and went through the steps as directed.
>> >
>> > When I stopped the java program and checked the topics, below is the
>> > data in each topic:
>> >
>> > docker run \
>> >   --net=host \
>> >   --rm \
>> >   confluentinc/cp-kafka:3.2.0 \
>> >   kafka-console-consumer --bootstrap-server localhost:29092 \
>> >   --topic TextLinesTopic --new-consumer --from-beginning
>> >
>> > SHOWS
>> >
>> > hello kafka streams
>> > all streams lead to kafka
>> > join kafka summit
>> > test1
>> > test2
>> > test3
>> > test4
>> >
>> > FOR WordsWithCountsTopic nothing is shown.
>> >
>> > I am new to Kafka/Kafka Streams and still do not understand why a
>> > simple example does not work!
>> >
>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> >>>> So, when I check the number of messages in wordCount-input I see the
>> >>>> same messages. However, when I run below code I do not see any
>> >>>> message/data in wordCount-output.
>> >>
>> >> Did you reset your application?
>> >>
>> >> Each time you run your app and restart it, it will resume processing
>> >> where it left off. Thus, if something went wrong in your first run but
>> >> you got committed offsets, the app will not re-read the whole topic.
>> >>
>> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
>> >> application-id from StreamsConfig is used as the group.id.
>> >>
>> >> Thus, resetting your app would be required to consume the input topic
>> >> from scratch. Or you can just write new data to your input topic.
>> >>
>> >>>> Can I connect to kafka in VM/docker container using below code or
>> >>>> do I need to change/add other parameters? How can I submit the code
>> >>>> to kafka/kafka-connect? Do we have similar concept as SPARK to
>> >>>> submit the code (e.g. jar file)?
>> >>
>> >> A Streams app is a regular Java application and can run anywhere --
>> >> there is no notion of a processing cluster, and you don't "submit"
>> >> your code -- you just run your app.
>> >>
>> >> Thus, if your console consumer can connect to the cluster, your
>> >> Streams app should also be able to connect to the cluster.
>> >>
>> >> Maybe the short runtime of 5 seconds could be a problem (even if it
>> >> seems long enough to process just a few records), but you need to take
>> >> startup delay into account. I would recommend registering a shutdown
>> >> hook; see
>> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>> >>
>> >> Hope this helps.
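The shutdown-hook pattern recommended above can be sketched as follows. Here `closeStreams` is a stand-in for `streams::close` (the `KafkaStreams` instance from the word-count code in this thread), so the pattern runs without a broker; the names `StreamsShutdown` and `shutdownHook` are illustrative, not from the linked example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StreamsShutdown {

    // Builds the hook thread: close the Streams app, then release main().
    static Thread shutdownHook(Runnable closeStreams, CountDownLatch latch) {
        return new Thread(() -> {
            closeStreams.run();   // in a real app: streams.close()
            latch.countDown();
        });
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // In a real app: streams.start() here, and pass streams::close below.
        Runnable closeStreams = () -> System.out.println("closing streams");
        Runtime.getRuntime().addShutdownHook(shutdownHook(closeStreams, latch));
        // A real app would call latch.await() with no timeout; the timeout
        // here only keeps this sketch from blocking forever.
        latch.await(1, TimeUnit.SECONDS);
    }
}
```

Compared to a fixed Thread.sleep(5000L), this lets the app run until it receives SIGTERM/Ctrl-C, which also covers the startup-delay problem mentioned above.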
>> >> -Matthias
>> >>
>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>> >>> Hi Matthias,
>> >>>
>> >>> Thank you for the quick response, appreciate it!
>> >>>
>> >>> I created the topics wordCount-input and wordCount-output, and
>> >>> pushed some data to wordCount-input using:
>> >>>
>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
>> >>>   /bin/kafka-console-producer --broker-list localhost:9092 \
>> >>>   --topic wordCount-input
>> >>>
>> >>> test
>> >>> new
>> >>> word
>> >>> count
>> >>> wordcount
>> >>> word count
>> >>>
>> >>> So, when I check the number of messages in wordCount-input I see the
>> >>> same messages. However, when I run the code below I do not see any
>> >>> message/data in wordCount-output.
>> >>>
>> >>> Can I connect to kafka in a VM/docker container using the code
>> >>> below, or do I need to change/add other parameters? How can I submit
>> >>> the code to kafka/kafka-connect? Do we have a similar concept as in
>> >>> Spark to submit the code (e.g. a jar file)?
>> >>>
>> >>> I really appreciate your input, as I am blocked and cannot run even
>> >>> this simple example.
>> >>>
>> >>> Best regards,
>> >>> Mina
>> >>>
>> >>> I changed the code to be as below:
>> >>>
>> >>> Properties props = new Properties();
>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >>>     Serdes.String().getClass().getName());
>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> >>>     Serdes.String().getClass().getName());
>> >>>
>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> >>>
>> >>> // setting offset reset to earliest so that we can re-run the demo
>> >>> // code with the same pre-loaded data
>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> >>>
>> >>> KStreamBuilder builder = new KStreamBuilder();
>> >>>
>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>> >>>
>> >>> KTable<String, Long> counts = source
>> >>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>> >>>         @Override
>> >>>         public Iterable<String> apply(String value) {
>> >>>             return Arrays.asList(
>> >>>                 value.toLowerCase(Locale.getDefault()).split(" "));
>> >>>         }
>> >>>     })
>> >>>     .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>> >>>         @Override
>> >>>         public KeyValue<String, String> apply(String key, String value) {
>> >>>             return new KeyValue<>(value, value);
>> >>>         }
>> >>>     })
>> >>>     .groupByKey()
>> >>>     .count("Counts");
>> >>>
>> >>> // need to override value serde to Long type
>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>> >>>
>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>> >>> streams.start();
>> >>>
>> >>> // usually the stream application would be running forever,
>> >>> // but in this example we just let it run for some time and stop,
>> >>> // since the input data is finite
>> >>> Thread.sleep(5000L);
>> >>>
>> >>> streams.close();
>> >>>
>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
>> >>> wrote:
>> >>>
>> >>>> Maybe you need to reset your application using the reset tool:
>> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>> >>>>
>> >>>> Also keep in mind that KTables buffer internally, and thus you
>> >>>> might only see data on commit.
>> >>>>
>> >>>> Try reducing the commit interval or disabling caching by setting
>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>> >>>>> Hi,
>> >>>>>
>> >>>>> This is the first time I am using Kafka Streams. I would like to
>> >>>>> read from an input topic and write to an output topic. However, I
>> >>>>> do not see the word count when I try to run the example below. It
>> >>>>> looks like it does not connect to Kafka; I do not see any error
>> >>>>> though. I tried my localhost kafka as well as the container in a
>> >>>>> VM, same situation.
>> >>>>>
>> >>>>> There are over 200 messages in the input kafka topic.
>> >>>>>
>> >>>>> Your input is appreciated!
>> >>>>>
>> >>>>> Best regards,
>> >>>>> Mina
>> >>>>>
>> >>>>> import org.apache.kafka.common.serialization.*;
>> >>>>> import org.apache.kafka.streams.*;
>> >>>>> import org.apache.kafka.streams.kstream.*;
>> >>>>>
>> >>>>> import java.util.*;
>> >>>>> import java.util.regex.*;
>> >>>>>
>> >>>>> public class WordCountExample {
>> >>>>>
>> >>>>>     public static void main(String[] args) {
>> >>>>>         final Properties streamsConfiguration = new Properties();
>> >>>>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> >>>>>             "wordcount-streaming");
>> >>>>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> >>>>>             "<IPADDRESS>:9092");
>> >>>>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >>>>>             Serdes.String().getClass().getName());
>> >>>>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> >>>>>             Serdes.String().getClass().getName());
>> >>>>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>> >>>>>             10 * 1000);
>> >>>>>
>> >>>>>         final Serde<String> stringSerde = Serdes.String();
>> >>>>>         final Serde<Long> longSerde = Serdes.Long();
>> >>>>>
>> >>>>>         final KStreamBuilder builder = new KStreamBuilder();
>> >>>>>
>> >>>>>         final KStream<String, String> textLines =
>> >>>>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
>> >>>>>
>> >>>>>         final Pattern pattern = Pattern.compile("\\W+",
>> >>>>>             Pattern.UNICODE_CHARACTER_CLASS);
>> >>>>>
>> >>>>>         final KStream<String, Long> wordCounts = textLines
>> >>>>>             .flatMapValues(value ->
>> >>>>>                 Arrays.asList(pattern.split(value.toLowerCase())))
>> >>>>>             .groupBy((key, word) -> word)
>> >>>>>             .count("Counts")
>> >>>>>             .toStream();
>> >>>>>
>> >>>>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
>> >>>>>
>> >>>>>         final KafkaStreams streams = new KafkaStreams(builder,
>> >>>>>             streamsConfiguration);
>> >>>>>         streams.cleanUp();
>> >>>>>         streams.start();
>> >>>>>
>> >>>>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>> >>>>>     }
>> >>>>> }
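For reference, the per-record transformation the word-count topology above performs (the same regex split and lowercasing) can be checked offline against an in-memory list, which helps separate "is my logic right" from "can my app reach Kafka". This is a plain-Java sketch, not the Streams API; the class name `WordCountLogic` is illustrative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;

public class WordCountLogic {

    // Same tokenizer as the topology: split on non-word characters.
    private static final Pattern WORDS =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Mirrors flatMapValues(split) -> groupBy(word) -> count().
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : WORDS.split(line.toLowerCase(Locale.getDefault()))) {
                if (word.isEmpty()) continue;   // skip empty splits
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList(
                "hello kafka streams", "all streams lead to kafka"));
        System.out.println(counts.get("kafka"));   // 2
        System.out.println(counts.get("streams")); // 2
    }
}
```

If this produces the expected counts for your input but the output topic stays empty, the problem is connectivity, committed offsets, or commit-interval buffering, as discussed earlier in the thread.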