Re: `key.converter` per connector

2017-05-17 Thread Nicolas Fouché
https://cwiki.apache.org/confluence/display/KAFKA/KIP-75+-+Add+per-connector+Converters Thanks, Bharat. On Tue, May 16, 2017 at 1:39 AM, Nicolas Fouché wrote: > Hi, In distributed mode, can I set `key.converter` in a connector configuration? Some of my con…

`key.converter` per connector

2017-05-16 Thread Nicolas Fouché
Hi, In distributed mode, can I set `key.converter` in a connector configuration? Some of my connectors use `org.apache.kafka.connect.storage.StringConverter` while others would need `io.confluent.connect.avro.AvroConverter`. So I wondered if `key.converter` could be overridden at the connector level…
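
For context, a minimal sketch of what such an override could look like once per-connector converters (KIP-75, referenced in the reply above) are available in your Kafka version. In distributed mode, the map below corresponds to the "config" object submitted to the Connect REST API; the connector class, topic, and registry URL are invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectorConfigSketch {
    public static Map<String, String> avroKeyedConnector() {
        Map<String, String> config = new HashMap<>();
        config.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector"); // hypothetical
        config.put("topics", "avro-events");                                          // hypothetical
        // Per-connector override of the worker-level key.converter:
        config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
        config.put("key.converter.schema.registry.url", "http://localhost:8081");     // assumed registry URL
        return config;
    }
}
```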

Re: Streams - Got bit by log levels of the record cache

2017-03-13 Thread Nicolas Fouché
…Mar 9, 2017 at 6:22 PM, Damian Guy wrote: > Hi Nicolas, > Please do file a JIRA. > Many thanks, > Damian. On Thu, 9 Mar 2017 at 15:54 Nicolas Fouché wrote: > Hi, > I just w…

Streams - Got bit by log levels of the record cache

2017-03-09 Thread Nicolas Fouché
Hi, I just wanted to share how we misinterpreted logs from Streams at the INFO level. Version 0.10.1.1; I think it's the same for 0.10.2.0. So, we configured `commit.interval.ms` and `cache.max.bytes.buffering`, and we expected to always reach the commit interval before the maximum bytes. It was…
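
For reference, a minimal sketch of the two settings whose interplay the thread describes (application id and broker address are placeholders): a flush/commit is triggered either by `commit.interval.ms` elapsing or by the record cache hitting `cache.max.bytes.buffering` first, whichever comes sooner.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CacheTuningSketch {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Flush and commit every 30 seconds...
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000");
        // ...unless the 10 MB record cache fills first, which also forwards
        // and flushes cached records downstream before the interval elapses.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10485760");
        return props;
    }
}
```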

Re: Wait a few seconds before initializing state stores, so others don't have to wait before joining.

2017-02-28 Thread Nicolas Fouché
…e. We did have some discussions around this already. However, it's more tricky to do than it seems at first glance. We hope to introduce something like this in the next release. -Matthias. On 2/28/17 9:10 AM, Nicolas Fouché wrote: > H…

Wait a few seconds before initializing state stores, so others don't have to wait before joining.

2017-02-28 Thread Nicolas Fouché
Hi, I have 10 Kafka Streams processes which consume a topic with 10 partitions, with a few changelog topics. Let's say these processes are all stopped, and I start them nearly at the same time (in a matter of seconds). The first process seems to start initializing all state stores, which takes 1…

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

2017-01-19 Thread Nicolas Fouché
…information I shared on StackOverflow (perhaps a bit outdated by now, it was back in Aug 2016) about how you can add a state store when using KStreamBuilder: http://stackoverflow.com/a/39086805/1743580 -Michael. On Wed, Jan 18, 2017 at 5:18 PM, Nicolas Fou…

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

2017-01-18 Thread Nicolas Fouché
Oh my, I'm dumb. One can give multiple predicates to `KStream.branch()`. 2017-01-18 17:18 GMT+01:00 Nicolas Fouché: > The reason I would not use `KStream.transform()` is that I want to call `ProcessorContext.forward()` several times, to different children. These children are s…
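
A minimal sketch of that `branch()` approach, with invented topic names and predicates: one predicate per child stream, replacing a hand-rolled processor that would have called `ProcessorContext.forward()` once per child. A record goes to the first branch whose predicate matches.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class BranchSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("events"); // placeholder topic

        // One predicate per child; a record is routed to the first match.
        KStream<String, String>[] branches = stream.branch(
            (key, value) -> value.startsWith("A"),
            (key, value) -> value.startsWith("B"),
            (key, value) -> true // catch-all for everything else
        );
        branches[0].to("topic-a"); // placeholder sinks
        branches[1].to("topic-b");
        branches[2].to("topic-rest");
    }
}
```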

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

2017-01-18 Thread Nicolas Fouché
…Good question! I'm not sure why it is a terminal operation, maybe one of the original authors can chip in. However, you could probably work around it by using TopologyBuilder.addProcessor(...) rather than KStream.pr…
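
A sketch of that suggested workaround, with invented node and topic names: unlike terminal `KStream.process()`, `TopologyBuilder.addProcessor(...)` takes an explicit node name, so further children can still be wired onto the processor.

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class AddProcessorSketch {
    // Trivial processor, just enough to make the sketch compile.
    static class MyProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value);
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("my-source", "input-topic");                       // placeholder names
        // The node has a known name, so children can still be attached to it.
        builder.addProcessor("my-processor", MyProcessor::new, "my-source");
        builder.addSink("my-sink", "output-topic", "my-processor");
    }
}
```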

Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

2017-01-18 Thread Nicolas Fouché
Hi, as far as I understand, calling `KStream.process` prevents the developer from adding further operations to a `KStreamBuilder` [1], because its return type is `void`. Good. But it also prevents the developer from adding operations to its superclass `TopologyBuilder`. In my case I wanted to add…

Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

2017-01-16 Thread Nicolas Fouché
In the case of KAFKA-4468, it's more about state stores. But still, keys would not be backward compatible. What is the "official" policy about this kind of change? 2017-01-16 23:47 GMT+01:00 Nicolas Fouché: > Hi Eno, > I thought it would be impossible to put this in Kafka…

Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

2017-01-16 Thread Nicolas Fouché
…t. Thanks, Eno. On 16 Jan 2017, at 18:46, Nicolas Fouché wrote: > My current implementation: https://gist.github.com/nfo/eaf350afb5667a3516593da4d48e757a . I just appended the window `end` at the end of the byte array. Comments…

Re: Kafka Streams: from a KStream, aggregating records with the same key and updated metrics ?

2017-01-16 Thread Nicolas Fouché
If anyone is interested, here is my custom timestamp extractor: https://gist.github.com/nfo/54d5830720e163d2e7e848b6e4baac20 . 2017-01-16 15:52 GMT+01:00 Nicolas Fouché: > Hi Michael, > got it. I understand that it would be less error-prone to generate the final "altered…
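
Not the gist's contents, just a sketch of the general shape of a payload-based extractor under the 0.10.x single-argument `TimestampExtractor` API; the `MyEvent` value type is invented for illustration:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {
    // Invented value type carrying an embedded event-time field.
    public interface MyEvent {
        long getTimestamp();
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        if (record.value() instanceof MyEvent) {
            // Prefer the event time embedded in the payload...
            return ((MyEvent) record.value()).getTimestamp();
        }
        // ...and fall back to the record's own timestamp otherwise.
        return record.timestamp();
    }
}
```

Such an extractor would be plugged in via `StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG`.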

Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

2017-01-16 Thread Nicolas Fouché
My current implementation: https://gist.github.com/nfo/eaf350afb5667a3516593da4d48e757a . I just appended the window `end` at the end of the byte array. Comments and suggestions are welcome! 2017-01-16 15:48 GMT+01:00 Nicolas Fouché: > Hi Damian, > I recall now that I c…
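
For readers who don't follow the gist link, here is a sketch of the idea (not the gist verbatim): serialize the key, then both window boundaries, so that a 1-day window and a 1-week window starting at the same instant no longer map to the same bytes.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Windowed;

public class FullWindowedSerializer<T> implements Serializer<Windowed<T>> {
    private final Serializer<T> inner;

    public FullWindowedSerializer(Serializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public byte[] serialize(String topic, Windowed<T> data) {
        byte[] keyBytes = inner.serialize(topic, data.key());
        return ByteBuffer.allocate(keyBytes.length + 16)
                .put(keyBytes)
                .putLong(data.window().start())
                .putLong(data.window().end()) // the extra 8 bytes that disambiguate
                .array();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public void close() {
        inner.close();
    }
}
```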

Re: Kafka Streams: from a KStream, aggregating records with the same key and updated metrics ?

2017-01-16 Thread Nicolas Fouché
…timestamp) can simply re-use the timestamp embedded in the payload without having to know/worry about the custom calculation. It might also be easier for Ops personnel to have access to a ready-to-use timestamp in case they need to debug or troubleshoot. -Michael…

Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

2017-01-16 Thread Nicolas Fouché
However, there is an open JIRA for this: https://issues.apache.org/jira/browse/KAFKA-4468 Thanks, Damian. On Mon, 16 Jan 2017 at 11:18 Nicolas Fouché wrote: > Hi, > In the same topology, I generate aggregates with 1-day windows and 1-week…

Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

2017-01-16 Thread Nicolas Fouché
Hi, In the same topology, I generate aggregates with 1-day windows and 1-week windows and write them to one single topic. On Mondays, these windows have the same start time. The effect: these aggregates override each other. That happens because WindowedSerializer [1] only serializes the window start…

Re: Kafka Streams: setting the compression codec for intermediate and output topics

2017-01-16 Thread Nicolas Fouché
Perfect, thanks a lot Damian! 2017-01-16 10:07 GMT+01:00 Damian Guy: > Hi Nicolas, > Yes, you can set it by setting the producer property on the StreamsConfig. > Thanks, > Damian. On Sun, 15 Jan 2017 at 21:25 Nicolas Fouché wrote: > Hi…
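
A sketch of what Damian's answer looks like in practice (application id and broker are placeholders): producer-level settings passed through the StreamsConfig are forwarded to the internal producers, so records written to intermediate and output topics get compressed.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class CompressionConfigSketch {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Producer property forwarded to the producers Streams creates:
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // or "gzip", "snappy"
        return props;
    }
}
```

Later releases also provide `StreamsConfig.producerPrefix(...)` to scope such a property to the producer explicitly.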

Re: Kafka Streams: from a KStream, aggregating records with the same key and updated metrics ?

2017-01-15 Thread Nicolas Fouché
…(https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/) to first get all your data in KTables and then query it periodically (you can decide on the frequency manually)…

Kafka Streams: setting the compression codec for intermediate and output topics

2017-01-15 Thread Nicolas Fouché
Hi, how would I enable compression for records produced by Kafka Streams? By adding the producer property "compression.type" to the props sent to the `KafkaStreams` constructor? Or would it reuse the compression codec used in the input topics? Thanks. Nicolas

Kafka Streams: from a KStream, aggregating records with the same key and updated metrics ?

2017-01-12 Thread Nicolas Fouché
Hi, long, long technical story, sorry for that. I'm dealing with a special case. My input topic receives records containing an id in the key (and another field for partitioning), and a version number in the value, amongst other metrics. Records with the same id are sent every 5 seconds, and the ve…
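
One way to read that setup (the value type, topic, and store name below are invented, not from the thread): if only the highest version per id matters, a `groupByKey().reduce(...)` that keeps the larger version would maintain the latest state per key.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class LatestVersionSketch {
    // Invented value type: a version number plus whatever metrics the record carries.
    public static class Event {
        public long version;
    }

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        // Assumes an appropriate default Serde for Event is configured.
        KStream<String, Event> events = builder.stream("events"); // placeholder topic

        // Keep, per id, the value with the highest version seen so far.
        KTable<String, Event> latest = events
            .groupByKey()
            .reduce(
                (oldValue, newValue) -> newValue.version >= oldValue.version ? newValue : oldValue,
                "latest-by-version"); // placeholder state store name
    }
}
```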

Re: Kafka Streams: consume 6 months old data VS windows maintain durations

2017-01-12 Thread Nicolas Fouché
…(https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/). I wouldn't recommend using that if you want to keep the results of the previous run th…

Kafka Streams: consume 6 months old data VS windows maintain durations

2017-01-12 Thread Nicolas Fouché
Hi. I'd like to re-consume 6-month-old data with Kafka Streams. My current topology can't, because it defines aggregations with window maintain durations of 3 days: TimeWindows.of(ONE_HOUR_MILLIS).until(THREE_DAYS_MILLIS). As discovered (and shared [1]) a few months ago, consuming a record o…
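
For reference, a sketch of the change being discussed, with the constants spelled out: the `until()` retention has to cover the age of the oldest records being replayed, at the cost of proportionally larger state stores and changelog topics.

```java
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;

public class WindowRetentionSketch {
    static final long ONE_HOUR_MILLIS = 60 * 60 * 1000L;
    static final long SIX_MONTHS_MILLIS = 180L * 24 * 60 * 60 * 1000; // ~6 months

    // Retention extended from 3 days to ~6 months, so that replayed records
    // still fall inside a maintained window instead of being dropped.
    static final Windows<?> REPLAY_WINDOWS =
        TimeWindows.of(ONE_HOUR_MILLIS).until(SIX_MONTHS_MILLIS);
}
```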

Re: Dismissing late messages in Kafka Streams

2016-10-20 Thread Nicolas Fouché
I forgot to mention that the default maintain duration of a window is 1 day. Would it be useful to warn the developer if the current maintain duration is "not compatible" with the current window size and interval? 2016-10-20 14:49 GMT+02:00 Nicolas Fouché: > Hi Michael, >…

Re: Dismissing late messages in Kafka Streams

2016-10-20 Thread Nicolas Fouché
…of replaying/reprocessing of old messages. Do you have specific questions here that we can help you with in the meantime? [1] http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application On Th…

Dismissing late messages in Kafka Streams

2016-10-20 Thread Nicolas Fouché
Hi, I aggregate some data with `aggregateByKey` and a `TimeWindows`. I set the maintain duration of the window to 30 days. If it consumes a message older than 30 days, then a new aggregate is created for this old window. The problem is that this old windowed aggregate is of course incomplete and…
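
One defensive option, an assumption on my part rather than something from the thread: drop records older than the maintain duration before they reach the aggregation, here approximated against wall-clock time. The `Event` value type with its embedded timestamp is invented for illustration.

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.KStream;

public class LateRecordFilterSketch {
    // Invented value type carrying an embedded event time.
    public interface Event {
        long getTimestamp();
    }

    static final long THIRTY_DAYS_MS = TimeUnit.DAYS.toMillis(30);

    public static KStream<String, Event> dropLate(KStream<String, Event> input) {
        // Dismiss anything older than the window maintain duration, so an
        // expired window is never re-created by a single late record.
        return input.filter((key, value) ->
            value.getTimestamp() >= System.currentTimeMillis() - THIRTY_DAYS_MS);
    }
}
```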