Rion,

What you want isn't really achievable with the APIs you are using. Without some sort of per-key (per-tenant) watermarking -- which Flink doesn't offer -- the watermarks and windows for one tenant can be held up by the failure of another tenant's events to arrive in a timely manner.
However, your pipeline is pretty straightforward, and it shouldn't be particularly difficult to accomplish what you want. What you can do is ignore the built-in watermarking and windowing APIs, and build the equivalent functionality in the form of a KeyedProcessFunction. The Flink docs include an example [1] showing how to implement your own tumbling event-time windows with a process function.

That implementation relies on watermarks to trigger the windows, so you'll have to handle triggering differently. What you can do instead is track, in ValueState, the largest timestamp you've seen so far for each key/tenant. Whenever that advances, subtract the bounded-out-of-orderness duration from it, and check whether the resulting value is now large enough to trigger any of the windows for that key/tenant. Handling allowed lateness should be pretty straightforward.
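Here's a rough, untested sketch of what I mean, reusing your Event type and the Joda-based timestamp extraction from your pipeline below. The class name and the windowSizeMs / maxOutOfOrdernessMs parameters are just placeholders, and it emits the raw window contents where you'd plug in your own aggregation:

import org.apache.flink.api.common.state.MapState
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.joda.time.Instant

// Tumbling event-time windows driven by a per-key (per-tenant) "watermark"
// rather than by Flink's own watermarks. Window contents are kept in
// MapState, keyed by the end timestamp of each window.
class PerTenantTumblingWindows(
    private val windowSizeMs: Long,
    private val maxOutOfOrdernessMs: Long
) : KeyedProcessFunction<String, Event, List<Event>>() {

    private lateinit var maxTimestamp: ValueState<Long>
    private lateinit var windows: MapState<Long, List<Event>>

    override fun open(parameters: Configuration) {
        maxTimestamp = runtimeContext.getState(
            ValueStateDescriptor("maxTimestamp", Types.LONG))
        windows = runtimeContext.getMapState(
            MapStateDescriptor("windowContents", Types.LONG,
                Types.LIST(Types.POJO(Event::class.java))))
    }

    override fun processElement(event: Event, ctx: Context, out: Collector<List<Event>>) {
        val ts = Instant(event.createdTimestamp).millis

        // Assign the event to its tumbling window, identified by window end.
        val windowEnd = ts - (ts % windowSizeMs) + windowSizeMs
        windows.put(windowEnd, (windows.get(windowEnd) ?: emptyList()) + event)

        // Advance this key's private watermark: the largest timestamp seen
        // so far for this tenant, minus the bounded out-of-orderness.
        val newMax = maxOf(maxTimestamp.value() ?: Long.MIN_VALUE, ts)
        maxTimestamp.update(newMax)
        val keyWatermark = newMax - maxOutOfOrdernessMs

        // Fire and clear every window this key's watermark has passed. For
        // allowed lateness, you'd instead keep a fired window's state until
        // keyWatermark >= windowEnd + allowedLateness, re-firing on late events.
        for (end in windows.keys().filter { it <= keyWatermark }) {
            out.collect(windows.get(end))
            windows.remove(end)
        }
    }
}

You'd apply it per source with something like stream.filter { ... }.keyBy { it.tenant }.process(PerTenantTumblingWindows(windowMs, latenessMs)). One caveat: with no real watermarks, a window only fires when a later event for the same tenant arrives, so an idle tenant's last window stays open until more data shows up; a processing-time timer could serve as a backstop there.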
Hope this helps,

David

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example

On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hey folks,
>
> I have a somewhat high-level/advice question regarding Flink and if it has
> the mechanisms in place to accomplish what I’m trying to do. I’ve spent a
> good bit of time using Apache Beam, but recently pivoted over to native
> Flink simply because some of the connectors weren’t as mature or didn’t
> support some of the functionality that I needed.
>
> Basically - I have a single Kafka topic with 10 partitions that I’m
> consuming from. This is a multi-tenant topic containing data that comes in
> at various times from various tenants and is not at all guaranteed to be
> in order, at least with regards to “event time”, which is what I care
> about.
>
> What I’m trying to accomplish is this: *Given a multi-tenant topic with
> records eventually distributed across partitions, is it possible to
> consume and window each of these records independently of one another,
> without one tenant potentially influencing another, and write out to
> separate files per tenant/source (i.e. some other defined property on the
> records)?*
>
> My pipeline currently looks something like this:
>
> @JvmStatic
> fun main(args: Array<String>) {
>     val pipeline = StreamExecutionEnvironment
>         .getExecutionEnvironment()
>         //.createLocalEnvironmentWithWebUI(Configuration())
>
>     val properties = buildPropertiesFromArgs(args)
>     val stream = pipeline
>         .addSource(readFromKafka("events", properties))
>         .assignTimestampsAndWatermarks(
>             WatermarkStrategy
>                 .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
>                 .withTimestampAssigner { event: Event, _: Long ->
>                     // Assign the created timestamp as the event timestamp
>                     Instant(event.createdTimestamp).millis
>                 }
>         )
>
>     // There are multiple data sources that each have their own windows
>     // and allowed lateness, so ensure that each source only handles
>     // records for it
>     DataSources.forEach { source ->
>         stream
>             .filter { event ->
>                 event.source == source.name
>             }
>             .keyBy { event ->
>                 //print("Keying record with id ${record.`id$1`} by tenant ${record.`source$1`.tenantName}")
>                 event.tenant
>             }
>             .window(
>                 TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>             )
>             .allowedLateness(
>                 Time.minutes(source.allowedLateness)
>             )
>             .process(
>                 // This just contains some logic to take the existing
>                 // windows and construct a file using the window range and
>                 // keys (tenant/source), with the values being an
>                 // aggregation of all of the records
>                 WindowedEventProcessFunction(source.name)
>             )
>             .map { summary ->
>                 // This would be a sink to write to a file
>             }
>     }
>     pipeline.execute("event-processor")
> }
>
> My overarching question is really - *Can I properly separate the data with
> custom watermark strategies and ensure that keying (or some other
> construct) is enough to allow each tenant/source combination to be treated
> as its own stream with its own watermarking?* I know I could possibly
> break the single topic up into multiple disparate topics, however that
> level of granularity would likely result in several thousand (7000+)
> topics, so I'm hoping that some of the constructs available within Flink
> may help with this (WatermarkStrategies, etc.)
>
> Any recommendations / advice would be extremely helpful as I'm quite new
> to the Flink world, however I have quite a bit of experience in Apache
> Beam, Kafka Streams, and a smattering of other streaming technologies.
>
> Thanks much,
>
> Rion