Thanks very much for taking the time to answer, Matthias!  Very much
appreciated.

All the best,

Marcus

On Wed, Apr 7, 2021 at 10:22 PM Matthias J. Sax <mj...@apache.org> wrote:

> Sorry for the late reply...
>
>
> > I only see issues of out of order data in my re-partitioned topic as a
> > result of a rebalance happening.
>
> If you re-partition, you may actually see out-of-order data even if
> there is no rebalance. In the end, during repartitioning you have
> multiple upstream writers for the repartition topic and thus interleaved
> writes per partition.
>
> Maybe it's not an issue during regular processing for your use case, as
> your throughput seems to be tiny.
>
>
> > I believe all stream threads across all app instances will pause
> > consuming whilst the rebalance is worked through.
>
> Not really. (1) If a thread dies, it takes some time to detect that the
> thread died, and thus all other threads continue to process until a
> rebalance is even started. (2) With incremental rebalancing, partitions
> that are not re-assigned are processed throughout a rebalance.
>
>
> > but am I right in thinking that one streams app (or at least some of
> > its stream threads) will have to wait for state to be synced from the
> > changelog topic?
>
> That is correct. If a thread gets a new task assigned and needs to
> recover state, it would pause processing for all its partitions/tasks
> until the restore has finished. However, this pausing of
> partitions/tasks happens only on a per-thread basis. Threads, even from
> the same app instance, are totally agnostic to each other.
>
> So what you describe in your example makes sense.
>
>
> > If this is the case, do you think increasing standby replicas will
> > lessen the issue?
>
> Standbys can reduce your recovery time, and for your low-throughput use
> case may even reduce recovery time to zero. Thus, if the rebalance
> itself happens quickly enough, the issue with out-of-order data may go
> away (or at least should be largely mitigated).
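>
> For illustration, a minimal sketch of enabling standbys (the constant is
> the standard StreamsConfig one; the value of 1 is just an example):
>
>     import java.util.Properties;
>     import org.apache.kafka.streams.StreamsConfig;
>
>     Properties props = new Properties();
>     // Keep one warm replica of each state store on another instance, so
>     // a failover can take over without a long restore from the changelog.
>     props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);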
>
>
> -Matthias
>
>
> On 3/12/21 3:40 AM, Marcus Horsley-Rai wrote:
> > Thanks Matthias - that's great to know.
> >
> >> Increasing the grace period should not really affect throughput, but
> >> latency.
> >
> > Yes, a slip of the tongue on my part, you’re right :-)
> >
> > One last question if I may? I only see issues of out of order data in
> > my re-partitioned topic as a result of a rebalance happening.
> > My hypothesis is that when an instance of my streams app dies - the
> > consumption of data from the partitions it was responsible for falls
> > behind compared to others.
> > I believe all stream threads across all app instances will pause
> > consuming whilst the rebalance is worked through... but am I right in
> > thinking that one streams app (or at least some of its stream threads)
> > will have to wait for state to be synced from the changelog topic?
> > In other words - when a rebalance happens - I assume the consumer group
> > doesn’t wait for the slowest member to be ready to consume?
> >
> > To illustrate with an example:
> >       If I have 3 partitions of a single topic and three streams app
> >       instances (1 partition each)
> >       I have a producer that produces to each partition each minute on
> >       the minute
> >       Normally the timestamp of the head record is roughly the same
> >       across all three partitions. This assumes no lag ever builds up
> >       on the consumer group, and also assumes data volume and size of
> >       messages is comparable.
> >
> >       Now I kill streams app A. The rebalance protocol kicks in and
> >       gives instance B an extra partition to consume from.
> >       Could there now be a bigger lag for one or both of the partitions
> >       app B is consuming from because it had to sync state store state?
> >       (Assume B has enough stream processing threads idle and the
> >       machine is specced to cope with the extra load)
> >       …whereas app C, unhindered by state syncing, has potentially now
> >       produced to the through topic a record from a newer batch/time
> >       window.
> >
> > If this is the case, do you think increasing standby replicas will
> > lessen the issue?  I obviously don’t expect it to be a magic bullet,
> > and grace period is still required in general.
> >
> >
> > Best Regards,
> >
> > Marcus
> >
> >
> >
> >
> > On Thu, Mar 11, 2021 at 1:40 AM Matthias J. Sax <mj...@apache.org> wrote:
> >> will it consider a timestamp in the body of the message, if we have
> >> implemented a custom TimestampExtractor?
> >
> > Yes.
> >
> >
> >> Or, which I feel is more likely - does TimestampExtractor stream time
> >> only apply later on once deserialisation has happened?
> >
> > Well, the extractor does apply after deserialization, but we deserialize
> > each partition head-record to be able to apply the timestamp extractor:
> > ie, deserialization happens when a record becomes the "head record".
> >
> > Cf
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
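> >
> > For illustration, a minimal sketch of such an extractor (DeviceResult
> > and getTimestampMillis() are hypothetical stand-ins for your
> > deserialized JSON value and its embedded timestamp):
> >
> >     import org.apache.kafka.clients.consumer.ConsumerRecord;
> >     import org.apache.kafka.streams.processor.TimestampExtractor;
> >
> >     public class BodyTimestampExtractor implements TimestampExtractor {
> >         @Override
> >         public long extract(ConsumerRecord<Object, Object> record,
> >                             long partitionTime) {
> >             // the value is already deserialized when the extractor runs
> >             if (record.value() instanceof DeviceResult) {
> >                 return ((DeviceResult) record.value()).getTimestampMillis();
> >             }
> >             // fall back to the producer/broker record timestamp
> >             return record.timestamp();
> >         }
> >     }
> >
> > It would be registered via `default.timestamp.extractor`
> > (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG).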
> >
> >
> >> the accuracy of the aggregates may have to come second to the
> >> throughput.
> >
> > Increasing the grace period should not really affect throughput, but
> > latency.
> >
> >
> >
> > -Matthias
> >
> >
> > On 3/10/21 3:37 PM, Marcus Horsley-Rai wrote:
> >>
> >> Thanks for your reply Matthias, and really great talks :-)
> >>
> >> You’re right that I only have one input topic - though it does have 20
> >> partitions.
> >> The pointer to max.task.idle.ms cleared something up for me; I read the
> >> following line from the Kafka docs but couldn’t find what configuration
> >> they were referring to.
> >>
> >>>      Within a stream task that may be processing multiple
> >>> topic-partitions, if users configure the application to not wait for
> >>> all partitions to contain some buffered data and pick from the
> >>> partition with the smallest timestamp to process the next record, then
> >>> later on when some records are fetched for other topic-partitions,
> >>> their timestamps may be smaller than those processed records fetched
> >>> from another topic-partition.
> >>>
> >>
> >> When Streams is checking the head record of each partition to pick the
> >> lowest timestamp - will it consider a timestamp in the body of the
> >> message, if we have implemented a custom TimestampExtractor?
> >> Or, which I feel is more likely - does TimestampExtractor stream time
> >> only apply later on once deserialisation has happened?
> >> The reason I ask is because our producer code doesn’t manually set the
> >> timestamp in ProducerRecord, only in the JSON body. That may be
> >> something we can look to change.
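> >>
> >> For illustration, setting it explicitly might look roughly like this
> >> (the topic and key format are from this thread; jsonPayload and the
> >> producer itself are placeholders):
> >>
> >>     import org.apache.kafka.clients.producer.ProducerRecord;
> >>
> >>     long pollTimestamp = System.currentTimeMillis(); // stand-in for the SNMP poll time
> >>     ProducerRecord<String, String> record = new ProducerRecord<>(
> >>             "device-results",       // topic
> >>             null,                   // partition: let the partitioner decide
> >>             pollTimestamp,          // record timestamp (CreateTime)
> >>             "device-1|location-1",  // key: 'device-id|location-id'
> >>             jsonPayload);           // JSON body as before
> >>     producer.send(record);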
> >>
> >> As you say, I fear adjusting grace time may be my only solution;
> >> however, because this is a real-time monitoring application, the
> >> accuracy of the aggregates may have to come second to the throughput.
> >>
> >> Many thanks,
> >>
> >> Marcus
> >>
> >>
> >> On 2021/03/09 08:21:22, "Matthias J. Sax" <m...@apache.org> wrote:
> >>> In general, Kafka Streams tries to process messages in timestamp
> >>> order, ie, oldest message first. However, Kafka Streams always needs
> >>> to process messages in offset order per partition, and thus, the
> >>> timestamp synchronization applies to records from different topics
> >>> (eg, if you join two topics).
> >>>
> >>> There is a config `max.task.idle.ms` to improve timestamp
> >>> synchronization, but I am not sure if it would help in your case, as
> >>> it seems you have a single input topic.
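> >>>
> >>> For completeness, a minimal sketch of setting it (the 100 ms value is
> >>> an arbitrary example):
> >>>
> >>>     import java.util.Properties;
> >>>     import org.apache.kafka.streams.StreamsConfig;
> >>>
> >>>     Properties props = new Properties();
> >>>     // wait up to 100 ms for empty input partitions to receive data
> >>>     // before picking the next record by timestamp across partitions
> >>>     props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L);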
> >>>
> >>> It seems there is already out-of-order data in your input topic.
> >>> Also note that your repartition step may introduce out-of-order data.
> >>>
> >>> As you are using a custom Processor, it is up to you to handle
> >>> out-of-order data, and it seems that you may need to introduce a
> >>> larger grace period. In general, it's very hard (to impossible) to
> >>> know how much disorder is in a topic, due to the decoupled nature of
> >>> Kafka and interleaved writes of different producers into a topic.
> >>>
> >>> Not sure if you could change the original partitioning to just use
> >>> `location-id` to avoid the additional repartitioning step. This could
> >>> help to reduce disorder.
> >>>
> >>> For completeness, check out those Kafka Summit talks:
> >>>  - https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >>>  - https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/
> >>>
> >>> Hope this helps.
> >>>
> >>> -Matthias
> >>>
> >>> On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:
> >>>> Hi All,
> >>>>
> >>>> Just to give a bit of context: I have an application which is SNMP
> >>>> polling a network. Each collector agent works on a 1-minute schedule,
> >>>> polling device(s) and posting results to a Kafka topic.
> >>>> The time a given collector publishes data can vary within a minute,
> >>>> but it should never overlap with the next minute's time bucket.
> >>>>
> >>>> The topic produced to, 'device-results' for argument's sake, has
> >>>> multiple partitions. The data is keyed such as 'device-id|location-id'.
> >>>>
> >>>> I then had a requirement to aggregate the data by location; every
> >>>> device result within the same location is summed, and an aggregate is
> >>>> output each minute.
> >>>> I'm aware the Streams DSL has groupByKey/windowedBy/suppress, which is
> >>>> a solution to this problem - but we found the throughput was abysmal -
> >>>> probably due to the I/O performance of our virtual machine
> >>>> infrastructure.
> >>>>
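> >>>> For reference, the DSL version we benchmarked looked roughly like this
> >>>> (deviceResultSerde, aggregateSerde and LocationAggregate are simplified
> >>>> stand-ins for our own serdes and aggregate class, whose add(...)
> >>>> returns the updated aggregate):
> >>>>
> >>>>     import java.time.Duration;
> >>>>     import org.apache.kafka.common.serialization.Serdes;
> >>>>     import org.apache.kafka.streams.StreamsBuilder;
> >>>>     import org.apache.kafka.streams.kstream.*;
> >>>>
> >>>>     StreamsBuilder builder = new StreamsBuilder();
> >>>>     builder.stream("device-results",
> >>>>             Consumed.with(Serdes.String(), deviceResultSerde))
> >>>>         // re-key from 'device-id|location-id' to just location-id
> >>>>         .selectKey((k, v) -> k.split("\\|")[1])
> >>>>         .groupByKey(Grouped.with(Serdes.String(), deviceResultSerde))
> >>>>         .windowedBy(TimeWindows.of(Duration.ofMinutes(1))
> >>>>             .grace(Duration.ofMinutes(1)))
> >>>>         .aggregate(LocationAggregate::new,
> >>>>             (locationId, result, agg) -> agg.add(result),
> >>>>             Materialized.with(Serdes.String(), aggregateSerde))
> >>>>         .suppress(Suppressed.untilWindowCloses(
> >>>>             Suppressed.BufferConfig.unbounded()))
> >>>>         .toStream((windowedKey, agg) -> windowedKey.key())
> >>>>         .to("location-aggregates",
> >>>>             Produced.with(Serdes.String(), aggregateSerde));
> >>>>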
> >>>> Instead we have hand-rolled something simplistic - which does the job
> >>>> 99% well (a rough sketch follows below).
> >>>>  - We use a through() to re-partition the topic to just location-id
> >>>>  - We keep an object representing the current minute's aggregate in
> >>>> an in-memory state store (with changelog)
> >>>>  - When any device result is transformed, and has a timestamp that is
> >>>> older than our current window time - we output the aggregate;
> >>>> otherwise we update the running sum.
> >>>>
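> >>>> A rough sketch of the transform step (DeviceResult, LocationAggregate,
> >>>> their accessors and the store name are simplified stand-ins for our
> >>>> own classes; add(...) returns the updated aggregate):
> >>>>
> >>>>     import org.apache.kafka.streams.KeyValue;
> >>>>     import org.apache.kafka.streams.kstream.Transformer;
> >>>>     import org.apache.kafka.streams.processor.ProcessorContext;
> >>>>     import org.apache.kafka.streams.state.KeyValueStore;
> >>>>
> >>>>     public class LocationAggregateTransformer implements
> >>>>             Transformer<String, DeviceResult,
> >>>>                     KeyValue<String, LocationAggregate>> {
> >>>>
> >>>>         private KeyValueStore<String, LocationAggregate> store;
> >>>>
> >>>>         @Override
> >>>>         @SuppressWarnings("unchecked")
> >>>>         public void init(ProcessorContext context) {
> >>>>             store = (KeyValueStore<String, LocationAggregate>)
> >>>>                     context.getStateStore("location-agg-store");
> >>>>         }
> >>>>
> >>>>         @Override
> >>>>         public KeyValue<String, LocationAggregate> transform(
> >>>>                 String locationId, DeviceResult result) {
> >>>>             LocationAggregate agg = store.get(locationId);
> >>>>             if (agg == null) {
> >>>>                 store.put(locationId, LocationAggregate.startWindow(result));
> >>>>                 return null;                 // nothing to emit yet
> >>>>             }
> >>>>             if (result.getTimestamp() >= agg.getWindowEnd()) {
> >>>>                 // the result belongs to a newer minute: emit the
> >>>>                 // closed window and start a new one
> >>>>                 store.put(locationId, LocationAggregate.startWindow(result));
> >>>>                 return KeyValue.pair(locationId, agg);
> >>>>             }
> >>>>             store.put(locationId, agg.add(result)); // window still open
> >>>>             return null;
> >>>>         }
> >>>>
> >>>>         @Override
> >>>>         public void close() { }
> >>>>     }
> >>>>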
> >>>> What I have noticed is that when I do a rolling restart of the
> >>>> application, such as to push new code, data is dropped because of
> >>>> messages processed out of order.
> >>>> I changed the code to include the equivalent of an extra minute's
> >>>> grace time, but in production I see messages arriving that are more
> >>>> than 2 minutes behind the latest messages.
> >>>>
> >>>> I came across the documentation
> >>>> https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
> >>>> which alluded to a possible solution.
> >>>> Could anyone advise if there is a way, in code or configuration
> >>>> properties, that I could better guarantee that Streams prioritises the
> >>>> *oldest* messages first, rather than caring about offset?
> >>>>
> >>>> Thanks in advance for any replies!
> >>>>
> >>>> Marcus
> >>>>
> >
>
