Hi,

I forgot to add in my previous email 2 questions.

To setup my env, shall I use
https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
instead or is there any other docker-compose.yml (version 2 or 3) which is
suggested to setup env?

How can I check "whether streams (that is just an app) can reach Kafka"?

Regards,
Mina

On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> Hi Eno,
>
> Sorry! That is a typo!
>
> I have a docker-machine with different containers (setup as directed @
> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>
> docker ps --format "{{.Image}}: {{.Names}}"
>
> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>
> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>
> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>
> confluentinc/cp-schema-registry:3.2.0: schema-registry
>
> confluentinc/cp-kafka:3.2.0: kafka
>
> confluentinc/cp-zookeeper:3.2.0: zookeeper
>
> I used example @ https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> WordCountLambdaExample.java#L178-L181 and followed the same steps.
>
> When I run below command in docker-machine, I see the messages in
> TextLinesTopic.
>
> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
> --from-beginning
>
> hello kafka streams
>
> all streams lead to kafka
>
> join kafka summit
>
> test1
>
> test2
>
> test3
>
> test4
>
> Running above command for WordsWithCountsTopic returns nothing*.*
>
> My program runs out of docker machine, and it does not return any error.
>
> I checked kafka logs and kafka-connect logs, no information is shown.
> Wondering what is the log level in kafka/kafka-connect.
>
>
> Best regards,
> Mina
>
>
>
>
> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi there,
>>
>> I noticed in your example that you are using localhost:9092 to produce
>> but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and
>> the Kafka Streams app all running within one docker container, or in
>> different containers?
>>
>> I just tested the WordCountLambdaExample and it works for me. This might
>> not have anything to do with streams, but rather with the Kafka
>> configuration and whether streams (that is just an app) can reach Kafka at
>> all. If you provide the above information we can look further.
>>
>>
>>
>> Thanks
>> Eno
>>
>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
>> >
>> > I reset and still not working!
>> >
>> > My env is setup using
>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>> >
>> > I just tried using
>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-
>> streams/src/main/java/io/confluent/examples/streams/Wor
>> dCountLambdaExample.java#L178-L181
>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic)
>> created
>> > from scratch as went through the steps as directed.
>> >
>> > When I stopped the java program and check the topics below are the data
>> in
>> > each topic.
>> >
>> > docker run \
>> >
>> >  --net=host \
>> >
>> >  --rm \
>> >
>> >  confluentinc/cp-kafka:3.2.0 \
>> >
>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic
>> > TextLinesTopic --new-consumer --from-beginning
>> >
>> >
>> > SHOWS
>> >
>> > hello kafka streams
>> >
>> > all streams lead to kafka
>> >
>> > join kafka summit
>> >
>> > test1
>> >
>> > test2
>> >
>> > test3
>> >
>> > test4
>> >
>> > FOR WordsWithCountsTopic nothing is shown
>> >
>> >
>> > I am new to the Kafka/Kafka Stream and still do not understand why a
>> simple
>> > example does not work!
>> >
>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io
>> >
>> > wrote:
>> >
>> >>>> So, when I check the number of messages in wordCount-input I see the
>> >> same
>> >>>> messages. However, when I run below code I do not see any
>> message/data
>> >> in
>> >>>> wordCount-output.
>> >>
>> >> Did you reset your application?
>> >>
>> >> Each time you run you app and restart it, it will resume processing
>> >> where it left off. Thus, if something went wrong in you first run but
>> >> you got committed offsets, the app will not re-read the whole topic.
>> >>
>> >> You can check committed offset via bin/kafka-consumer-groups.sh. The
>> >> application-id from StreamConfig is used a group.id.
>> >>
>> >> Thus, resetting you app would be required to consumer the input topic
>> >> from scratch. Of you just write new data to you input topic.
>> >>
>> >>>> Can I connect to kafka in VM/docker container using below code or do
>> I
>> >> need
>> >>>> to change/add other parameters? How can I submit the code to
>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit
>> the
>> >>>> code(e.g. jar file)?
>> >>
>> >> A Streams app is a regular Java application and can run anywhere --
>> >> there is no notion of a processing cluster and you don't "submit" your
>> >> code -- you just run your app.
>> >>
>> >> Thus, if your console consumer can connect to the cluster, your Streams
>> >> app should also be able to connect to the cluster.
>> >>
>> >>
>> >> Maybe, the short runtime of 5 seconds could be a problem (even if it
>> >> seems log to process just a few records). But you might need to put
>> >> startup delay into account. I would recommend to register a shutdown
>> >> hook: see
>> >> https://github.com/confluentinc/examples/blob/3.
>> >> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>> >> WordCountLambdaExample.java#L178-L181
>> >>
>> >>
>> >> Hope this helps.
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
>> >>> Hi Matthias,
>> >>>
>> >>> Thank you for the quick response, appreciate it!
>> >>>
>> >>> I created the topics wordCount-input and wordCount-output. Pushed some
>> >> data
>> >>> to wordCount-input using
>> >>>
>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>> >>> wordCount-input
>> >>>
>> >>> test
>> >>>
>> >>> new
>> >>>
>> >>> word
>> >>>
>> >>> count
>> >>>
>> >>> wordcount
>> >>>
>> >>> word count
>> >>>
>> >>> So, when I check the number of messages in wordCount-input I see the
>> same
>> >>> messages. However, when I run below code I do not see any
>> message/data in
>> >>> wordCount-output.
>> >>>
>> >>> Can I connect to kafka in VM/docker container using below code or do I
>> >> need
>> >>> to change/add other parameters? How can I submit the code to
>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>> >>> code(e.g. jar file)?
>> >>>
>> >>> I really appreciate your input as I am blocked and cannot run even
>> below
>> >>> simple example.
>> >>>
>> >>> Best regards,
>> >>> Mina
>> >>>
>> >>> I changed the code to be as below:
>> >>>
>> >>> Properties props = new Properties();
>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> "wordCount-streaming");
>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> "<ipAddress>:9092");
>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >>> Serdes.String().getClass().getName());
>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> >>> Serdes.String().getClass().getName());
>> >>>
>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> >>>
>> >>> // setting offset reset to earliest so that we can re-run the demo
>> >>> code with the same pre-loaded data
>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/
>> >> Kafka+Streams+Application+Reset+Tool
>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> >>>
>> >>> KStreamBuilder builder = new KStreamBuilder();
>> >>>
>> >>> KStream<String, String> source = builder.stream("wordCount-input");
>> >>>
>> >>> KTable<String, Long> counts = source
>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>> >>>         @Override
>> >>>         public Iterable<String> apply(String value) {
>> >>>            return
>> >>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>> >>>         }
>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
>> >> String>>() {
>> >>>         @Override
>> >>>         public KeyValue<String, String> apply(String key, String
>> value)
>> >> {
>> >>>            return new KeyValue<>(value, value);
>> >>>         }
>> >>>      })
>> >>>      .groupByKey()
>> >>>      .count("Counts");
>> >>>
>> >>> // need to override value serde to Long type
>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>> >>>
>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
>> >>> streams.start();
>> >>>
>> >>> // usually the stream application would be running forever,
>> >>> // in this example we just let it run for some time and stop since the
>> >>> input data is finite.
>> >>> Thread.sleep(5000L);
>> >>>
>> >>> streams.close();
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <
>> matth...@confluent.io>
>> >>> wrote:
>> >>>
>> >>>> Maybe you need to reset your application using the reset tool:
>> >>>> http://docs.confluent.io/current/streams/developer-
>> >>>> guide.html#application-reset-tool
>> >>>>
>> >>>> Also keep in mind, that KTables buffer internally, and thus, you
>> might
>> >>>> only see data on commit.
>> >>>>
>> >>>> Try to reduce commit interval or disable caching by setting
>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>> >>>>> Hi,
>> >>>>>
>> >>>>> This is the first time that am using Kafka Stream. I would like to
>> read
>> >>>>> from input topic and write to output topic. However, I do not see
>> the
>> >>>> word
>> >>>>> count when I try to run below example. Looks like that it does not
>> >>>> connect
>> >>>>> to Kafka. I do not see any error though. I tried my localhost kafka
>> as
>> >>>> well
>> >>>>> as the container in a VM, same situation.
>> >>>>>
>> >>>>> There are over 200 message in the input kafka topic.
>> >>>>>
>> >>>>> Your input is appreciated!
>> >>>>>
>> >>>>> Best regards,
>> >>>>> Mina
>> >>>>>
>> >>>>> import org.apache.kafka.common.serialization.*;
>> >>>>> import org.apache.kafka.streams.*;
>> >>>>> import org.apache.kafka.streams.kstream.*;
>> >>>>>
>> >>>>> import java.util.*;
>> >>>>> import java.util.regex.*;
>> >>>>>
>> >>>>> public class WordCountExample {
>> >>>>>
>> >>>>>
>> >>>>>   public static void main(String [] args)   {
>> >>>>>      final Properties streamsConfiguration = new Properties();
>> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> >>>>> "wordcount-streaming");
>> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_
>> CONFIG,
>> >>>>> "<IPADDRESS>:9092");
>> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >>>>> Serdes.String().getClass().getName());
>> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_
>> CONFIG,
>> >>>>> Serdes.String().getClass().getName());
>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>> >> MS_CONFIG,
>> >>>>> 10 * 1000);
>> >>>>>
>> >>>>>      final Serde<String> stringSerde = Serdes.String();
>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
>> >>>>>
>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
>> >>>>>
>> >>>>>      final KStream<String, String> textLines =
>> >>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>> >>>>>
>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
>> >>>>> Pattern.UNICODE_CHARACTER_CLASS);
>> >>>>>
>> >>>>>      final KStream<String, Long> wordCounts = textLines
>> >>>>>            .flatMapValues(value ->
>> >>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>> >>>>>            .groupBy((key, word) -> word)
>> >>>>>            .count("Counts")
>> >>>>>            .toStream();
>> >>>>>
>> >>>>>
>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>> >>>>>
>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
>> >>>>> streamsConfiguration);
>> >>>>>      streams.cleanUp();
>> >>>>>      streams.start();
>> >>>>>
>> >>>>>      Runtime.getRuntime().addShutdownHook(new
>> >>>> Thread(streams::close));  }
>> >>>>> }
>> >>>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>> >>
>>
>>
>

Reply via email to