Hi David,

Thanks again for the response. I believe I'm getting pretty close to at least a POC-level implementation of this.

I'm currently working with JsonObject instances throughout the pipeline, so to try this out I simply stored the routing information within the element itself for simplicity's sake. The element has a shape that looks something like this:

    {
      "route": {
        "hosts": "...",
        "index": "...",
        ...
      },
      "all-other-fields-here"
    }

I've also stripped back several of the layers of the routers (since I already have all of the information in the element at that point). I tried using something like this:

    class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
        private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
        private lateinit var configuration: Configuration

        override fun open(parameters: Configuration) {
            configuration = parameters
        }

        override fun invoke(element: JsonObject, context: SinkFunction.Context) {
            val route = getHost(element)
            // Check if we already have a router for this cluster
            var sink = sinkRoutes[route]
            if (sink == null) {
                // If not, create one
                sink = buildSinkFromRoute(element)
                sink.open(configuration)
                sinkRoutes[route] = sink
            }

            sink.invoke(element, context)
        }

        override fun initializeState(context: FunctionInitializationContext) {
            // No-op.
        }

        override fun snapshotState(context: FunctionSnapshotContext) {
            // This is used only to flush pending writes.
            for (sink in sinkRoutes.values) {
                sink.snapshotState(context)
            }
        }

        override fun close() {
            for (sink in sinkRoutes.values) {
                sink.close()
            }
        }

        private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
            val builder = ElasticsearchSink.Builder<JsonObject>(
                buildHostsFromElement(element),
                ElasticsearchRoutingFunction()
            )

            builder.setBulkFlushMaxActions(1)

            // TODO: Configure authorization if available
            // builder.setRestClientFactory { restClient ->
            //     restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
            //         override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
            //             // Configure authorization here
            //             val credentialsProvider = BasicCredentialsProvider().apply {
            //                 setCredentials(
            //                     AuthScope.ANY,
            //                     UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
            //                 )
            //             }
            //
            //             return builder.setDefaultCredentialsProvider(credentialsProvider);
            //         }
            //     })
            // }

            return builder.build()
        }

        private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
            val transportAddresses = element
                .get("route").asJsonObject
                .get("hosts").asString

            // If there are multiple, they should be comma-delimited
            val addresses = transportAddresses.split(",")
            return addresses
                .filter { address -> address.isNotEmpty() }
                .map { address -> HttpHost.create(address) }
        }

        private fun getHost(element: JsonObject): String {
            return element
                .get("route").asJsonObject
                .get("hosts").asString
        }

        private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
            override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
                indexer.add(request(element))
            }

            private fun request(element: JsonObject): IndexRequest {
                // Access routing information
                val index = element
                    .get("route").asJsonObject
                    .get("index").asString

                // Strip off routing information
                element.remove("route")

                // Send the request
                return Requests.indexRequest()
                    .index(index)
                    .type("_doc")
                    .source(mapOf(
                        "data" to "$element"
                    ))
            }
        }
    }

After running an integration test, I keep running into the following error during the invocation of the child sink:

    // The runtime context has not been initialized.
    sink.invoke(element, context)

I can see the underlying sink getting initialized, the open call being made, etc. However, for some reason there appears to be an issue related to the context during the invoke call, namely "The runtime context has not been initialized". I had assumed this would be alright since the context for the "wrapper" should already have been initialized, but maybe there's something that I'm missing.

Also, please forgive any hastily written or nasty code, as this is purely a POC to see if I could get this to work using a single object. I hope to clean it up and make it generic once I'm confident that it actually works.

Thanks so much again,

Rion

On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:

> Hi Rion,
>
> Sorry for the late reply, I missed your previous message. Thanks Arvid for
> the reminder <3.
>
>> something like a MessageWrapper<ElementT, ConfigurationT> and pass those
>> elements to the sink, which would create the tenant-specific Elastic
>> connection from the ConfigurationT element and handle caching it and
>> then just grab the element and send it on its way?
>
> Yes, this is exactly what I had in mind. There should be almost no
> overhead as the sink can be easily chained with your join
> (KeyedCoProcessFunction) function.
>
>> The shape of the elements being evicted from the process function (Is
>> a simple wrapper with the configuration for the sink enough here? Do I
>> need to explicitly initialize the sink within this function? Etc.)
>
> To write an element you need a configuration for the destination and
> the element itself, so a tuple of *(ElasticConfiguration, Element)*
> should be enough (that's basically your MessageWrapper<ElementT,
> ConfigurationT> class).
>
>> The actual use of the *DynamicElasticsearchSink* class (Would it just
>> be something like an *.addSink(DynamicElasticSearchSink<String,
>> Configuration>())* or perhaps something else entirely?)
>
> I guess it could look something like the snippet below. It would be
> definitely good to play around with the *DynamicElasticSearchSink* API
> and make it more meaningful / user friendly (the gist I've shared was just
> a very rough prototype to showcase the idea).
>
>     static class Destination {
>
>         private final List<HttpHost> httpHosts;
>
>         Destination(List<HttpHost> httpHosts) {
>             this.httpHosts = httpHosts;
>         }
>     }
>
>     final DataStream<Tuple2<Destination, String>> toWrite = ...;
>     toWrite.addSink(
>             new DynamicElasticsearchSink<>(
>                     new SinkRouter<
>                             Tuple2<Destination, String>,
>                             String,
>                             ElasticsearchSink<Tuple2<Destination, String>>>() {
>
>                         @Override
>                         public String getRoute(Tuple2<Destination, String> element) {
>                             // Construct a deterministic unique caching key for the
>                             // destination... (this could be cheaper if you know the data)
>                             return element.f0.httpHosts.stream()
>                                     .map(HttpHost::toHostString)
>                                     .collect(Collectors.joining(","));
>                         }
>
>                         @Override
>                         public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>                                 String cacheKey, Tuple2<Destination, String> element) {
>                             return new ElasticsearchSink.Builder<>(
>                                             element.f0.httpHosts,
>                                             (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>                                                     (el, ctx, indexer) -> {
>                                                         // Construct index request.
>                                                         final IndexRequest request = ...;
>                                                         indexer.add(request);
>                                                     })
>                                     .build();
>                         }
>                     }));
>
> I hope this helps ;)
>
> Best,
> D.
>
> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com>
> wrote:
>
>> Thanks for this suggestion David, it's extremely helpful.
>>
>> Since this will vary depending on the elements retrieved from a separate
>> stream, I'm guessing something like the following would be roughly the
>> avenue to continue down:
>>
>>     fun main(args: Array<String>) {
>>         val parameters = mergeParametersFromProperties(args)
>>         val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>
>>         // Get the stream for tenant-specific Elastic configurations
>>         val connectionStream = stream
>>             .fromSource(
>>                 KafkaSource.of(parameters, listOf("elastic-configs")),
>>                 WatermarkStrategy.noWatermarks(),
>>                 "elastic-configs"
>>             )
>>
>>         // Get the stream of incoming messages to be routed to Elastic
>>         stream
>>             .fromSource(
>>                 KafkaSource.of(parameters, listOf("messages")),
>>                 WatermarkStrategy.noWatermarks(),
>>                 "messages"
>>             )
>>             .keyBy { message ->
>>                 // Key by the tenant in the message
>>                 message.getTenant()
>>             }
>>             .connect(
>>                 // Connect the messages stream with the configurations
>>                 connectionStream
>>             )
>>             .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>                 // For this key, we need to store all of the previous messages in state
>>                 // in the case where we don't have a given mapping for this tenant yet
>>                 lateinit var messagesAwaitingConfigState: ListState<String>
>>                 lateinit var configState: ValueState<String>
>>
>>                 override fun open(parameters: Configuration) {
>>                     super.open(parameters)
>>                     // Initialize the states
>>                     messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>                     configState = runtimeContext.getState(configStateDesc)
>>                 }
>>
>>                 // When an element is received
>>                 override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>                     // Check if we have a mapping
>>                     if (configState.value() == null) {
>>                         // We don't have a mapping for this tenant, store messages until we get it
>>                         messagesAwaitingConfigState.add(message)
>>                     } else {
>>                         // Output the record with some indicator of the route?
>>                         out.collect(message)
>>                     }
>>                 }
>>
>>                 override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>                     // If this mapping is for this specific tenant, store it and flush the pending
>>                     // records in state
>>                     if (config.getTenant() == context.currentKey) {
>>                         configState.update(config)
>>                         val messagesToFlush = messagesAwaitingConfigState.get()
>>                         messagesToFlush.forEach { message ->
>>                             out.collect(message)
>>                         }
>>                         // Clear the buffer so the same messages are not flushed again
>>                         // on the next configuration update
>>                         messagesAwaitingConfigState.clear()
>>                     }
>>                 }
>>
>>                 // State descriptors
>>                 val awaitingStateDesc = ListStateDescriptor(
>>                     "messages-awaiting-config",
>>                     TypeInformation.of(String::class.java)
>>                 )
>>
>>                 val configStateDesc = ValueStateDescriptor(
>>                     "elastic-config",
>>                     TypeInformation.of(String::class.java)
>>                 )
>>             })
>>
>>         stream.executeAsync("$APPLICATION_NAME-job")
>>     }
>>
>> Basically, connect my tenant-specific configuration stream with my
>> incoming messages (keyed by tenant) and buffer them until I have a
>> corresponding configuration (to avoid race conditions). However, I'm
>> guessing that rather than directly outputting the messages from this
>> process function, I'd construct some type of wrapper here with the
>> necessary routing/configuration for the message (obtained via the
>> configuration stream) along with the element, which might be something
>> like a MessageWrapper<ElementT, ConfigurationT> and pass those elements
>> to the sink, which would create the tenant-specific Elastic connection
>> from the ConfigurationT element and handle caching it and then just grab
>> the element and send it on its way?
>>
>> Those are really the only bits I'm stuck on at the moment:
>>
>>    1. The shape of the elements being evicted from the process function
>>    (Is a simple wrapper with the configuration for the sink enough here?
>>    Do I need to explicitly initialize the sink within this function? Etc.)
>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>    Configuration>()) or perhaps something else entirely?)
>>
>> Thanks again so much for the advice thus far David, it's greatly
>> appreciated.
>>
>> Rion
>>
>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>>
>>> To give you a better idea, at a high level I think it could look
>>> something like this [1].
>>>
>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>
>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> Thanks for your response! I think there are currently quite a few
>>>> unknowns on my end in terms of what production loads look like, but I
>>>> think the number of clusters shouldn’t be too large (and will either
>>>> rarely change or have new entries come in at runtime, but it needs to
>>>> support that).
>>>>
>>>> I think the dynamic approach might be a good route to explore with
>>>> actual changes to the Elasticsearch sink as a longer term option. I’m
>>>> not sure what the dynamic one would look like at the moment though,
>>>> perhaps that’s something you’d be able to advise on?
>>>>
>>>> Given that all the records are keyed for a given tenant and I would
>>>> have the mappings stored in state, is it possible within the open()
>>>> function for this dynamic route to access the state and initialize the
>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>> clusters and dynamically handling indices)?
>>>>
>>>> I’d be happy to give a shot at making the appropriate changes to the
>>>> sink as well, although I’m far from an Elastic expert. If you point me
>>>> in the right direction, I may be able to help out.
>>>>
>>>> Thanks much!
>>>>
>>>> Rion
>>>>
>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>
>>>> Hi Rion,
>>>>
>>>> As you probably already know, for dynamic indices, you can simply
>>>> implement your own ElasticsearchSinkFunction [1], where you can create
>>>> any request that the Elastic client supports.
>>>>
>>>> The tricky part is how to implement dynamic routing into multiple
>>>> clusters.
>>>> - If the Elastic clusters are known upfront (before submitting the
>>>> job), you can easily create multiple Elastic sinks and prepend them
>>>> with a simple filter (this is basically what the split operator does).
>>>> - If you discover Elastic clusters at runtime, this would require some
>>>> changes to the current ElasticsearchSink implementation. I think this
>>>> may actually be as simple as introducing something like
>>>> DynamicElasticsearchSink, which could dynamically create and manage
>>>> "child" sinks. This would probably require some thought about how to
>>>> manage consumed resources (memory), because the number of child sinks
>>>> could be potentially unbounded. This could of course be simplified if
>>>> the underlying Elastic client already supports that, which I'm not
>>>> aware of. If you'd like to take this path, it would definitely be a
>>>> great contribution to Flink (I'm able to provide some guidance).
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I have a use case that I wanted to initially pose to the mailing list,
>>>>> as I’m not terribly familiar with the Elasticsearch connector, to
>>>>> ensure I’m not going down the wrong path trying to accomplish this in
>>>>> Flink (or if something downstream might be a better option).
>>>>>
>>>>> Basically, I have the following pieces to the puzzle:
>>>>>
>>>>>    - A stream of tenant-specific events
>>>>>    - An HTTP endpoint containing mappings for tenant-specific Elastic
>>>>>    cluster information (as each tenant has its own specific Elastic
>>>>>    cluster/index)
>>>>>
>>>>> What I’m hoping to accomplish is the following:
>>>>>
>>>>>    1. One stream will periodically poll the HTTP endpoint and store
>>>>>    these cluster mappings in state (keyed by tenant with cluster info
>>>>>    as the value)
>>>>>    2. The event stream will be keyed by tenant and connected to the
>>>>>    cluster mappings stream.
>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>    tenant-specific event data to its corresponding cluster/index from
>>>>>    the mapping source.
>>>>>
>>>>> I know that the existing Elasticsearch sink supports dynamic indices;
>>>>> however, I didn’t know if it’s possible to adjust the cluster like I
>>>>> would need on a per-tenant basis or if there’s a better approach here?
>>>>>
>>>>> Any advice would be appreciated.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Rion
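
A note on the "The runtime context has not been initialized" error reported in the most recent message of this thread. One possible explanation is that the wrapper sink receives its runtime context from the framework, while a child sink created by hand inside invoke() never gets one unless the wrapper passes it along before calling open(). The sketch below reproduces that pattern in plain Java with no Flink dependency. FakeRuntimeContext, FakeRichSink, FakeChildSink and RoutingSink are hypothetical stand-ins invented for illustration, and the "route|payload" element format is likewise an assumption. None of this is the connector's real code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a runtime context; it only carries a task name.
final class FakeRuntimeContext {
    final String taskName;

    FakeRuntimeContext(String taskName) {
        this.taskName = taskName;
    }
}

// Hypothetical stand-in for a "rich" sink: the getter throws until a context is injected.
abstract class FakeRichSink<T> {
    private FakeRuntimeContext runtimeContext;

    void setRuntimeContext(FakeRuntimeContext context) {
        this.runtimeContext = context;
    }

    FakeRuntimeContext getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    void open() {
        // No-op in this sketch; a real sink would create its client here.
    }

    abstract void invoke(T element);
}

// Child sink that needs the context on every invoke.
final class FakeChildSink extends FakeRichSink<String> {
    final List<String> written = new ArrayList<>();

    @Override
    void invoke(String element) {
        written.add(getRuntimeContext().taskName + ":" + element);
    }
}

// Wrapper that lazily creates and caches one child sink per route.
final class RoutingSink extends FakeRichSink<String> {
    final Map<String, FakeChildSink> children = new HashMap<>();
    private final boolean propagateContext;

    RoutingSink(boolean propagateContext) {
        this.propagateContext = propagateContext;
    }

    @Override
    void invoke(String element) {
        // Assumed element format: "<route>|<payload>".
        String route = element.substring(0, element.indexOf('|'));
        FakeChildSink child = children.get(route);
        if (child == null) {
            child = new FakeChildSink();
            if (propagateContext) {
                // Hand the wrapper's own context to the child before opening it.
                child.setRuntimeContext(getRuntimeContext());
            }
            child.open();
            children.put(route, child);
        }
        child.invoke(element);
    }
}

final class DynamicSinkSketch {
    public static void main(String[] args) {
        RoutingSink broken = new RoutingSink(false);
        broken.setRuntimeContext(new FakeRuntimeContext("sink-task"));
        try {
            broken.invoke("es-a:9200|one");
        } catch (IllegalStateException e) {
            System.out.println("without propagation: " + e.getMessage());
        }

        RoutingSink fixed = new RoutingSink(true);
        fixed.setRuntimeContext(new FakeRuntimeContext("sink-task"));
        fixed.invoke("es-a:9200|one");
        fixed.invoke("es-b:9200|two");
        System.out.println("with propagation, child sinks: " + fixed.children.size());
    }
}
```

If the same reasoning applies to the Kotlin sink at the top of the thread, the analogous change would be to call sink.setRuntimeContext(runtimeContext) before sink.open(configuration) when a child sink is first created. That has not been verified against the integration test described in the thread.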