Did you reset your application?

Each time you stop your app and restart it, it will resume processing
where it left off. Thus, if something went wrong in your first run but
offsets were committed, the app will not re-read the whole topic.

You can check the committed offsets via bin/kafka-consumer-groups.sh. The
application.id from StreamsConfig is used as the group.id.

Thus, resetting your app would be required to consume the input topic
from scratch. Or you just write new data to your input topic.

>> Can I connect to kafka in VM/docker container using below code or do I need
>> to change/add other parameters? How can I submit the code to
>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>> code(e.g. jar file)?

A Streams app is a regular Java application and can run anywhere --
there is no notion of a processing cluster and you don't "submit" your
code -- you just run your app.

Thus, if your console consumer can connect to the cluster, your Streams
app should also be able to connect to the cluster.

Maybe the short runtime of 5 seconds is the problem (even if it
seems long for processing just a few records): you need to take
startup delay into account. I would recommend registering a shutdown
hook instead of sleeping for a fixed time: see
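
As a rough sketch of that shutdown-hook pattern (not taken from this
thread's code): the app is started, the main thread blocks on a latch,
and the hook closes the app when the JVM is asked to stop (e.g. Ctrl-C).
The class and method names below (ShutdownHookSketch, closeHook,
runUntilShutdown) are made up for illustration, and plain Runnables
stand in for streams.start() and streams.close() so the snippet
compiles without the Kafka libraries.

```java
import java.util.concurrent.CountDownLatch;

final class ShutdownHookSketch {

    // Builds the hook thread: run the close action, then release the latch
    // that the main thread is waiting on. (Hypothetical helper name.)
    static Thread closeHook(Runnable close, CountDownLatch latch) {
        return new Thread(() -> {
            try {
                close.run();
            } finally {
                latch.countDown();
            }
        }, "streams-shutdown-hook");
    }

    // Registers the hook, starts the app, and blocks until the JVM begins
    // shutting down, instead of sleeping for a fixed number of seconds.
    static void runUntilShutdown(Runnable start, Runnable close)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(closeHook(close, latch));
        start.run();
        latch.await();
    }
}
```

In your code, this would replace the Thread.sleep(5000L) and
streams.close() lines with something like
runUntilShutdown(streams::start, streams::close), assuming the
enclosing method declares InterruptedException.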

Hope this helps.


On 3/13/17 7:30 PM, Mina Aslani wrote:
> Hi Matthias,
> Thank you for the quick response, appreciate it!
> I created the topics wordCount-input and wordCount-output. Pushed some data
> to wordCount-input using
> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
> /bin/kafka-console-producer --broker-list localhost:9092 --topic
> wordCount-input
> test
> new
> word
> count
> wordcount
> word count
> So, when I check the number of messages in wordCount-input I see the same
> messages. However, when I run below code I do not see any message/data in
> wordCount-output.
> Can I connect to kafka in VM/docker container using below code or do I need
> to change/add other parameters? How can I submit the code to
> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> code(e.g. jar file)?
> I really appreciate your input as I am blocked and cannot run even below
> simple example.
> Best regards,
> Mina
> I changed the code to be as below:
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> // setting offset reset to earliest so that we can re-run the demo
> // code with the same pre-loaded data
> // Note: To re-run the demo, you need to use the offset reset tool:
> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> source = builder.stream("wordCount-input");
> KTable<String, Long> counts = source
>       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>          @Override
>          public Iterable<String> apply(String value) {
>             return
> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>          }
>       }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>          @Override
>          public KeyValue<String, String> apply(String key, String value) {
>             return new KeyValue<>(value, value);
>          }
>       })
>       .groupByKey()
>       .count("Counts");
> // need to override value serde to Long type
> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.start();
> // usually the stream application would be running forever,
> // in this example we just let it run for some time and stop since the
> // input data is finite.
> Thread.sleep(5000L);
> streams.close();
> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>> Maybe you need to reset your application using the reset tool:
>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>> Also keep in mind, that KTables buffer internally, and thus, you might
>> only see data on commit.
>> Try to reduce commit interval or disable caching by setting
>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>> -Matthias
>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>> Hi,
>>> This is the first time that I am using Kafka Streams. I would like to read
>>> from input topic and write to output topic. However, I do not see the word
>>> count when I try to run below example. Looks like that it does not connect
>>> to Kafka. I do not see any error though. I tried my localhost kafka as well
>>> as the container in a VM, same situation.
>>> There are over 200 message in the input kafka topic.
>>> Your input is appreciated!
>>> Best regards,
>>> Mina
>>> import org.apache.kafka.common.serialization.*;
>>> import org.apache.kafka.streams.*;
>>> import org.apache.kafka.streams.kstream.*;
>>> import java.util.*;
>>> import java.util.regex.*;
>>> public class WordCountExample {
>>>    public static void main(String [] args)   {
>>>       final Properties streamsConfiguration = new Properties();
>>>       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> "wordcount-streaming");
>>>       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>> "<IPADDRESS>:9092");
>>>       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>> 10 * 1000);
>>>       final Serde<String> stringSerde = Serdes.String();
>>>       final Serde<Long> longSerde = Serdes.Long();
>>>       final KStreamBuilder builder = new KStreamBuilder();
>>>       final KStream<String, String> textLines =
>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>       final Pattern pattern = Pattern.compile("\\W+",
>>>       final KStream<String, Long> wordCounts = textLines
>>>             .flatMapValues(value ->
>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>>             .groupBy((key, word) -> word)
>>>             .count("Counts")
>>>             .toStream();
>>>       wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>       final KafkaStreams streams = new KafkaStreams(builder,
>>> streamsConfiguration);
>>>       streams.cleanUp();
>>>       streams.start();
>>>       Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>    }
>>> }

