Rion,

What you want isn't really achievable with the APIs you are using. Without
some sort of per-key (per-tenant) watermarking -- which Flink doesn't offer
-- the watermarks and windows for one tenant can be held up by the failure
of another tenant's events to arrive in a timely manner.

However, your pipeline is pretty straightforward, and it shouldn't be
particularly difficult to accomplish what you want. What you can do is to
ignore the built-in watermarking and windowing APIs, and build equivalent
functionality in the form of a KeyedProcessFunction.

The Flink docs include an example [1] showing how to implement your own
tumbling event time windows with a process function. That implementation
assumes you can rely on watermarks for triggering the windows; you'll have
to do that differently.

What you can do instead is to track, in ValueState, the largest timestamp
you've seen so far (for each key/tenant). Whenever that advances, you can
subtract the bounded-out-of-orderness duration from that timestamp, and
then check to see if the resulting value is now large enough to trigger any
of the windows for that key/tenant.
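
Here is a minimal sketch of that idea in plain Kotlin, with no Flink
dependency, so that it can be run on its own. The names (Event, FiredWindow,
PerKeyWindower) are hypothetical, and the in-memory maps are only stand-ins
for the keyed ValueState/MapState you would use inside a
KeyedProcessFunction. Events whose window has already fired are simply
dropped here.

```kotlin
import java.util.TreeMap

// Hypothetical event type: a tenant key and an event-time timestamp (ms).
data class Event(val tenant: String, val timestamp: Long)

// A window that has been triggered for one tenant.
data class FiredWindow(val tenant: String, val windowStart: Long, val count: Int)

class PerKeyWindower(
    private val windowSizeMs: Long,
    private val maxOutOfOrdernessMs: Long
) {
    // Stand-in for ValueState<Long>: largest timestamp seen so far, per tenant.
    private val maxTimestamp = HashMap<String, Long>()

    // Stand-in for MapState<Long, Int>: event count per open window, per tenant,
    // sorted by window start so the oldest windows are checked first.
    private val openWindows = HashMap<String, TreeMap<Long, Int>>()

    fun process(event: Event): List<FiredWindow> {
        val windows = openWindows.getOrPut(event.tenant) { TreeMap() }
        val previousMax = maxTimestamp[event.tenant]
        val watermarkBefore =
            previousMax?.let { it - maxOutOfOrdernessMs } ?: Long.MIN_VALUE

        // Assign the event to its tumbling window.
        val windowStart =
            event.timestamp - Math.floorMod(event.timestamp, windowSizeMs)
        if (windowStart + windowSizeMs <= watermarkBefore) {
            return emptyList() // window already fired: late event, dropped
        }
        windows[windowStart] = (windows[windowStart] ?: 0) + 1

        // Only an advance of this tenant's largest timestamp can trigger windows.
        if (previousMax != null && event.timestamp <= previousMax) {
            return emptyList()
        }
        maxTimestamp[event.tenant] = event.timestamp
        val watermark = event.timestamp - maxOutOfOrdernessMs

        // Fire every window of this tenant that ends at or before the
        // per-tenant watermark; other tenants are never consulted.
        val fired = ArrayList<FiredWindow>()
        val iterator = windows.entries.iterator()
        while (iterator.hasNext()) {
            val (start, count) = iterator.next()
            if (start + windowSizeMs > watermark) break
            fired.add(FiredWindow(event.tenant, start, count))
            iterator.remove()
        }
        return fired
    }
}
```

With one-minute windows and a five-second bound, feeding in events for
tenant "a" at 10,000 ms and 66,000 ms fires a's [0, 60,000) window on the
second event, regardless of how far ahead or behind any other tenant is.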

Handling allowed lateness should be pretty straightforward.
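
As a rough illustration, and assuming you want to mirror what the built-in
windows do (keep a window's state until the watermark passes the window end
plus the allowed lateness), the decision for each incoming event could look
like the sketch below. The names Disposition and classify are hypothetical,
and the "watermark" here is the per-key value: the largest timestamp seen
for that key minus the out-of-orderness bound.

```kotlin
// Hypothetical outcome for an incoming event, relative to its window.
enum class Disposition {
    ON_TIME,           // window has not fired yet: just add the event
    LATE_BUT_ALLOWED,  // window fired, state still kept: update and re-emit
    TOO_LATE           // window state already cleared: drop or side-output
}

fun classify(
    windowEnd: Long,         // exclusive end of the event's window (ms)
    perKeyWatermark: Long,   // per-key watermark as described above (ms)
    allowedLatenessMs: Long
): Disposition = when {
    windowEnd > perKeyWatermark -> Disposition.ON_TIME
    windowEnd + allowedLatenessMs > perKeyWatermark -> Disposition.LATE_BUT_ALLOWED
    else -> Disposition.TOO_LATE
}
```

The same comparison tells you when a window's state can be cleared: once
the per-key watermark reaches the window end plus the allowed lateness.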

Hope this helps,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example

On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hey folks, I have a somewhat high-level/advice question regarding Flink
> and if it has the mechanisms in place to accomplish what I’m trying to do.
> I’ve spent a good bit of time using Apache Beam, but recently pivoted over
> to native Flink simply because some of the connectors weren’t as mature or
> didn’t support some of the functionality that I needed.
>
> Basically - I have a single Kafka topic with 10 partitions that I’m
> consuming from. This is a multi-tenant topic containing data that comes in
> at various times from various tenants and is not at all guaranteed to be in
> order, at least with regard to “event time”, which is what I care about.
>
> What I’m trying to accomplish is this:
> *Given a multi-tenant topic with records eventually distributed across
> partitions, is it possible to consume and window each of these records
> independently of one another without one tenant potentially influencing
> another and write out to separate files per tenant/source (i.e. some other
> defined property on the records)?*
> My pipeline currently looks something like this:
>
> @JvmStatic
> fun main(args: Array<String>) {
>     val pipeline = StreamExecutionEnvironment
>         .getExecutionEnvironment()
>         //.createLocalEnvironmentWithWebUI(Configuration())
>
>     val properties = buildPropertiesFromArgs(args)
>     val stream = pipeline
>         .addSource(readFromKafka("events", properties))
>         .assignTimestampsAndWatermarks(
>             WatermarkStrategy
>                 .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
>                 .withTimestampAssigner { event: Event, _: Long ->
>                     // Assign the created timestamp as the event timestamp
>                     Instant(event.createdTimestamp).millis
>                 }
>         )
>
>     // There are multiple data sources that each have their own windows
>     // and allowed lateness, so ensure that each source only handles
>     // records for it
>     DataSources.forEach { source ->
>         stream
>             .filter { event ->
>                 event.source == source.name
>             }
>             .keyBy { event ->
>                 //print("Keying record with id ${record.`id$1`} by tenant ${record.`source$1`.tenantName}")
>                 event.tenant
>             }
>             .window(
>                 TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>             )
>             .allowedLateness(
>                 Time.minutes(source.allowedLateness)
>             )
>             .process(
>                 // This just contains some logic to take the existing
>                 // windows and construct a file using the window range
>                 // and keys (tenant/source), with the values being an
>                 // aggregation of all of the records
>                 WindowedEventProcessFunction(source.name)
>             )
>             .map { summary ->
>                 // This would be a sink to write to a file
>             }
>     }
>     pipeline.execute("event-processor")
> }
>
> My overarching question is really - *Can I properly separate the data
> with custom watermark strategies and ensure that keying (or some other
> construct) is enough to allow each tenant/source combination to be treated
> as its own stream with its own watermarking?* I know I could possibly
> break the single topic up into multiple disparate topics, however that
> level of granularity would likely result in several thousand (7000+) topics
> so I'm hoping that some of the constructs available within Flink may help
> with this (WatermarkStrategies, etc.)
>
> Any recommendations / advice would be extremely helpful as I'm quite new
> to the Flink world, however I have quite a bit of experience in Apache
> Beam, Kafka Streams, and a smattering of other streaming technologies.
>
> Thanks much,
>
> Rion
>
