Thanks very much for taking the time to answer, Matthias! Very much appreciated
All the best,

Marcus

On Wed, Apr 7, 2021 at 10:22 PM Matthias J. Sax <mj...@apache.org> wrote:

> Sorry for late reply...
>
> > I only see issues of out of order data in my re-partitioned topic as a result of a rebalance happening.
>
> If you re-partition, you may actually see out-of-order data even if there is no rebalance. In the end, during repartitioning you have multiple upstream writers for the repartition topic and thus interleaved writes per partition.
>
> Maybe it's not an issue during regular processing for your use case, as your throughput seems to be tiny.
>
> > I believe all stream threads across all app instances will pause consuming whilst the rebalance is worked through.
>
> Not really. (1) If a thread dies, it takes some time to detect that the thread died, and thus all other threads continue to process until a rebalance is even started. (2) With incremental rebalancing, partitions that are not re-assigned are processed throughout a rebalance.
>
> > but am I right in thinking that one streams app (or at least some of its stream threads) will have to wait for state to be synced from the changelog topic?
>
> That is correct. If a thread gets a new task assigned and needs to recover state, it pauses processing for all its partitions/tasks until the restore is finished. However, this pausing of partitions/tasks happens only on a per-thread basis. Threads, even from the same app instance, are totally agnostic to each other.
>
> So what you describe in your example makes sense.
>
> > If this is the case, do you think increasing standby replicas will lessen the issue?
>
> Standbys can reduce your recovery time, and for your low-throughput use case may even reduce recovery time to zero. Thus, if the rebalance itself happens quickly enough, the issue with out-of-order data may go away (or at least should be largely mitigated).
>
> -Matthias
>
> On 3/12/21 3:40 AM, Marcus Horsley-Rai wrote:
>
> > Thanks Matthias - that's great to know.
> >
> > >> Increasing the grace period should not really affect throughput, but latency.
> >
> > Yes, a slip of the tongue on my part, you're right :-)
> >
> > One last question if I may? I only see issues of out-of-order data in my re-partitioned topic as a result of a rebalance happening.
> > My hypothesis is that when an instance of my streams app dies, the consumption of data from the partitions it was responsible for falls behind compared to the others.
> > I believe all stream threads across all app instances will pause consuming whilst the rebalance is worked through... but am I right in thinking that one streams app (or at least some of its stream threads) will have to wait for state to be synced from the changelog topic?
> > In other words - when a rebalance happens - I assume the consumer group doesn't wait for the slowest member to be ready to consume?
> >
> > To illustrate with an example:
> > If I have 3 partitions of a single topic and three streams app instances (1 partition each).
> > I have a producer that produces to each partition each minute, on the minute.
> > Normally the timestamp of the head record is roughly the same across all three partitions. This assumes no lag ever builds up on the consumer group, and also assumes data volume and size of messages is comparable.
> >
> > Now I kill streams app A. The rebalance protocol kicks in and gives instance B an extra partition to consume from.
> > Could there now be a bigger lag for one or both of the partitions app B is consuming from because it had to sync state store state? (Assume B has enough stream processing threads idle and the machine is specced to cope with the extra load.)
> > ...whereas app C, unhindered by state syncing, has potentially now produced to the through() topic a record from a newer batch/time window.
> >
> > If this is the case, do you think increasing standby replicas will lessen the issue? I obviously don't expect it to be a magic bullet, and a grace period is still required in general.
> >
> > Best Regards,
> >
> > Marcus
> >
> > On Thu, Mar 11, 2021 at 1:40 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> > >> will it consider a timestamp in the body of the message, if we have implemented a custom TimeExtractor?
> >
> > Yes.
> >
> > >> Or, which I feel is more likely - does TimeExtractor stream time only apply later on once deserialisation has happened?
> >
> > Well, the extractor does apply after deserialization, but we deserialize each partition head-record to be able to apply the timestamp extractor: i.e., deserialization happens when a record becomes the "head record".
> >
> > Cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
> >
> > >> the accuracy of the aggregates may have to come second to the throughput.
> >
> > Increasing the grace period should not really affect throughput, but latency.
> >
> > -Matthias
> >
> > On 3/10/21 3:37 PM, Marcus Horsley-Rai wrote:
> >
> >> Thanks for your reply Matthias, and really great talks :-)
> >>
> >> You're right that I only have one input topic - though it does have 20 partitions.
> >> The pointer to max.task.idle.ms cleared something up for me; I read the following line from the Kafka docs but couldn't find which configuration they were referring to.
> >>
> >>> Within a stream task that may be processing multiple topic-partitions, if users configure the application to not wait for all partitions to contain some buffered data and pick from the partition with the smallest timestamp to process the next record, then later on when some records are fetched for other topic-partitions, their timestamps may be smaller than those processed records fetched from another topic-partition.
> >>
> >> When streams is checking the head record of each partition to pick the lowest timestamp - will it consider a timestamp in the body of the message, if we have implemented a custom TimeExtractor?
> >> Or, which I feel is more likely - does TimeExtractor stream time only apply later on once deserialisation has happened?
> >> The reason I ask is because our producer code doesn't manually set the timestamp in ProducerRecord, only in the JSON body. That may be something we can look to change.
> >> As you say, I fear adjusting grace time may be my only solution; however, because this is a real-time monitoring application... the accuracy of the aggregates may have to come second to the throughput.
> >>
> >> Many thanks,
> >>
> >> Marcus
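A minimal sketch of the kind of payload-based extractor discussed above, assuming the value serde deserializes the JSON into a Jackson JsonNode and that the body carries an epoch-millis field named "timestamp" (both the serde and the field name are assumptions, not the application's actual code):

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PayloadTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
            final Object value = record.value();
            // "timestamp" is an assumed field name; the real JSON schema may differ
            if (value instanceof JsonNode && ((JsonNode) value).hasNonNull("timestamp")) {
                return ((JsonNode) value).get("timestamp").asLong();
            }
            // fall back when the payload carries no usable event time
            return partitionTime;
        }
    }

It would be wired in through the normal Streams properties, alongside the other two settings touched on in this thread (the values shown are only placeholders):

    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, PayloadTimestampExtractor.class);
    props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);   // wait briefly for lagging partitions before picking a record
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);  // keep a warm copy of each state store to speed up failover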
Sax" <m...@apache.org <mailto: > m...@apache.org>> wrote: > >>> In general, Kafka Streams tries to process messages in timestamp > order,> > >>> ie, oldest message first. However, Kafka Streams always need to > process> > >>> messages in offset order per partition, and thus, the timestamp> > >>> synchronization applied to records from different topic (eg, if you > join> > >>> two topics).> > >>> > >>> There is config `max.task.idle.ms <http://max.task.idle.ms/>` to > improve timestamp synchronization,> > >>> but I am not sure if it would help in your case, as it seems you have > a> > >>> single input topic.> > >>> > >>> It seems, there is already out-of-order data in your input topic. > Also> > >>> note that your repartition step, may introduce out-or-order data.> > >>> > >>> As you are using a custom Processor, it is up to you to handle> > >>> out-of-order data, and it seems that you may need to introduce a > larger> > >>> grace period. In general, it's very hard (too impossible) to know how> > >>> much unorder is in a topic, due the decoupled nature of Kafka and> > >>> interleaved writes of different producers into a topic.> > >>> > >>> Not sure if you could change the original partitioning to just use> > >>> `location-id` to avoid the additional repartitioning step. This could> > >>> help to reduce unorder.> > >>> > >>> For completeness, check out those Kafka Summit talks:> > >>> -> > >>> > https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/ > < > https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/>> > > >>> -> > >>> > https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/ > < > https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/>> > > >>> > >>> Hope this helps.> > >>> > >>> -Matthias> > >>> > >>> On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:> > >>>> Hi All,> > >>>>> > >>>> Just to give a bit of context; I have an application which is SNMP > polling> > >>>> a network. Each collector agent works on a 1 minute schedule, > polling> > >>>> device(s) and posting results to a Kafka topic.> > >>>> The time a given collector publishes data can vary within a minute, > but it> > >>>> should never overlap with the next minute time bucket.> > >>>>> > >>>> The topic produced to, for arguments sake 'device-results' has > multiple> > >>>> partitions. 
> >>>> The data is keyed as 'device-id|location-id'.
> >>>>
> >>>> I then had a requirement to aggregate the data by location; every device result within the same location is summed, and an aggregate is output each minute.
> >>>> I'm aware the streams DSL has groupByKey/WindowedBy/Suppress, which is a solution to this problem - but we found the throughput was abysmal, probably due to the I/O performance of our virtual machine infrastructure.
> >>>>
> >>>> Instead we have hand-rolled something simplistic, which does the job 99% well:
> >>>> - We use a through() to re-partition the topic to just location-id
> >>>> - We keep an object representing the current minute's aggregate in an in-memory state store (with changelog)
> >>>> - When any device result is transformed and has a timestamp that is older than our current window time, we output the aggregate; otherwise we update the running sum.
> >>>>
> >>>> What I have noticed is that when I do a rolling restart of the application, such as to push new code, data is dropped because of messages processed out of order.
> >>>> I changed the code to include the equivalent of an extra minute's grace time, but in production I see messages arriving that are > 2 min behind what the latest messages are.
> >>>>
> >>>> I came across the documentation https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering which alluded to a possible solution.
> >>>> Could anyone advise if there is a way, in code or configuration properties, that I could better guarantee that streams prioritises the *oldest* messages first, rather than caring about offset?
> >>>>
> >>>> Thanks in advance for any replies!
> >>>>
> >>>> Marcus
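For comparison, the groupByKey/windowedBy/suppress route mentioned in the original question would look roughly like the sketch below. Only the input topic name ('device-results') and the per-location keying come from the thread; the output topic, the bare-double value (the real payload is JSON), the serdes, and the one-minute grace period standing in for the "extra minute" described above are all illustrative assumptions.

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class LocationAggregationTopology {

        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();

            builder.stream("device-results", Consumed.with(Serdes.String(), Serdes.Double()))
                   // re-key "device-id|location-id" to just "location-id"; Streams repartitions automatically
                   .selectKey((deviceAndLocation, value) -> deviceAndLocation.split("\\|")[1])
                   .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
                   // 1-minute tumbling windows, with one extra minute of grace for late records
                   .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
                   .reduce(Double::sum, Materialized.with(Serdes.String(), Serdes.Double()))
                   // emit a single final result per window once it closes, instead of every update
                   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                   .toStream()
                   .map((windowedLocation, sum) -> KeyValue.pair(windowedLocation.key(), sum))
                   .to("location-aggregates", Produced.with(Serdes.String(), Serdes.Double()));

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "location-aggregator");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            new KafkaStreams(builder.build(), props).start();
        }
    }

The windowed store and the suppression buffer make this route heavier on state-store I/O than the in-memory, hand-rolled transformer described above, which lines up with the throughput problem reported in the thread.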