Is there any book or document that provides information on how to use Kafka Streams?

On Tue, Mar 14, 2017 at 2:42 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> I reset the application and it is still not working!
>
> My env is set up using
> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>
> I just tried using
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic) created
> from scratch, following the steps as directed.
>
> After I stopped the Java program, I checked the topics. Below is the data in
> each topic.
>
> docker run \
>   --net=host \
>   --rm \
>   confluentinc/cp-kafka:3.2.0 \
>   kafka-console-consumer --bootstrap-server localhost:29092 \
>   --topic TextLinesTopic --new-consumer --from-beginning
>
>
> shows:
>
> hello kafka streams
> all streams lead to kafka
> join kafka summit
> test1
> test2
> test3
> test4
>
> For WordsWithCountsTopic, nothing is shown.
>
>
> I am new to Kafka and Kafka Streams and still do not understand why a
> simple example does not work!
>
> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> >> So, when I check the number of messages in wordCount-input I see the same
>> >> messages. However, when I run below code I do not see any message/data in
>> >> wordCount-output.
>>
>> Did you reset your application?
>>
>> Each time you run your app and restart it, it will resume processing
>> where it left off. Thus, if something went wrong in your first run but
>> offsets were committed, the app will not re-read the whole topic.
>>
>> You can check committed offsets via bin/kafka-consumer-groups.sh. The
>> application.id from StreamsConfig is used as the group.id.
>>
>> Thus, resetting your app would be required to consume the input topic
>> from scratch. Or you can just write new data to your input topic.
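
To make the "resume where it left off" behavior concrete, here is a toy in-memory model in plain Java. It is only a sketch and not Kafka's API: the class CommittedOffsetSketch and its methods are hypothetical, and the real reset tool also cleans up internal topics, which this model ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CommittedOffsetSketch {
    // Committed position per group id; a toy stand-in for the offsets Kafka stores.
    private final Map<String, Integer> committed = new HashMap<>();
    // A single-partition "topic" held in memory.
    private final List<String> topic = new ArrayList<>();

    void produce(String record) {
        topic.add(record);
    }

    /** One "run" of the app: read from the committed position to the end, then commit. */
    List<String> run(String groupId) {
        // Starting at 0 when nothing is committed models auto.offset.reset=earliest.
        int from = committed.getOrDefault(groupId, 0);
        List<String> read = new ArrayList<>(topic.subList(from, topic.size()));
        committed.put(groupId, topic.size());
        return read;
    }

    /** Models the offset part of a reset: forget the group's committed position. */
    void reset(String groupId) {
        committed.remove(groupId);
    }

    public static void main(String[] args) {
        CommittedOffsetSketch log = new CommittedOffsetSketch();
        log.produce("hello kafka streams");
        log.produce("join kafka summit");
        // First run reads everything, a restart reads nothing new,
        // and a run after reset reads everything again.
        System.out.println("first run:   " + log.run("wordCount-streaming").size());
        System.out.println("restart:     " + log.run("wordCount-streaming").size());
        log.reset("wordCount-streaming");
        System.out.println("after reset: " + log.run("wordCount-streaming").size());
    }
}
```

The group id in the model plays the role of the application.id: a restart with the same id picks up the committed position, so an empty output after a restart does not by itself mean the app failed to connect.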
>>
>> >> Can I connect to kafka in VM/docker container using below code or do I need
>> >> to change/add other parameters? How can I submit the code to
>> >> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>> >> code(e.g. jar file)?
>>
>> A Streams app is a regular Java application and can run anywhere --
>> there is no notion of a processing cluster and you don't "submit" your
>> code -- you just run your app.
>>
>> Thus, if your console consumer can connect to the cluster, your Streams
>> app should also be able to connect to the cluster.
>>
>>
>> Maybe the short runtime of 5 seconds could be a problem (even if it
>> seems long for processing just a few records). But you might need to take
>> startup delay into account. I would recommend registering a shutdown
>> hook: see
>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
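
For reference, the shutdown-hook pattern can be sketched without any Kafka dependency as below. FakeStreams is a hypothetical stand-in for KafkaStreams (only start() and close() are modeled), and the latch is my own addition to keep the main thread parked instead of sleeping for a fixed 5 seconds; the linked example may be structured differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownHookSketch {
    /** Hypothetical stand-in for KafkaStreams: only start() and close() matter here. */
    static final class FakeStreams implements AutoCloseable {
        final AtomicBoolean running = new AtomicBoolean(false);

        void start() {
            running.set(true);
        }

        @Override
        public void close() {
            running.set(false);
        }
    }

    /** Builds the hook thread: close the app, then release the waiting main thread. */
    static Thread shutdownHook(FakeStreams streams, CountDownLatch latch) {
        return new Thread(() -> {
            streams.close();
            latch.countDown();
        }, "streams-shutdown-hook");
    }

    /** Starts the app and blocks until the JVM is asked to exit (Ctrl-C or SIGTERM). */
    static void runUntilShutdown(FakeStreams streams) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(shutdownHook(streams, latch));
        streams.start();
        latch.await();
    }
}
```

Calling runUntilShutdown(streams) from main in place of Thread.sleep(5000L) followed by streams.close() would keep the app running until it is stopped explicitly, so startup delay no longer eats into a fixed processing window.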
>>
>>
>> Hope this helps.
>>
>> -Matthias
>>
>>
>> On 3/13/17 7:30 PM, Mina Aslani wrote:
>> > Hi Matthias,
>> >
>> > Thank you for the quick response, appreciate it!
>> >
>> > I created the topics wordCount-input and wordCount-output. Pushed some data
>> > to wordCount-input using
>> >
>> > docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
>> > /bin/kafka-console-producer --broker-list localhost:9092 --topic
>> > wordCount-input
>> >
>> > test
>> >
>> > new
>> >
>> > word
>> >
>> > count
>> >
>> > wordcount
>> >
>> > word count
>> >
>> > So, when I check the number of messages in wordCount-input I see the same
>> > messages. However, when I run below code I do not see any message/data in
>> > wordCount-output.
>> >
>> > Can I connect to kafka in VM/docker container using below code or do I need
>> > to change/add other parameters? How can I submit the code to
>> > kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>> > code(e.g. jar file)?
>> >
>> > I really appreciate your input as I am blocked and cannot run even the
>> > simple example below.
>> >
>> > Best regards,
>> > Mina
>> >
>> > I changed the code to be as below:
>> >
>> > Properties props = new Properties();
>> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> >
>> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> >
>> > // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
>> > // Note: To re-run the demo, you need to use the offset reset tool:
>> > // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> >
>> > KStreamBuilder builder = new KStreamBuilder();
>> >
>> > KStream<String, String> source = builder.stream("wordCount-input");
>> >
>> > KTable<String, Long> counts = source
>> >       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>> >          @Override
>> >          public Iterable<String> apply(String value) {
>> >             return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>> >          }
>> >       }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>> >          @Override
>> >          public KeyValue<String, String> apply(String key, String value) {
>> >             return new KeyValue<>(value, value);
>> >          }
>> >       })
>> >       .groupByKey()
>> >       .count("Counts");
>> >
>> > // need to override value serde to Long type
>> > counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>> >
>> > KafkaStreams streams = new KafkaStreams(builder, props);
>> > streams.start();
>> >
>> > // usually the stream application would be running forever,
>> > // in this example we just let it run for some time and stop since the input data is finite.
>> > Thread.sleep(5000L);
>> >
>> > streams.close();
>> >
>> >
>> >
>> >
>> > On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> >> Maybe you need to reset your application using the reset tool:
>> >> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>> >>
>> >> Also keep in mind, that KTables buffer internally, and thus, you might
>> >> only see data on commit.
>> >>
>> >> Try to reduce commit interval or disable caching by setting
>> >> "cache.max.bytes.buffering" to zero in your StreamsConfig.
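
As a rough illustration of those two settings, the sketch below builds the properties with plain string keys so it needs no Kafka jar on the classpath. "cache.max.bytes.buffering" is the key named above and "commit.interval.ms" is the key behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; the class name and the 100 ms value are my own assumptions for illustration.

```java
import java.util.Properties;

final class LowLatencyStreamsConfig {
    /** Returns only the two buffering-related settings; merge them into the full config. */
    static Properties build() {
        Properties props = new Properties();
        // Disable the record cache so every KTable update is forwarded downstream
        // instead of being held back until the next commit.
        props.put("cache.max.bytes.buffering", 0);
        // Commit frequently; 100 ms is an illustrative value, not a recommendation.
        props.put("commit.interval.ms", 100);
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Either setting alone should make output appear sooner; with the cache disabled, expect one output record per input word rather than one final count per word.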
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 3/13/17 12:29 PM, Mina Aslani wrote:
>> >>> Hi,
>> >>>
>> >>> This is the first time that I am using Kafka Streams. I would like to read
>> >>> from an input topic and write to an output topic. However, I do not see the
>> >>> word count when I try to run the example below. It looks like it does not
>> >>> connect to Kafka, though I do not see any error. I tried my localhost Kafka
>> >>> as well as the container in a VM, with the same result.
>> >>>
>> >>> There are over 200 messages in the input Kafka topic.
>> >>>
>> >>> Your input is appreciated!
>> >>>
>> >>> Best regards,
>> >>> Mina
>> >>>
>> >>> import org.apache.kafka.common.serialization.*;
>> >>> import org.apache.kafka.streams.*;
>> >>> import org.apache.kafka.streams.kstream.*;
>> >>>
>> >>> import java.util.*;
>> >>> import java.util.regex.*;
>> >>>
>> >>> public class WordCountExample {
>> >>>
>> >>>    public static void main(String[] args) {
>> >>>       final Properties streamsConfiguration = new Properties();
>> >>>       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-streaming");
>> >>>       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<IPADDRESS>:9092");
>> >>>       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> >>>       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> >>>       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>> >>>
>> >>>       final Serde<String> stringSerde = Serdes.String();
>> >>>       final Serde<Long> longSerde = Serdes.Long();
>> >>>
>> >>>       final KStreamBuilder builder = new KStreamBuilder();
>> >>>
>> >>>       final KStream<String, String> textLines =
>> >>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
>> >>>
>> >>>       final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
>> >>>
>> >>>       final KStream<String, Long> wordCounts = textLines
>> >>>             .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
>> >>>             .groupBy((key, word) -> word)
>> >>>             .count("Counts")
>> >>>             .toStream();
>> >>>
>> >>>       wordCounts.to(stringSerde, longSerde, "wordcount-output");
>> >>>
>> >>>       final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>> >>>       streams.cleanUp();
>> >>>       streams.start();
>> >>>
>> >>>       Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>> >>>    }
>> >>> }
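
To sanity-check what the topology above should eventually write to the output topic, the same split-and-count logic can be run locally with plain collections. This is only a sketch: WordCountLocalSketch is a hypothetical name, it yields final counts rather than the stream of updates a KTable emits, and the empty-token filter is my addition (the topology above does not filter).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

final class WordCountLocalSketch {
    // Same pattern as in the topology: split on runs of non-word characters.
    private static final Pattern NON_WORD =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    /** Lower-cases each line, splits it into words, and counts occurrences per word. */
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : NON_WORD.split(line.toLowerCase())) {
                // split() yields an empty leading token when a line starts with a separator.
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "hello kafka streams",
                "all streams lead to kafka",
                "join kafka summit");
        count(lines).forEach((word, n) -> System.out.println(word + " -> " + n));
    }
}
```

If the local counts look right but the output topic stays empty, the problem is more likely in connectivity, committed offsets, or buffering than in the word-count logic itself.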
>> >>>
>> >>
>> >>
>> >
>>
>>
>
