Thanks David, I figured the correct approach would be to adopt a keying strategy upstream, ensuring that the same field I use as a key downstream lands on the same partition (which gives me the ordering guarantees I'm looking for).
Implementation-wise, I'm guessing that where I would normally evict a window after some event time plus allowed lateness, I could set a timer or simply keep the window open for some additional time to allow out-of-order data to make its way into the window. Either way, I think the keying is probably the right approach, but I wanted to consider any other options should that become an issue upstream. Thanks!

Rion

> On Feb 27, 2021, at 10:21 AM, David Anderson <dander...@apache.org> wrote:
>
> Rion,
>
> If you can arrange for each tenant's events to be in only one Kafka
> partition, that should be the best way to simplify the processing you need
> to do. Otherwise, a simple change that may help would be to increase the
> bounded delay you use in calculating your own per-tenant watermarks,
> thereby making late events less likely.
>
> David
>
>> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams <rionmons...@gmail.com> wrote:
>> David and Timo,
>>
>> Firstly, thank you both so much for your contributions and advice. I believe
>> I've implemented things along the lines that you both detailed, and things
>> appear to work just as expected (e.g. I can see records arriving, being
>> added to windows, late records being discarded, and files ultimately being
>> written out as expected).
>>
>> With that said, I have one question/issue that I've run into with handling
>> the data coming from my Kafka topic. Currently, my tenant/source (i.e. my
>> key) may be distributed across the 10 partitions of my Kafka topic. With
>> the way that I'm consuming from this topic (with a Kafka consumer), my data
>> arrives in a mixed order, which seems to throw off my own watermarks (those
>> stored in my ValueState): data with later event times can arrive before
>> earlier data and cause my windows to be evicted prematurely.
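For the upstream keying David suggests, producing to Kafka with the tenant as the record key is usually enough: the default partitioner hashes the key, so every record for a given tenant lands on a single partition. A minimal sketch of that property (illustrative only: `partitionFor` uses `hashCode`, whereas Kafka's actual default partitioner applies murmur2 to the serialized key bytes):

```kotlin
// Illustrative only: Kafka's real default partitioner uses murmur2 over the
// serialized key bytes. The point is just that a non-null key makes the
// partition deterministic, so one tenant always lands on one partition,
// preserving per-tenant ordering within that partition.
fun partitionFor(tenant: String, numPartitions: Int): Int =
    (tenant.hashCode() and Int.MAX_VALUE) % numPartitions

fun main() {
    // The same tenant always maps to the same partition...
    check(partitionFor("tenant-a", 10) == partitionFor("tenant-a", 10))
    // ...and the result is always a valid partition index.
    check(partitionFor("tenant-b", 10) in 0 until 10)
}
```

On the producer side this just means constructing records as `ProducerRecord("events", event.tenant, payload)` rather than leaving the key null.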
>>
>> I'm currently using `WatermarkStrategy.noWatermarks()` along with a custom
>> timestamp assigner to handle all of my timestamping, but is there a
>> mechanism at the Flink level to handle this mixed ordering across
>> partitions?
>>
>> I know the answer here likely lies with Kafka and adopting a better keying
>> strategy to ensure the same tenant/source (my key) lands on the same
>> partition, which by definition ensures ordering. I'm just wondering if
>> there's some mechanism in Flink to accomplish this within my pipeline,
>> after reading from Kafka?
>>
>> Again - thank you both so much. I'm loving the granularity and control that
>> Flink has been providing me over other streaming technologies I've used in
>> the past. I'm totally sold on it and am looking forward to doing more
>> incredible things with it.
>>
>> Best regards,
>>
>> Rion
>>
>>>> On Feb 26, 2021, at 4:36 AM, David Anderson <dander...@apache.org> wrote:
>>>>
>>> Yes indeed, Timo is correct -- I am proposing that you not use timers at
>>> all. Watermarks and event-time timers go hand in hand -- and neither
>>> mechanism can satisfy your requirements.
>>>
>>> You can instead put all of the timing logic in the processElement method --
>>> effectively emulating what you would get if Flink were to offer per-key
>>> watermarking.
>>>
>>> The reason that the PseudoWindow example is using MapState is that for each
>>> key/tenant, more than one window can be active simultaneously. This occurs
>>> because the event stream is out-of-order with respect to time, so events
>>> for the "next window" are probably being processed before "the previous"
>>> window is complete. And if you want to accommodate allowed lateness, the
>>> requirement to have several windows open at once becomes even more
>>> important.
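The several-windows-open-at-once point is easy to see with a plain-Kotlin stand-in for the MapState (no Flink types; `OpenWindows` is just an illustration of the bookkeeping, keyed by window-end timestamp as in the PseudoWindow example):

```kotlin
// Sketch (not Flink API): emulating the per-key MapState of open windows.
// Each entry maps a window-end timestamp to that window's accumulated value,
// mirroring MapState<Long, String> in a process function.
class OpenWindows(private val duration: Long) {
    private val contents = mutableMapOf<Long, String>() // window end -> aggregation

    fun add(eventTime: Long, value: String) {
        // End of the tumbling window containing eventTime.
        val endOfWindow = eventTime - (eventTime % duration) + duration - 1
        contents.merge(endOfWindow, value) { old, new -> old + new }
    }

    fun openWindowEnds(): Set<Long> = contents.keys
}

fun main() {
    val w = OpenWindows(60_000)
    w.add(10_000, "a")  // falls in the window ending at 59_999
    w.add(70_000, "b")  // out-of-order stream: a later window opens...
    w.add(20_000, "c")  // ...while the earlier one is still accumulating
    check(w.openWindowEnds() == setOf(59_999L, 119_999L)) // two windows open at once
}
```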
>>>
>>> MapState gives you a per-tenant hashmap, where each entry in that map
>>> corresponds to an open window for some particular tenant: the map's key is
>>> the timestamp for a window, and the value is whatever state you want that
>>> window to hold.
>>>
>>> Best regards,
>>> David
>>>
>>>> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther <twal...@apache.org> wrote:
>>>> Hi Rion,
>>>>
>>>> I think what David was referring to is that you do the entire time
>>>> handling yourself in the process function. That means not using the
>>>> `context.timerService()` or `onTimer()` that Flink provides, but calling
>>>> your own logic based on the timestamps that enter your process function
>>>> and the stored state.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 26.02.21 00:29, Rion Williams wrote:
>>>> > Hi David,
>>>> >
>>>> > Thanks for your prompt reply; it was very helpful, and the PseudoWindow
>>>> > example is excellent. I believe it closely aligns with an approach that
>>>> > I was tinkering with but seemed to be missing a few key pieces. In my
>>>> > case, I'm essentially going to want to aggregate the messages that are
>>>> > coming into the window (a simple string-concatenation aggregation would
>>>> > work). Would I need another form of state to hold that? Looking through
>>>> > this example with naive eyes, it seems that this function is currently
>>>> > storing multiple windows in state via the MapState provided:
>>>> >
>>>> > // Keyed, managed state, with an entry for each window, keyed by the
>>>> > // window's end time.
>>>> > // There is a separate MapState object for each driver.
>>>> > private transient MapState<Long, Float> sumOfTips;
>>>> >
>>>> > If I wanted to perform an aggregation for each key/tenant, would a
>>>> > MapState be appropriate?
>>>> > Such as a MapState<Long, String> if I was doing a string aggregation,
>>>> > so that within my processElement function I could use something similar
>>>> > for building these aggregations and ultimately triggering them:
>>>> >
>>>> > // Keep track of a tenant/source-specific watermark
>>>> > private lateinit var currentWatermark: ValueState<Long>
>>>> > // Keep track of the contents of each of the windows, where the key
>>>> > // represents the close of the window and the contents represent an
>>>> > // accumulation of the records for that window
>>>> > private lateinit var windowContents: MapState<Long, String>
>>>> >
>>>> > If that's the case, this is what I've thrown together thus far and I
>>>> > feel like it's moving in the right direction:
>>>> >
>>>> > class MagicWindow(private val duration: Long, private val lateness: Long) :
>>>> >     KeyedProcessFunction<String, Event, FileOutput>() {
>>>> >
>>>> >     // Keep track of a tenant/source-specific watermark
>>>> >     private lateinit var currentWatermark: ValueState<Long>
>>>> >     // Keep track of the contents of each of the windows, where the key
>>>> >     // represents the close of the window and the contents represent an
>>>> >     // accumulation of the records for that window
>>>> >     private lateinit var windowContents: MapState<Long, String>
>>>> >
>>>> >     override fun open(config: Configuration) {
>>>> >         currentWatermark = runtimeContext.getState(watermarkDescriptor)
>>>> >         windowContents = runtimeContext.getMapState(windowContentsDescriptor)
>>>> >     }
>>>> >
>>>> >     override fun processElement(element: Event, context: Context, out: Collector<FileOutput>) {
>>>> >         // Resolve the event time
>>>> >         val eventTime: Long = getEventTime(element)
>>>> >
>>>> >         // Update the per-key watermark (if applicable)
>>>> >         if ((currentWatermark.value() ?: Long.MIN_VALUE) < eventTime) {
>>>> >             currentWatermark.update(eventTime)
>>>> >         }
>>>> >
>>>> >         val timerService = context.timerService()
>>>> >
>>>> >         if (eventTime <= timerService.currentWatermark()) {
>>>> >             // This event is late; its window has already been triggered.
>>>> >         } else {
>>>> >             // Determine the "actual" window closure and start a timer for it
>>>> >             // (the end of the window containing eventTime)
>>>> >             val endOfWindow = eventTime - (eventTime % duration) + duration - 1
>>>> >
>>>> >             // Schedule a callback for when the window has been completed.
>>>> >             timerService.registerEventTimeTimer(endOfWindow)
>>>> >
>>>> >             // Add this element to the corresponding aggregation for this window
>>>> >             windowContents.put(endOfWindow, (windowContents.get(endOfWindow) ?: "") + "$element")
>>>> >         }
>>>> >     }
>>>> >
>>>> >     override fun onTimer(timestamp: Long, context: OnTimerContext, out: Collector<FileOutput>) {
>>>> >         val key = context.currentKey
>>>> >         val currentAggregation: String = windowContents.get(timestamp)
>>>> >
>>>> >         // Output things here and clear the current aggregation for this
>>>> >         // tenant/source combination in this window
>>>> >     }
>>>> >
>>>> >     companion object {
>>>> >         private val watermarkDescriptor = ValueStateDescriptor(
>>>> >             "watermark",
>>>> >             Long::class.java
>>>> >         )
>>>> >
>>>> >         private val windowContentsDescriptor = MapStateDescriptor(
>>>> >             "window-contents",
>>>> >             Long::class.java,
>>>> >             String::class.java
>>>> >         )
>>>> >
>>>> >         fun getEventTime(element: Event): Long {
>>>> >             return Instant(element.`source$1`.createdTimestamp).millis
>>>> >         }
>>>> >     }
>>>> > }
>>>> >
>>>> > Is something glaringly off with this? I'll need to do some additional
>>>> > reading on the timers, but any additional clarification would be greatly
>>>> > appreciated.
>>>> >
>>>> > Thanks so much for your initial response again!
>>>> >
>>>> > Rion
>>>> >
>>>> >> On Feb 25, 2021, at 3:27 PM, David Anderson <dander...@apache.org> wrote:
>>>> >>
>>>> >> Rion,
>>>> >>
>>>> >> What you want isn't really achievable with the APIs you are using.
>>>> >> Without some sort of per-key (per-tenant) watermarking -- which Flink
>>>> >> doesn't offer -- the watermarks and windows for one tenant can be held
>>>> >> up by the failure of another tenant's events to arrive in a timely
>>>> >> manner.
>>>> >>
>>>> >> However, your pipeline is pretty straightforward, and it shouldn't be
>>>> >> particularly difficult to accomplish what you want. What you can do is
>>>> >> ignore the built-in watermarking and windowing APIs, and build
>>>> >> equivalent functionality in the form of a KeyedProcessFunction.
>>>> >>
>>>> >> The Flink docs include an example [1] showing how to implement your
>>>> >> own tumbling event-time windows with a process function. That
>>>> >> implementation assumes you can rely on watermarks for triggering the
>>>> >> windows; you'll have to do that differently.
>>>> >>
>>>> >> What you can do instead is to track, in ValueState, the largest
>>>> >> timestamp you've seen so far (for each key/tenant). Whenever that
>>>> >> advances, you can subtract the bounded-out-of-orderness duration from
>>>> >> that timestamp, and then check to see if the resulting value is now
>>>> >> large enough to trigger any of the windows for that key/tenant.
>>>> >>
>>>> >> Handling allowed lateness should be pretty straightforward.
>>>> >>
>>>> >> Hope this helps,
>>>> >> David
>>>> >>
>>>> >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>>>> >>
>>>> >> On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>> >>
>>>> >>     Hey folks, I have a somewhat high-level/advice question regarding
>>>> >>     Flink and whether it has the mechanisms in place to accomplish what
>>>> >>     I'm trying to do.
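The approach described above (keep the largest timestamp seen per key in ValueState, derive a per-key watermark by subtracting the bounded out-of-orderness, and fire any windows whose end has passed) can be sketched with plain Kotlin collections. The class and its fields are illustrative stand-ins for the keyed state, not Flink API:

```kotlin
// Plain-Kotlin emulation of per-key watermarking inside processElement.
// maxTimestamp plays the role of the ValueState<Long>; windows plays the
// role of the MapState (window end -> aggregated contents).
class PerKeyWindower(private val duration: Long, private val maxOutOfOrderness: Long) {
    private var maxTimestamp = Long.MIN_VALUE
    private val windows = sortedMapOf<Long, String>()

    // Guard against underflow before any element has been seen for this key.
    private fun watermark(): Long =
        if (maxTimestamp == Long.MIN_VALUE) Long.MIN_VALUE
        else maxTimestamp - maxOutOfOrderness

    /** Processes one element; returns any (windowEnd, contents) windows it fired. */
    fun process(eventTime: Long, value: String): List<Pair<Long, String>> {
        if (eventTime <= watermark()) return emptyList() // late: window already fired
        val endOfWindow = eventTime - (eventTime % duration) + duration - 1
        windows.merge(endOfWindow, value) { old, new -> old + new }
        if (eventTime > maxTimestamp) maxTimestamp = eventTime
        // Fire every open window whose end is now behind this key's watermark.
        val fired = windows.headMap(watermark()).toList()
        fired.forEach { (firedEnd, _) -> windows.remove(firedEnd) }
        return fired
    }
}

fun main() {
    val p = PerKeyWindower(duration = 60_000, maxOutOfOrderness = 5_000)
    p.process(10_000, "a")              // opens the window ending at 59_999
    val fired = p.process(70_000, "b")  // advances this key's watermark to 65_000
    check(fired == listOf(59_999L to "a")) // the first window fires
    check(p.process(20_000, "c").isEmpty()) // behind the watermark: dropped as late
}
```

Allowed lateness would slot in by firing windows against `watermark() - lateness` instead, keeping the fired contents around until then.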
I’ve spent a good bit of time using Apache Beam, but >>>> >> recently pivoted over to native Flink simply because some of the >>>> >> connectors weren’t as mature or didn’t support some of the >>>> >> functionality that I needed. >>>> >> >>>> >> Basically - I have a single Kafka topic with 10 partitions that >>>> >> I’m consuming from. This is a multi-tenant topic containing data >>>> >> that comes in at various times from various tenants and is not at >>>> >> all guaranteed to be in order, at least with regards to “event >>>> >> time”, which is what I care about. >>>> >> >>>> >> What I’m trying to accomplish is this: *Given a multi-tenant topic >>>> >> with records eventually distributed across partitions, is it >>>> >> possible to consume and window each of these records independently >>>> >> of one another without one tenant potentially influencing another >>>> >> and write out to separate files per tenant/source (i.e. some other >>>> >> defined property on the records)?” >>>> >> * >>>> >> My pipeline currently looks something like this: >>>> >> >>>> >> @JvmStatic >>>> >> fun main(args: Array<String>) { >>>> >> val pipeline = StreamExecutionEnvironment >>>> >> .getExecutionEnvironment() >>>> >> //.createLocalEnvironmentWithWebUI(Configuration()) >>>> >> >>>> >> val properties = buildPropertiesFromArgs(args) >>>> >> val stream = pipeline >>>> >> .addSource(readFromKafka("events", properties)) >>>> >> .assignTimestampsAndWatermarks( >>>> >> WatermarkStrategy >>>> >> >>>> >> .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...)) >>>> >> .withTimestampAssigner { event: Event, _: Long -> >>>> >> // Assign the created timestamp as the event >>>> >> timestamp >>>> >> Instant(event.createdTimestamp).millis >>>> >> } >>>> >> ) >>>> >> >>>> >> // There are multiple data sources that each have their own >>>> >> windows and allowed lateness >>>> >> // so ensure that each source only handles records for it >>>> >> DataSources.forEach { source -> >>>> >> stream >>>> >> 
.filter { event -> >>>> >> event.source == source.name <http://source.name> >>>> >> } >>>> >> .keyBy { event -> >>>> >> //print("Keying record with id ${record.`id$1`} by >>>> >> tenant ${record.`source$1`.tenantName}") >>>> >> event.tenant >>>> >> } >>>> >> .window( >>>> >> >>>> >> TumblingEventTimeWindows.of(Time.minutes(source.windowDuration)) >>>> >> ) >>>> >> .allowedLateness( >>>> >> Time.minutes(source.allowedLateness) >>>> >> ) >>>> >> .process( >>>> >> // This just contains some logic to take the >>>> >> existing windows and construct a file >>>> >> // using the window range and keys (tenant/source) >>>> >> with the values being >>>> >> // an aggregation of all of the records >>>> >> WindowedEventProcessFunction(source.name >>>> >> <http://source.name>) >>>> >> ) >>>> >> .map { summary -> >>>> >> // This would be a sink to write to a file >>>> >> } >>>> >> } >>>> >> pipeline.execute("event-processor") >>>> >> } >>>> >> >>>> >> My overarching question is really - *Can I properly separate the >>>> >> data with custom watermark strategies and ensure that keying (or >>>> >> some other construct) is enough to allow each tenant/source >>>> >> combination to be treated as it’s own stream with it’s own >>>> >> watermarking? *I know I could possibly break the single topic up >>>> >> into multiple disparate topics, however that level of granularity >>>> >> would likely result in several thousand (7000+) topics so I'm >>>> >> hoping that some of the constructs available within Flink may help >>>> >> with this (WatermarkStrategies, etc.) >>>> >> >>>> >> Any recommendations / advice would be extremely helpful as I'm >>>> >> quite new to the Flink world, however I have quite a bit of >>>> >> experience in Apache Beam, Kafka Streams, and a smattering of >>>> >> other streaming technologies. >>>> >> >>>> >> Thanks much, >>>> >> >>>> >> Rion >>>> >> >>>>
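The per-tenant concern in the original question can be made concrete with a tiny sketch (plain Kotlin, not Flink API): a single bounded-out-of-orderness watermark is measured from the maximum timestamp across the whole stream, so one fast tenant can make another tenant's perfectly ordered events look late.

```kotlin
// Sketch of the problem with one shared watermark: bounded out-of-orderness is
// tracked from the maximum timestamp across *all* tenants, so a fast tenant
// advances the watermark and a slower tenant's in-order events become "late".
class SharedWatermark(private val maxOutOfOrderness: Long) {
    private var maxTimestamp = Long.MIN_VALUE

    fun observe(ts: Long) {
        if (ts > maxTimestamp) maxTimestamp = ts
    }

    fun isLate(ts: Long): Boolean =
        maxTimestamp != Long.MIN_VALUE && ts <= maxTimestamp - maxOutOfOrderness

}

fun main() {
    val wm = SharedWatermark(5_000)
    wm.observe(100_000)           // tenant A is current
    check(wm.isLate(50_000))      // tenant B's perfectly in-order event is "late"
    check(!wm.isLate(99_000))     // only events near the global max survive
}
```

This is exactly what per-key watermark tracking in a KeyedProcessFunction avoids: each tenant's lateness is judged against its own maximum timestamp rather than the global one.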