And the port for kafka is 29092 and for zookeeper 32181.

On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> Hi,
>
> I forgot to add 2 questions in my previous email.
>
> To set up my env, shall I use
> https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
> instead, or is there any other docker-compose.yml (version 2 or 3) that is
> suggested for setting up the env?
>
> How can I check "whether streams (that is just an app) can reach Kafka"?
>
> Regards,
> Mina
>
> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com> wrote:
>
>> Hi Eno,
>>
>> Sorry! That is a typo!
>>
>> I have a docker-machine with different containers (set up as directed @
>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>>
>> docker ps --format "{{.Image}}: {{.Names}}"
>>
>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>> confluentinc/cp-kafka:3.2.0: kafka
>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>
>> I used the example @
>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>> and followed the same steps.
>>
>> When I run the command below in docker-machine, I see the messages in
>> TextLinesTopic:
>>
>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer \
>>   --bootstrap-server localhost:29092 --topic TextLinesTopic \
>>   --new-consumer --from-beginning
>>
>> hello kafka streams
>> all streams lead to kafka
>> join kafka summit
>> test1
>> test2
>> test3
>> test4
>>
>> Running the same command for WordsWithCountsTopic returns nothing.
>>
>> My program runs outside the docker machine, and it does not return any error.
>>
>> I checked the kafka and kafka-connect logs; no information is shown.
>> Wondering what the log level is in kafka/kafka-connect.
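A quick way to sanity-check "whether streams (that is just an app) can reach Kafka" is a plain TCP connect to the broker's advertised host and port. This is only a crude reachability sketch, not a Kafka protocol handshake, and the host/port in `main` are assumptions taken from the quickstart port mapping (localhost:29092):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // This only proves the port is open and accepting connections,
    // not that a Kafka broker is actually answering there.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "localhost" and 29092 are assumptions from the quickstart setup;
        // for a Streams app running outside the docker machine, substitute
        // the docker-machine IP here.
        System.out.println(isReachable("localhost", 29092, 2000)
            ? "broker port reachable"
            : "broker port NOT reachable");
    }
}
```

If this fails from the host where the Streams app runs, the problem is networking/advertised listeners rather than the Streams code itself.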
>>
>> Best regards,
>> Mina
>>
>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>
>>> Hi there,
>>>
>>> I noticed in your example that you are using localhost:9092 to produce
>>> but localhost:29092 to consume? Or is that a typo? Are zookeeper, kafka, and
>>> the Kafka Streams app all running within one docker container, or in
>>> different containers?
>>>
>>> I just tested the WordCountLambdaExample and it works for me. This might
>>> not have anything to do with streams, but rather with the Kafka
>>> configuration and whether streams (that is just an app) can reach Kafka at
>>> all. If you provide the above information we can look further.
>>>
>>> Thanks
>>> Eno
>>>
>>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
>>> >
>>> > I reset and it is still not working!
>>> >
>>> > My env is set up using
>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>> >
>>> > I just tried using
>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>> > with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic) created
>>> > from scratch, and went through the steps as directed.
>>> >
>>> > When I stopped the java program and checked the topics, below is the data in
>>> > each topic.
>>> >
>>> > docker run \
>>> >   --net=host \
>>> >   --rm \
>>> >   confluentinc/cp-kafka:3.2.0 \
>>> >   kafka-console-consumer --bootstrap-server localhost:29092 \
>>> >   --topic TextLinesTopic --new-consumer --from-beginning
>>> >
>>> > SHOWS
>>> >
>>> > hello kafka streams
>>> > all streams lead to kafka
>>> > join kafka summit
>>> > test1
>>> > test2
>>> > test3
>>> > test4
>>> >
>>> > FOR WordsWithCountsTopic nothing is shown
>>> >
>>> > I am new to Kafka/Kafka Streams and still do not understand why a simple
>>> > example does not work!
>>> >
>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
>>> > wrote:
>>> >
>>> >>>> So, when I check the number of messages in wordCount-input I see the
>>> >>>> same messages. However, when I run the code below I do not see any
>>> >>>> message/data in wordCount-output.
>>> >>
>>> >> Did you reset your application?
>>> >>
>>> >> Each time you run your app and restart it, it will resume processing
>>> >> where it left off. Thus, if something went wrong in your first run but
>>> >> you got committed offsets, the app will not re-read the whole topic.
>>> >>
>>> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
>>> >> application-id from StreamsConfig is used as the group.id.
>>> >>
>>> >> Thus, resetting your app would be required to consume the input topic
>>> >> from scratch. Or you can just write new data to your input topic.
>>> >>
>>> >>>> Can I connect to kafka in a VM/docker container using the code below,
>>> >>>> or do I need to change/add other parameters? How can I submit the code to
>>> >>>> kafka/kafka-connect? Do we have a similar concept as SPARK to submit the
>>> >>>> code (e.g. a jar file)?
>>> >>
>>> >> A Streams app is a regular Java application and can run anywhere --
>>> >> there is no notion of a processing cluster and you don't "submit" your
>>> >> code -- you just run your app.
>>> >>
>>> >> Thus, if your console consumer can connect to the cluster, your Streams
>>> >> app should also be able to connect to the cluster.
>>> >>
>>> >> Maybe the short runtime of 5 seconds could be a problem (even if it
>>> >> seems long enough to process just a few records). But you might need to
>>> >> take startup delay into account. I would recommend registering a shutdown
>>> >> hook: see
>>> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>> >>
>>> >> Hope this helps.
>>> >>
>>> >> -Matthias
>>> >>
>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>> >>> Hi Matthias,
>>> >>>
>>> >>> Thank you for the quick response, appreciate it!
>>> >>>
>>> >>> I created the topics wordCount-input and wordCount-output. Pushed some
>>> >>> data to wordCount-input using
>>> >>>
>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
>>> >>>   /bin/kafka-console-producer --broker-list localhost:9092 \
>>> >>>   --topic wordCount-input
>>> >>>
>>> >>> test
>>> >>> new
>>> >>> word
>>> >>> count
>>> >>> wordcount
>>> >>> word count
>>> >>>
>>> >>> So, when I check the number of messages in wordCount-input I see the
>>> >>> same messages. However, when I run the code below I do not see any
>>> >>> message/data in wordCount-output.
>>> >>>
>>> >>> Can I connect to kafka in a VM/docker container using the code below,
>>> >>> or do I need to change/add other parameters? How can I submit the code to
>>> >>> kafka/kafka-connect? Do we have a similar concept as SPARK to submit the
>>> >>> code (e.g. a jar file)?
>>> >>>
>>> >>> I really appreciate your input as I am blocked and cannot run even the
>>> >>> simple example below.
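The shutdown-hook pattern Matthias recommends can be sketched roughly as follows. To keep the sketch self-contained, a hypothetical `AutoCloseable` stands in for the `KafkaStreams` instance; in the real app it would be the `streams` object itself:

```java
public class ShutdownHookSketch {

    public static void main(String[] args) {
        // Hypothetical stand-in for the KafkaStreams instance; in the real
        // app this would be: KafkaStreams streams = new KafkaStreams(...)
        AutoCloseable streams = () -> System.out.println("closing streams");

        // Instead of Thread.sleep(5000) followed by streams.close(), let
        // the app run until the JVM is terminated (Ctrl-C / SIGTERM) and
        // close the streams instance from a shutdown hook.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                streams.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }));

        System.out.println("app running; close happens on JVM shutdown");
    }
}
```

This avoids the race where the app exits before the consumer group has finished rebalancing and fetching, which a fixed 5-second sleep cannot guarantee.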
>>> >>>
>>> >>> Best regards,
>>> >>> Mina
>>> >>>
>>> >>> I changed the code to be as below:
>>> >>>
>>> >>> Properties props = new Properties();
>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> >>>     Serdes.String().getClass().getName());
>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> >>>     Serdes.String().getClass().getName());
>>> >>>
>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>> >>>
>>> >>> // setting offset reset to earliest so that we can re-run the demo
>>> >>> // code with the same pre-loaded data
>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> >>>
>>> >>> KStreamBuilder builder = new KStreamBuilder();
>>> >>>
>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>>> >>>
>>> >>> KTable<String, Long> counts = source
>>> >>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>> >>>         @Override
>>> >>>         public Iterable<String> apply(String value) {
>>> >>>             return Arrays.asList(
>>> >>>                 value.toLowerCase(Locale.getDefault()).split(" "));
>>> >>>         }
>>> >>>     })
>>> >>>     .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>>> >>>         @Override
>>> >>>         public KeyValue<String, String> apply(String key, String value) {
>>> >>>             return new KeyValue<>(value, value);
>>> >>>         }
>>> >>>     })
>>> >>>     .groupByKey()
>>> >>>     .count("Counts");
>>> >>>
>>> >>> // need to override value serde to Long type
>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>> >>>
>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>>> >>> streams.start();
>>> >>>
>>> >>> // usually the stream application would be running forever;
>>> >>> // in this example we just let it run for some time and stop since
>>> >>> // the input data is finite.
>>> >>> Thread.sleep(5000L);
>>> >>>
>>> >>> streams.close();
>>> >>>
>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
>>> >>> wrote:
>>> >>>
>>> >>>> Maybe you need to reset your application using the reset tool:
>>> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>>> >>>>
>>> >>>> Also keep in mind that KTables buffer internally, and thus you might
>>> >>>> only see data on commit.
>>> >>>>
>>> >>>> Try to reduce the commit interval or disable caching by setting
>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>> >>>>
>>> >>>> -Matthias
>>> >>>>
>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>> >>>>> Hi,
>>> >>>>>
>>> >>>>> This is the first time that I am using Kafka Streams. I would like to
>>> >>>>> read from an input topic and write to an output topic. However, I do
>>> >>>>> not see the word count when I try to run the example below. Looks like
>>> >>>>> it does not connect to Kafka. I do not see any error though. I tried my
>>> >>>>> localhost kafka as well as the container in a VM, same situation.
>>> >>>>>
>>> >>>>> There are over 200 messages in the input kafka topic.
>>> >>>>>
>>> >>>>> Your input is appreciated!
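The two overrides Matthias suggests (shorter commit interval, caching disabled) can be collected in a `Properties` sketch. The literal string keys below are assumed to match the constants `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` and `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` in the 3.2.x line:

```java
import java.util.Properties;

public class CacheConfigSketch {

    // Overrides so that KTable updates are forwarded downstream quickly
    // instead of being held back by the record cache until a commit.
    static Properties lowLatencyOverrides() {
        Properties props = new Properties();
        // Disable record caching entirely (0 bytes of buffering).
        props.put("cache.max.bytes.buffering", 0);
        // Commit (and therefore flush results) more often than the 30s default.
        props.put("commit.interval.ms", 1000);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(lowLatencyOverrides());
    }
}
```

With these settings a demo run should surface results within about a second of processing, at the cost of more commit overhead; they are for debugging, not production.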
>>> >>>>>
>>> >>>>> Best regards,
>>> >>>>> Mina
>>> >>>>>
>>> >>>>> import org.apache.kafka.common.serialization.*;
>>> >>>>> import org.apache.kafka.streams.*;
>>> >>>>> import org.apache.kafka.streams.kstream.*;
>>> >>>>>
>>> >>>>> import java.util.*;
>>> >>>>> import java.util.regex.*;
>>> >>>>>
>>> >>>>> public class WordCountExample {
>>> >>>>>
>>> >>>>>     public static void main(String[] args) {
>>> >>>>>         final Properties streamsConfiguration = new Properties();
>>> >>>>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> >>>>>             "wordcount-streaming");
>>> >>>>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>> >>>>>             "<IPADDRESS>:9092");
>>> >>>>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> >>>>>             Serdes.String().getClass().getName());
>>> >>>>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> >>>>>             Serdes.String().getClass().getName());
>>> >>>>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>> >>>>>             10 * 1000);
>>> >>>>>
>>> >>>>>         final Serde<String> stringSerde = Serdes.String();
>>> >>>>>         final Serde<Long> longSerde = Serdes.Long();
>>> >>>>>
>>> >>>>>         final KStreamBuilder builder = new KStreamBuilder();
>>> >>>>>
>>> >>>>>         final KStream<String, String> textLines =
>>> >>>>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
>>> >>>>>
>>> >>>>>         final Pattern pattern = Pattern.compile("\\W+",
>>> >>>>>             Pattern.UNICODE_CHARACTER_CLASS);
>>> >>>>>
>>> >>>>>         final KStream<String, Long> wordCounts = textLines
>>> >>>>>             .flatMapValues(value ->
>>> >>>>>                 Arrays.asList(pattern.split(value.toLowerCase())))
>>> >>>>>             .groupBy((key, word) -> word)
>>> >>>>>             .count("Counts")
>>> >>>>>             .toStream();
>>> >>>>>
>>> >>>>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>> >>>>>
>>> >>>>>         final KafkaStreams streams = new KafkaStreams(builder,
>>> >>>>>             streamsConfiguration);
>>> >>>>>         streams.cleanUp();
>>> >>>>>         streams.start();
>>> >>>>>
>>> >>>>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>> >>>>>     }
>>> >>>>> }
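For reference, the tokenize-and-count transformation inside the topology above can be checked in isolation with plain Java (no Kafka needed), using the same lower-casing and `\W+` split:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class WordCountLogic {

    private static final Pattern WORD_SPLIT =
        Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // The same per-record transformation the streams topology performs:
    // lower-case each line, split on non-word characters, count occurrences.
    static Map<String, Long> countWords(Iterable<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : WORD_SPLIT.split(line.toLowerCase())) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // "kafka" and "streams" each appear twice across the two lines.
        System.out.println(countWords(Arrays.asList(
            "hello kafka streams", "all streams lead to kafka")));
    }
}
```

If this produces the expected counts but the topic stays empty, the problem is in reaching the broker or in committed offsets, not in the word-count logic.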