Hey folks, I have a somewhat high-level/advice question regarding Flink and whether it has the mechanisms in place to accomplish what I'm trying to do. I've spent a good bit of time using Apache Beam, but recently pivoted over to native Flink, simply because some of the Beam connectors weren't as mature or didn't support the functionality that I needed.
Basically, I have a single Kafka topic with 10 partitions that I'm consuming from. This is a multi-tenant topic containing data that arrives at various times from various tenants and is not at all guaranteed to be in order, at least with regard to "event time", which is what I care about. What I'm trying to accomplish is this: *Given a multi-tenant topic with records eventually distributed across partitions, is it possible to consume and window these records independently of one another, so that one tenant can't influence another, and write out to separate files per tenant/source (i.e. some other defined property on the records)?*

My pipeline currently looks something like this:

```kotlin
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.joda.time.Instant
import java.time.Duration

// buildPropertiesFromArgs, readFromKafka, Event, DataSources, and
// WindowedEventProcessFunction are defined elsewhere in the project
@JvmStatic
fun main(args: Array<String>) {
    val pipeline = StreamExecutionEnvironment
        .getExecutionEnvironment()
        //.createLocalEnvironmentWithWebUI(Configuration())

    val properties = buildPropertiesFromArgs(args)
    val stream = pipeline
        .addSource(readFromKafka("events", properties))
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
                .withTimestampAssigner { event: Event, _: Long ->
                    // Assign the created timestamp as the event timestamp
                    Instant(event.createdTimestamp).millis
                }
        )

    // There are multiple data sources that each have their own windows and allowed
    // lateness, so ensure that each source only handles records meant for it
    DataSources.forEach { source ->
        stream
            .filter { event -> event.source == source.name }
            .keyBy { event -> event.tenant }
            .window(
                TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
            )
            .allowedLateness(
                Time.minutes(source.allowedLateness)
            )
            .process(
                // This contains some logic to take the existing windows and construct
                // a file using the window range and keys (tenant/source), with the
                // values being an aggregation of all of the records
                WindowedEventProcessFunction(source.name)
            )
            .map { summary ->
                // This would be a sink to write to a file
            }
    }

    pipeline.execute("event-processor")
}
```

My overarching question is really: *Can I properly separate the data with custom watermark strategies, and is keying (or some other construct) enough to allow each tenant/source combination to be treated as its own stream with its own watermarking?*

I know I could break the single topic up into multiple disparate topics; however, that level of granularity would likely result in several thousand (7000+) topics, so I'm hoping that some of the constructs available within Flink (WatermarkStrategies, etc.) may help with this.

Any recommendations or advice would be extremely helpful, as I'm quite new to the Flink world, although I have quite a bit of experience with Apache Beam, Kafka Streams, and a smattering of other streaming technologies.

Thanks much,
Rion
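P.S. In case it helps to see the pieces concretely, here's roughly what `WindowedEventProcessFunction` looks like. It's trimmed down, with a hypothetical `Summary` type and a simple count standing in for the real aggregation, and it assumes the tenant key is a `String`:

```kotlin
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical output type; the real one carries the actual aggregated values
data class Summary(
    val tenant: String,
    val source: String,
    val windowStart: Long,
    val windowEnd: Long,
    val count: Long
)

class WindowedEventProcessFunction(private val sourceName: String) :
    ProcessWindowFunction<Event, Summary, String, TimeWindow>() {

    override fun process(
        key: String,                  // the tenant, from keyBy
        context: Context,             // exposes the window range
        elements: Iterable<Event>,
        out: Collector<Summary>
    ) {
        val window = context.window()
        out.collect(
            Summary(
                tenant = key,
                source = sourceName,
                windowStart = window.start,
                windowEnd = window.end,
                count = elements.count().toLong()
            )
        )
    }
}
```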
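On the sink side, the rough plan for the "separate files per tenant/source" piece was a `FileSink` with a custom bucket assigner; this assumes Flink 1.12+ (where `FileSink` is available) and the hypothetical `Summary` type from the sketch above:

```kotlin
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Route each summary into a bucket (directory) named after its source/tenant,
// so every tenant/source combination lands in its own set of files
val fileSink: FileSink<Summary> = FileSink
    .forRowFormat(Path("/data/output"), SimpleStringEncoder<Summary>("UTF-8"))
    .withBucketAssigner(object : BucketAssigner<Summary, String> {
        override fun getBucketId(element: Summary, context: BucketAssigner.Context): String =
            "${element.source}/${element.tenant}"

        override fun getSerializer(): SimpleVersionedSerializer<String> =
            SimpleVersionedStringSerializer.INSTANCE
    })
    .build()
```

The idea is that the bucketing stays orthogonal to the windowing: even with everything flowing through a single job, each tenant/source pair gets its own directory of output files.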
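Finally, for the watermarking question itself, the variant I've been toying with moves `assignTimestampsAndWatermarks` into each per-source branch, so that every source gets its own out-of-orderness bound (this assumes a hypothetical `maxOutOfOrderness` property on the source definitions). Whether this actually isolates each tenant/source the way I hope is exactly what I'm unsure about:

```kotlin
// Same imports as the pipeline above; this would replace the per-source loop,
// with the map { } placeholder swapped for the FileSink sketched earlier
DataSources.forEach { source ->
    stream
        .filter { event -> event.source == source.name }
        // Re-assign watermarks on each source branch rather than once on the
        // shared stream, so each source gets its own bound
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(source.maxOutOfOrderness))
                .withTimestampAssigner { event: Event, _: Long ->
                    Instant(event.createdTimestamp).millis
                }
                // Keeps the watermark advancing when a branch goes quiet for a while
                .withIdleness(Duration.ofMinutes(1))
        )
        .keyBy { event -> event.tenant }
        .window(TumblingEventTimeWindows.of(Time.minutes(source.windowDuration)))
        .allowedLateness(Time.minutes(source.allowedLateness))
        .process(WindowedEventProcessFunction(source.name))
        .sinkTo(fileSink)
}
```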