Hey folks, I have a somewhat high-level/advice question regarding Flink and
if it has the mechanisms in place to accomplish what I’m trying to do. I’ve
spent a good bit of time using Apache Beam, but recently pivoted over to
native Flink simply because some of the connectors weren’t as mature or
didn’t support some of the functionality that I needed.

Basically - I have a single Kafka topic with 10 partitions that I’m
consuming from. This is a multi-tenant topic containing data that comes in
at various times from various tenants and is not at all guaranteed to be in
order, at least with regard to “event time”, which is what I care about.

What I’m trying to accomplish is this:
*Given a multi-tenant topic with records eventually distributed across
partitions, is it possible to consume and window each of these records
independently of one another without one tenant potentially influencing
another and write out to separate files per tenant/source (i.e. some other
defined property on the records)?*

My pipeline currently looks something like this:

@JvmStatic
fun main(args: Array<String>) {
    val pipeline = StreamExecutionEnvironment
        .getExecutionEnvironment()
        //.createLocalEnvironmentWithWebUI(Configuration())

    val properties = buildPropertiesFromArgs(args)
    val stream = pipeline
        .addSource(readFromKafka("events", properties))
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
                .withTimestampAssigner { event: Event, _: Long ->
                    // Assign the created timestamp as the event timestamp
                    Instant(event.createdTimestamp).millis
                }
        )

    // There are multiple data sources that each have their own windows and
    // allowed lateness, so ensure that each source only handles records for it
    DataSources.forEach { source ->
        stream
            .filter { event ->
                event.source == source.name
            }
            .keyBy { event ->
                //print("Keying record with id ${record.`id$1`} by tenant ${record.`source$1`.tenantName}")
                event.tenant
            }
            .window(
                TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
            )
            .allowedLateness(
                Time.minutes(source.allowedLateness)
            )
            .process(
                // This just contains some logic to take the existing windows
                // and construct a file using the window range and keys
                // (tenant/source), with the values being an aggregation of
                // all of the records
                WindowedEventProcessFunction(source.name)
            )
            .map { summary ->
                // This would be a sink to write to a file
            }
    }
    pipeline.execute("event-processor")
}

My overarching question is really - *Can I properly separate the data with
custom watermark strategies and ensure that keying (or some other
construct) is enough to allow each tenant/source combination to be treated
as its own stream with its own watermarking?* I know I could possibly
break the single topic up into multiple disparate topics; however, that
level of granularity would likely result in several thousand (7000+) topics,
so I'm hoping that some of the constructs available within Flink may help
with this (WatermarkStrategies, etc.).
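
To make the concern concrete, here is a small plain-Kotlin sketch (no Flink
involved) of what I mean by one tenant influencing another. It assumes a
simplified model in which a bounded-out-of-orderness watermark just trails
the highest timestamp seen by a fixed bound; as far as I understand, Flink
tracks this per source subtask/partition rather than per key, and the real
late-record handling also depends on window ends and allowed lateness.
SampleEvent, BoundedWatermark and both helper functions are hypothetical
names made up for this illustration.

```kotlin
// Plain-Kotlin illustration only; none of these types come from Flink.
data class SampleEvent(val tenant: String, val timestampMillis: Long)

// Simplified model (assumption): watermark = highest timestamp seen - bound - 1.
class BoundedWatermark(private val boundMillis: Long) {
    private var maxSeen = Long.MIN_VALUE + boundMillis + 1
    fun observe(timestampMillis: Long) { maxSeen = maxOf(maxSeen, timestampMillis) }
    fun current(): Long = maxSeen - boundMillis - 1
}

// Events that are already behind a single watermark shared by all tenants.
fun behindSharedWatermark(events: List<SampleEvent>, boundMillis: Long): List<SampleEvent> {
    val shared = BoundedWatermark(boundMillis)
    return events.filter { event ->
        val behind = event.timestampMillis <= shared.current()
        shared.observe(event.timestampMillis)
        behind
    }
}

// Events that would be behind if every tenant tracked its own watermark.
fun behindPerTenantWatermark(events: List<SampleEvent>, boundMillis: Long): List<SampleEvent> {
    val perTenant = mutableMapOf<String, BoundedWatermark>()
    return events.filter { event ->
        val watermark = perTenant.getOrPut(event.tenant) { BoundedWatermark(boundMillis) }
        val behind = event.timestampMillis <= watermark.current()
        watermark.observe(event.timestampMillis)
        behind
    }
}

fun main() {
    val events = listOf(
        SampleEvent("tenant-a", 100_000),
        SampleEvent("tenant-b", 10_000),   // older data arriving after tenant-a
        SampleEvent("tenant-a", 101_000),
    )
    // With a 5 second bound, tenant-b's record is behind the shared watermark...
    println(behindSharedWatermark(events, 5_000).map { it.tenant })
    // ...but not behind a watermark tracked for tenant-b alone.
    println(behindPerTenantWatermark(events, 5_000).map { it.tenant })
}
```

The second function is the behavior I'm after: each tenant/source
combination only ever being "late" relative to its own data.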

Any recommendations / advice would be extremely helpful, as I'm quite new to
the Flink world, though I have quite a bit of experience with Apache Beam,
Kafka Streams, and a smattering of other streaming technologies.

Thanks much,

Rion
