Thanks for this suggestion David, it's extremely helpful. Since this will vary depending on the elements retrieved from a separate stream, I'm guessing something like the following would be roughly the avenue to continue down:
    fun main(args: Array<String>) {
        val parameters = mergeParametersFromProperties(args)
        val stream = StreamExecutionEnvironment.getExecutionEnvironment()

        // Get the stream for tenant-specific Elastic configurations
        val connectionStream = stream
            .fromSource(
                KafkaSource.of(parameters, listOf("elastic-configs")),
                WatermarkStrategy.noWatermarks(),
                "elastic-configs"
            )

        // Get the stream of incoming messages to be routed to Elastic
        stream
            .fromSource(
                KafkaSource.of(parameters, listOf("messages")),
                WatermarkStrategy.noWatermarks(),
                "messages"
            )
            .keyBy { message ->
                // Key by the tenant in the message
                message.getTenant()
            }
            .connect(
                // Connect the messages stream with the configurations, keyed by
                // tenant as well (KeyedCoProcessFunction needs both inputs keyed)
                connectionStream.keyBy { config -> config.getTenant() }
            )
            .process(object : KeyedCoProcessFunction<String, String, String, String>() {
                // For this key, we need to store all of the previous messages in state
                // in the case where we don't have a given mapping for this tenant yet
                lateinit var messagesAwaitingConfigState: ListState<String>
                lateinit var configState: ValueState<String>

                override fun open(parameters: Configuration) {
                    super.open(parameters)
                    // Initialize the states
                    messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
                    configState = runtimeContext.getState(configStateDesc)
                }

                // When an element is received
                override fun processElement1(message: String, context: Context, out: Collector<String>) {
                    // Check if we have a mapping
                    if (configState.value() == null) {
                        // We don't have a mapping for this tenant, store messages until we get it
                        messagesAwaitingConfigState.add(message)
                    } else {
                        // Output the record with some indicator of the route?
                        out.collect(message)
                    }
                }

                override fun processElement2(config: String, context: Context, out: Collector<String>) {
                    // If this mapping is for this specific tenant, store it and flush the
                    // pending records in state
                    if (config.getTenant() == context.currentKey) {
                        configState.update(config)
                        val messagesToFlush = messagesAwaitingConfigState.get()
                        messagesToFlush.forEach { message -> out.collect(message) }
                        // Clear the buffer so flushed messages are not emitted again
                        messagesAwaitingConfigState.clear()
                    }
                }

                // State descriptors
                val awaitingStateDesc = ListStateDescriptor(
                    "messages-awaiting-config",
                    TypeInformation.of(String::class.java)
                )

                val configStateDesc = ValueStateDescriptor(
                    "elastic-config",
                    TypeInformation.of(String::class.java)
                )
            })

        stream.executeAsync("$APPLICATION_NAME-job")
    }

Basically, I'd connect my tenant-specific configuration stream with my incoming messages (keyed by tenant) and buffer the messages until I have a corresponding configuration (to avoid race conditions). However, I'm guessing that rather than directly outputting the messages from this process function, I'd construct some type of wrapper with the necessary routing/configuration for the message (obtained via the configuration stream) along with the element, which might be something like a MessageWrapper<ElementT, ConfigurationT>. I'd then pass those elements to the sink, which would create the tenant-specific Elastic connection from the ConfigurationT element, handle caching it, and then just grab the element and send it on its way?

Those are really the only bits I'm stuck on at the moment:

1. The shape of the elements being emitted from the process function (Is a simple wrapper with the configuration for the sink enough here? Do I need to explicitly initialize the sink within this function? Etc.)
2. The actual use of the DynamicElasticsearchSink class (Would it just be something like an .addSink(DynamicElasticsearchSink<String, Configuration>()) or perhaps something else entirely?)

Thanks again so much for the advice thus far David, it's greatly appreciated.
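To make question 1 a bit more concrete, here is a rough sketch of the wrapper idea in plain Kotlin, deliberately without any Flink or Elasticsearch types. Everything in it is hypothetical and only for illustration: TenantClient stands in for a real Elastic client, RoutingSink stands in for whatever the DynamicElasticsearchSink would end up being, and RecordingClient exists only so the sketch can be exercised in memory.

```kotlin
// Hypothetical stand-in for a tenant-specific Elasticsearch client.
interface TenantClient : AutoCloseable {
    fun index(document: String)
}

// Pairs an element with the configuration needed to route it.
data class MessageWrapper<ElementT, ConfigurationT : Any>(
    val element: ElementT,
    val configuration: ConfigurationT
)

// Hypothetical routing sink: creates one client per distinct configuration
// on first use, caches it, and forwards each element to the matching client.
class RoutingSink<ElementT, ConfigurationT : Any>(
    private val clientFactory: (ConfigurationT) -> TenantClient,
    private val serialize: (ElementT) -> String
) : AutoCloseable {
    private val clients = HashMap<ConfigurationT, TenantClient>()

    fun write(wrapper: MessageWrapper<ElementT, ConfigurationT>) {
        // Reuse the cached client for this configuration, or create it once.
        val client = clients.getOrPut(wrapper.configuration) {
            clientFactory(wrapper.configuration)
        }
        client.index(serialize(wrapper.element))
    }

    fun cachedClientCount(): Int = clients.size

    override fun close() {
        // Release every cached client when the sink shuts down.
        clients.values.forEach { it.close() }
        clients.clear()
    }
}

// In-memory client used only to exercise the sketch.
class RecordingClient(val sent: MutableList<String> = mutableListOf()) : TenantClient {
    override fun index(document: String) {
        sent.add(document)
    }

    override fun close() {}
}
```

With this shape the process function would only emit MessageWrapper instances and never touch the sink itself; the configuration has to implement equals/hashCode sensibly (a data class or a plain String would do) for the cache lookup to work.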
Rion

On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:

> To give you a better idea, at a high level I think it could look something
> like this [1].
>
> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>
> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
> wrote:
>
>> Hi David,
>>
>> Thanks for your response! I think there are currently quite a few
>> unknowns on my end in terms of what production loads look like, but I
>> think the number of clusters shouldn’t be too large (and will either rarely
>> change or have new entries come in at runtime, but it needs to support
>> that).
>>
>> I think the dynamic approach might be a good route to explore with actual
>> changes to the Elasticsearch sink as a longer term option. I’m not sure
>> what the dynamic one would look like at the moment though, perhaps that’s
>> something you’d be able to advise on?
>>
>> Given that all the records are keyed for a given tenant and I would have
>> the mappings stored in state, is it possible within the open()
>> function for this dynamic route to access the state and initialize the
>> client there? Or maybe there’s some other approach (such as grouping by
>> clusters and dynamically handling indices)?
>>
>> I’d be happy to take a shot at making the appropriate changes to the sink
>> as well, although I’m far from an Elastic expert. If you point me in the
>> right direction, I may be able to help out.
>>
>> Thanks much!
>>
>> Rion
>>
>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>
>>
>> Hi Rion,
>>
>> As you probably already know, for dynamic indices, you can simply
>> implement your own ElasticsearchSinkFunction [1], where you can create
>> any request that the Elastic client supports.
>>
>> The tricky part is how to implement dynamic routing into multiple
>> clusters.
>> - If the Elastic clusters are known upfront (before submitting the job),
>> you can easily create multiple Elastic sinks and prepend each of them
>> with a simple filter (this is basically what the split operator does).
>> - If you discover Elastic clusters at runtime, this would require some
>> changes to the current ElasticsearchSink implementation. I think this may
>> actually be as simple as introducing something like
>> DynamicElasticsearchSink, which could dynamically create and manage "child"
>> sinks. This would probably require some thought about how to manage
>> consumed resources (memory), because the number of child sinks could be
>> potentially unbounded. This could of course be simplified if the underlying
>> Elastic client already supports that, which I'm not aware of. If you'd like
>> to take this path, it would definitely be a great contribution to Flink
>> (I'm able to provide some guidance).
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>
>> Best,
>> D.
>>
>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> I have a use-case that I wanted to initially pose to the mailing list, as
>>> I’m not terribly familiar with the Elasticsearch connector, to ensure I’m
>>> not going down the wrong path trying to accomplish this in Flink (or if
>>> something downstream might be a better option).
>>>
>>> Basically, I have the following pieces to the puzzle:
>>>
>>> - A stream of tenant-specific events
>>> - An HTTP endpoint containing mappings for tenant-specific Elastic
>>> cluster information (as each tenant has its own specific Elastic
>>> cluster/index)
>>>
>>> What I’m hoping to accomplish is the following:
>>>
>>> 1. One stream will periodically poll the HTTP endpoint and store
>>> these cluster mappings in state (keyed by tenant with cluster info as the
>>> value)
>>> 2. The event stream will be keyed by tenant and connected to the
>>> cluster mappings stream.
>>> 3. I’ll need an Elasticsearch sink that can route the
>>> tenant-specific event data to its corresponding cluster/index from the
>>> mapping source.
>>>
>>> I know that the existing Elasticsearch sink supports dynamic indices,
>>> however I didn’t know if it’s possible to adjust the cluster like I would
>>> need on a per-tenant basis or if there’s a better approach here?
>>>
>>> Any advice would be appreciated.
>>>
>>> Thanks,
>>>
>>> Rion
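On David's point in the thread above about the number of child sinks being potentially unbounded: one option that comes to mind is capping the cache and closing the least recently used child when the cap is exceeded. Below is a minimal sketch of that idea, assuming the child sinks are AutoCloseable. BoundedSinkCache is a hypothetical name, not anything in Flink, and the sketch uses java.util.LinkedHashMap in access order with removeEldestEntry.

```kotlin
import java.util.LinkedHashMap

// Hypothetical bounded cache of child sinks: keeps at most maxSize entries
// and closes the least recently used one when the limit is exceeded.
class BoundedSinkCache<K, V : AutoCloseable>(private val maxSize: Int) {
    // accessOrder = true makes iteration order least-recently-used first.
    private val entries = object : LinkedHashMap<K, V>(16, 0.75f, true) {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>?): Boolean {
            val evict = this.size > maxSize
            // Release the evicted child's resources before it is dropped.
            if (evict) eldest?.value?.close()
            return evict
        }
    }

    // Returns the cached child for this key, creating it on first use.
    fun getOrCreate(key: K, create: (K) -> V): V =
        entries.getOrPut(key) { create(key) }

    val size: Int
        get() = entries.size
}
```

A real child sink would presumably need to flush any buffered requests before being closed, and an evicted tenant's client would simply be recreated on its next message, so the cap trades memory for occasional reconnects.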