Is there any book or document that provides information on how to use Kafka Streams?

On Tue, Mar 14, 2017 at 2:42 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> I reset and it is still not working!
>
> My env is set up using
> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>
> I just tried using
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic) created
> from scratch, and went through the steps as directed.
>
> When I stopped the Java program and checked the topics, below is the data
> in each topic.
>
> docker run \
>   --net=host \
>   --rm \
>   confluentinc/cp-kafka:3.2.0 \
>   kafka-console-consumer --bootstrap-server localhost:29092 \
>     --topic TextLinesTopic --new-consumer --from-beginning
>
> SHOWS
>
> hello kafka streams
> all streams lead to kafka
> join kafka summit
> test1
> test2
> test3
> test4
>
> For WordsWithCountsTopic nothing is shown.
>
> I am new to Kafka/Kafka Streams and still do not understand why a simple
> example does not work!
>
> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> >> So, when I check the number of messages in wordCount-input I see the
>> >> same messages. However, when I run the code below I do not see any
>> >> message/data in wordCount-output.
>>
>> Did you reset your application?
>>
>> Each time you run your app and restart it, it will resume processing
>> where it left off. Thus, if something went wrong in your first run but
>> you have committed offsets, the app will not re-read the whole topic.
>>
>> You can check committed offsets via bin/kafka-consumer-groups.sh. The
>> application-id from StreamsConfig is used as the group.id.
>>
>> Thus, resetting your app would be required to consume the input topic
>> from scratch. Or you can just write new data to your input topic.
>>
>> >> Can I connect to Kafka in a VM/docker container using the code below,
>> >> or do I need to change/add other parameters? How can I submit the code
>> >> to kafka/kafka-connect?
>> >> Do we have a similar concept to SPARK for submitting the
>> >> code (e.g. a jar file)?
>>
>> A Streams app is a regular Java application and can run anywhere --
>> there is no notion of a processing cluster and you don't "submit" your
>> code -- you just run your app.
>>
>> Thus, if your console consumer can connect to the cluster, your Streams
>> app should also be able to connect to the cluster.
>>
>> Maybe the short runtime of 5 seconds could be a problem (even if it
>> seems long enough to process just a few records). You might also need to
>> take startup delay into account. I would recommend registering a
>> shutdown hook: see
>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>
>> Hope this helps.
>>
>> -Matthias
>>
>> On 3/13/17 7:30 PM, Mina Aslani wrote:
>> > Hi Matthias,
>> >
>> > Thank you for the quick response, appreciate it!
>> >
>> > I created the topics wordCount-input and wordCount-output, and pushed
>> > some data to wordCount-input using
>> >
>> > docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
>> >   /bin/kafka-console-producer --broker-list localhost:9092 \
>> >     --topic wordCount-input
>> >
>> > test
>> > new
>> > word
>> > count
>> > wordcount
>> > word count
>> >
>> > So, when I check the number of messages in wordCount-input I see the
>> > same messages. However, when I run the code below I do not see any
>> > message/data in wordCount-output.
>> >
>> > Can I connect to Kafka in a VM/docker container using the code below,
>> > or do I need to change/add other parameters? How can I submit the code
>> > to kafka/kafka-connect? Do we have a similar concept to SPARK for
>> > submitting the code (e.g. a jar file)?
>> >
>> > I really appreciate your input as I am blocked and cannot run even this
>> > simple example.
>> >
>> > Best regards,
>> > Mina
>> >
>> > I changed the code to be as below:
>> >
>> > Properties props = new Properties();
>> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >     Serdes.String().getClass().getName());
>> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> >     Serdes.String().getClass().getName());
>> >
>> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> >
>> > // setting offset reset to earliest so that we can re-run the demo
>> > // code with the same pre-loaded data
>> > // Note: To re-run the demo, you need to use the offset reset tool:
>> > // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> >
>> > KStreamBuilder builder = new KStreamBuilder();
>> >
>> > KStream<String, String> source = builder.stream("wordCount-input");
>> >
>> > KTable<String, Long> counts = source
>> >     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>> >         @Override
>> >         public Iterable<String> apply(String value) {
>> >             return Arrays.asList(
>> >                 value.toLowerCase(Locale.getDefault()).split(" "));
>> >         }
>> >     })
>> >     .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>> >         @Override
>> >         public KeyValue<String, String> apply(String key, String value) {
>> >             return new KeyValue<>(value, value);
>> >         }
>> >     })
>> >     .groupByKey()
>> >     .count("Counts");
>> >
>> > // need to override value serde to Long type
>> > counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>> >
>> > KafkaStreams streams = new KafkaStreams(builder, props);
>> > streams.start();
>> >
>> > // usually the stream application would be running forever,
>> > // in this example we just let it run for some time and stop since the
>> > // input data is finite.
>> > Thread.sleep(5000L);
>> >
>> > streams.close();
>> >
>> >
>> > On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> >> Maybe you need to reset your application using the reset tool:
>> >> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>> >>
>> >> Also keep in mind that KTables buffer internally, and thus you might
>> >> only see data on commit.
>> >>
>> >> Try to reduce the commit interval or disable caching by setting
>> >> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 3/13/17 12:29 PM, Mina Aslani wrote:
>> >>> Hi,
>> >>>
>> >>> This is the first time I am using Kafka Streams. I would like to read
>> >>> from an input topic and write to an output topic. However, I do not
>> >>> see the word count when I try to run the example below. It looks like
>> >>> it does not connect to Kafka, though I do not see any error. I tried
>> >>> my localhost Kafka as well as the container in a VM, same situation.
>> >>>
>> >>> There are over 200 messages in the input Kafka topic.
>> >>>
>> >>> Your input is appreciated!
>> >>>
>> >>> Best regards,
>> >>> Mina
>> >>>
>> >>> import org.apache.kafka.common.serialization.*;
>> >>> import org.apache.kafka.streams.*;
>> >>> import org.apache.kafka.streams.kstream.*;
>> >>>
>> >>> import java.util.*;
>> >>> import java.util.regex.*;
>> >>>
>> >>> public class WordCountExample {
>> >>>
>> >>>     public static void main(String[] args) {
>> >>>         final Properties streamsConfiguration = new Properties();
>> >>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> >>>             "wordcount-streaming");
>> >>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> >>>             "<IPADDRESS>:9092");
>> >>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> >>>             Serdes.String().getClass().getName());
>> >>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> >>>             Serdes.String().getClass().getName());
>> >>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>> >>>             10 * 1000);
>> >>>
>> >>>         final Serde<String> stringSerde = Serdes.String();
>> >>>         final Serde<Long> longSerde = Serdes.Long();
>> >>>
>> >>>         final KStreamBuilder builder = new KStreamBuilder();
>> >>>
>> >>>         final KStream<String, String> textLines =
>> >>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
>> >>>
>> >>>         final Pattern pattern = Pattern.compile("\\W+",
>> >>>             Pattern.UNICODE_CHARACTER_CLASS);
>> >>>
>> >>>         final KStream<String, Long> wordCounts = textLines
>> >>>             .flatMapValues(value ->
>> >>>                 Arrays.asList(pattern.split(value.toLowerCase())))
>> >>>             .groupBy((key, word) -> word)
>> >>>             .count("Counts")
>> >>>             .toStream();
>> >>>
>> >>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
>> >>>
>> >>>         final KafkaStreams streams = new KafkaStreams(builder,
>> >>>             streamsConfiguration);
>> >>>         streams.cleanUp();
>> >>>         streams.start();
>> >>>
>> >>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>> >>>     }
>> >>> }
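For readers following along: the per-record logic of WordCountExample (lowercase, split on the "\W+" pattern, count per word) can be exercised without a running broker. Below is a minimal plain-Java sketch of that transformation; the class and method names are illustrative, not part of the Kafka API.

```java
import java.util.*;
import java.util.regex.Pattern;

public class WordCountSketch {
    // Same pattern the Streams example uses to tokenize each line.
    private static final Pattern WORDS =
        Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Mirrors flatMapValues(split) + groupBy(word) + count() over a finite input.
    public static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : WORDS.split(line.toLowerCase(Locale.getDefault()))) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList(
            "hello kafka streams", "all streams lead to kafka")));
    }
}
```

If this produces the expected counts locally but nothing appears in the output topic, the problem is in connectivity, offsets, or buffering rather than in the topology logic.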
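Matthias's debugging advice in the thread (shorter commit interval, caching disabled, earliest offset reset) boils down to a handful of config entries. The sketch below uses the raw config-name strings (the same values the StreamsConfig constants resolve to) so it compiles without the Kafka jars; the class and method names are made up for illustration.

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Debug-friendly Streams configuration, using raw config-name strings
    // in place of StreamsConfig.COMMIT_INTERVAL_MS_CONFIG etc.
    public static Properties debugFriendlyConfig(String appId, String bootstrap) {
        Properties props = new Properties();
        props.put("application.id", appId);          // also used as the consumer group.id
        props.put("bootstrap.servers", bootstrap);
        props.put("commit.interval.ms", "10");       // commit (and thus emit) frequently
        props.put("cache.max.bytes.buffering", "0"); // disable KTable caching while debugging
        props.put("auto.offset.reset", "earliest");  // re-read input when no offsets are committed
        return props;
    }

    public static void main(String[] args) {
        System.out.println(debugFriendlyConfig("wordcount-streaming", "localhost:9092"));
    }
}
```

Note that "auto.offset.reset" only applies when the group has no committed offsets; once offsets exist, re-reading the topic requires the reset tool mentioned above.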
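The fixed Thread.sleep(5000L) in the first version of the code is exactly what the shutdown-hook suggestion replaces: block the main thread on a latch and let the hook close the client when the JVM is asked to exit. A broker-free sketch of that pattern, with an AutoCloseable standing in for KafkaStreams and all names illustrative:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSketch {
    // Blocks the caller until the "shutdown" thread has closed the client.
    // In a real app the hook thread is handed to
    // Runtime.getRuntime().addShutdownHook(...) and the JVM starts it on
    // Ctrl-C / SIGTERM; here we start it ourselves so the sketch is runnable.
    public static boolean awaitShutdown(AutoCloseable streams, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread hook = new Thread(() -> {
            try { streams.close(); } catch (Exception ignored) { }
            latch.countDown();  // release the blocked main thread
        });
        hook.start();  // stands in for the JVM firing the shutdown hook
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        AutoCloseable fakeStreams = () -> closed.set(true);  // stand-in for KafkaStreams
        boolean done = awaitShutdown(fakeStreams, 5000L);
        System.out.println("done=" + done + " closed=" + closed.get());
    }
}
```

Compared with sleeping for a fixed five seconds, this keeps the app running indefinitely while still guaranteeing a clean close, which matters for committing offsets and releasing state stores.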