Re: Question about Kafka Streams error message when a message is larger than the maximum size the server will accept

2018-03-28 Thread Guozhang Wang
Yes that is correlated, thanks for the reminder. I've updated the JIRA to reflect your observations as well. Guozhang On Wed, Mar 28, 2018 at 12:41 AM, Mihaela Stoycheva < mihaela.stoych...@gmail.com> wrote: > Hello Guozhang, > > Thank you for the answer, that could explain what is happening.

Re: Question about Kafka Streams error message when a message is larger than the maximum size the server will accept

2018-03-28 Thread Mihaela Stoycheva
Hello Guozhang, Thank you for the answer, that could explain what is happening. Is it possible that this is related in some way to https://issues.apache.org/jira/browse/KAFKA-6538? Mihaela On Wed, Mar 28, 2018 at 2:21 AM, Guozhang Wang wrote: > Hello Mihaela, > > It is possible that when you h

Re: Question about Kafka Streams error message when a message is larger than the maximum size the server will accept

2018-03-27 Thread Guozhang Wang
Hello Mihaela, It is possible that when you have caching enabled, the value of the record has already been serialized before sending to the changelogger while the key was not. Admittedly it is not very friendly for trouble-shooting related log4j entries.. Guozhang On Tue, Mar 27, 2018 at 5:25

Question about Kafka Streams error message when a message is larger than the maximum size the server will accept

2018-03-27 Thread Mihaela Stoycheva
Hello, I have a Kafka Streams application that is consuming from two topics and internally aggregating, transforming and joining data. One of the aggregation steps is adding an id to an ArrayList of ids. Naturally since there was a lot of data the changelog message became too big and was not sent

Re: Kafka Streams Error

2016-11-03 Thread Matthias J. Sax
-BEGIN PGP SIGNED MESSAGE- Hash: SHA512 First a hint about "group.id". Please read this to make sense of this parameter: http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups - -and-offset-management It might also help to understand how to get the "last value" of a top

Re: Kafka Streams Error

2016-11-03 Thread Furkan KAMACI
I've just realised the parameter of poll method. It's been explained as: "The time, in milliseconds, spent waiting in poll if data is not available in the buffer." When I set to a big number ''sometimes" I can see a result in it. When I set it to 0 and push something to do topic that it listens s

Re: Kafka Streams Error

2016-11-03 Thread Furkan KAMACI
Hi Matthias, Thanks for the response. I stream output as follows: longCounts.toStream((wk, v) -> wk.key()) .to(Serdes.String(), Serdes.Long(), "qps-aggregated"); I want to read last value from that topic at another applicati

Re: Kafka Streams Error

2016-11-02 Thread Matthias J. Sax
-BEGIN PGP SIGNED MESSAGE- Hash: SHA512 Hi, first, AUTO_OFFSET_RESET_CONFIG has only an effect if you start up you application for the first time. If you start it a second time, it will resume from where is left off. About getting numbers starting from zero: this is expected behavior bec

Kafka Streams Error

2016-11-02 Thread Furkan KAMACI
I use Kafka 0.10.0.1. I count the messages of a topic as follows: ... streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ... KStream longs = builder.stream(Serdes.String(), Serdes.String(), "qps-input"); ... KTable, Long> longCounts = longs.countByKey(Ti

Re: Kafka Streams error

2016-05-25 Thread Guozhang Wang
Hi Walter, Synced up with Ismael regarding your questions, and here are our suggestions: "Scala 2.11 is the minimum. The downside of that flag is that it includes new features that are still changing, may be less stable and may not work fully. You may probably consider planning an upgrade as su

Kafka Streams error

2016-05-24 Thread Walter rakoff
Hello, I'm trying a sample Kafka Streams program in Scala. val clickRecordKStream: KStream[String, ClickRecord] = kStreamBuilder.stream(stringSerde, stringSerde, "test-topic") .map( (k:String, v:String) => (k, ClickRecord(v))) The map call throws error "type mismatch; found : (St