Hi again David et al,

I managed to push an initial pull request with the implementations of DynamicElasticsearchSink and the related ElasticsearchSinkRouter last week <https://github.com/apache/flink/pull/17061>, and made some minor updates today to the Javadocs (including code examples, etc.) along with a few tests that came to mind. I was hoping to get a few more eyes on it and figure out what else might be worth adding/changing/documenting, in hopes of wrapping this feature up.

Thanks again to everyone in this incredible community for their assistance with this. A local implementation of it for a project of mine is working like a charm, so I'm hoping it's something that others will be able to leverage for their own needs.

Rion
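For anyone taking a first look at the PR, the router boils down to two responsibilities: deriving a deterministic cache key for an element's destination, and constructing a sink for that destination the first time the key is seen. Roughly this shape, with the generics simplified here (the PR has the exact signatures and Javadocs):

public interface ElasticsearchSinkRouter<ElementT, RouteT, SinkT> {

    // Derive a deterministic, unique routing key for the element's
    // destination (e.g. a comma-delimited string of the cluster's hosts).
    RouteT getRoute(ElementT element);

    // Build the sink for a route the first time an element resolves to it;
    // the DynamicElasticsearchSink caches it and reuses it for all
    // subsequent elements with the same route.
    SinkT createSink(RouteT cacheKey, ElementT element);
}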
On Thu, Aug 26, 2021 at 11:45 AM David Morávek <d...@apache.org> wrote:

> Hi Rion,
>
> personally I'd start with a unit test in the base module using a test sink
> implementation. There is already *DummyElasticsearchSink* that you may be
> able to reuse (just note that we're trying to get rid of Mockito-based
> tests such as that one).
>
> I'm a bit unsure that an integration test would actually test anything
> extra that the unit test doesn't in this case, so I'd recommend it only as
> a next step (I'm also a bit concerned that such a test would take a long
> time to execute / be resource-intensive, as it would need to spawn
> multiple Elastic clusters).
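>
> As a very rough sketch of the kind of unit test I have in mind (the
> recording sink and the relaxed generic bound here are only illustrative,
> not actual classes from the codebase):
>
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.apache.flink.streaming.api.operators.StreamSink;
> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
> import org.junit.Test;
> import static java.util.Collections.singletonList;
> import static org.junit.Assert.assertEquals;
>
> public class DynamicElasticsearchSinkTest {
>
>     // A trivial sink that records what it receives, standing in for a
>     // real ElasticsearchSink in the routing test.
>     private static class RecordingSink extends RichSinkFunction<String> {
>         final List<String> received = new ArrayList<>();
>
>         @Override
>         public void invoke(String value, Context context) {
>             received.add(value);
>         }
>     }
>
>     @Test
>     public void elementsAreRoutedToTheMatchingSink() throws Exception {
>         final RecordingSink sinkA = new RecordingSink();
>         final RecordingSink sinkB = new RecordingSink();
>
>         // Route on a prefix of the element itself.
>         final DynamicElasticsearchSink<String, String, RecordingSink> sink =
>                 new DynamicElasticsearchSink<>(
>                         new ElasticsearchSinkRouter<String, String, RecordingSink>() {
>                             @Override
>                             public String getRoute(String element) {
>                                 return element.startsWith("a") ? "a" : "b";
>                             }
>
>                             @Override
>                             public RecordingSink createSink(String cacheKey, String element) {
>                                 return cacheKey.equals("a") ? sinkA : sinkB;
>                             }
>                         });
>
>         final OneInputStreamOperatorTestHarness<String, Object> harness =
>                 new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
>         harness.open();
>         harness.processElement("a-1", 0L);
>         harness.processElement("b-1", 0L);
>
>         assertEquals(singletonList("a-1"), sinkA.received);
>         assertEquals(singletonList("b-1"), sinkB.received);
>         harness.close();
>     }
> }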
> Best,
> D.
>
> On Thu, Aug 26, 2021 at 5:47 PM Rion Williams <rionmons...@gmail.com> wrote:
>
>> Just chiming in on this again.
>>
>> I think I have the pieces in place regarding the implementation (both a
>> DynamicElasticsearchSink class and an ElasticsearchSinkRouter interface),
>> added to the elasticsearch-base module. I noticed that HttpHost wasn't
>> available within that module/in the tests, so I suspect I'll need to add
>> a dependency similar to those found within the specific ES
>> implementations (5/6/7). Would it be best to just provide a dummy sink,
>> similar to the existing patterning, for writing the unit tests, or would
>> you recommend separate Elasticsearch integration tests using a
>> Testcontainers instance of each supported version (5/6/7), similar to the
>> ElasticsearchSinkITCase files under each module?
>>
>> Any advice / recommendations on this front would be helpful. I want to
>> write some tests surrounding this that demonstrate the most common
>> use-cases, but I also don't want to go overkill.
>>
>> Thanks again for all of your help,
>>
>> Rion
>>
>> On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <rionmons...@gmail.com> wrote:
>>
>>> Thanks again David,
>>>
>>> I've spun up a JIRA issue for the ticket
>>> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
>>> getting things into the proper state. If someone with the appropriate
>>> privileges could assign it to me, I'd be appreciative. I'll likely need
>>> some assistance at a few points to ensure things look as expected, but
>>> I'm happy to help with this contribution.
>>>
>>> Rion
>>>
>>> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> AFAIK there are currently no other sources in Flink that can treat
>>>> "other sources" / "destinations" as data. The most complete generic
>>>> work on this topic that I'm aware of is the Splittable DoFn-based IOs
>>>> in Apache Beam.
>>>>
>>>> I think the best module for the contribution would be
>>>> "elasticsearch-base", because it could then be easily reused for all of
>>>> the ES versions that we currently support.
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> That was perfect, and it looks like this is working as I'd expected. I
>>>>> put together some larger integration tests for my specific use-case
>>>>> (multiple Elasticsearch clusters running in Testcontainers) and
>>>>> verified that messages were being routed dynamically to the
>>>>> appropriate sinks. I forked the Flink repo last night and was trying
>>>>> to figure out the best place to start adding these classes (I noticed
>>>>> that there were three separate ES packages targeting 5/6/7
>>>>> respectively). I was going to start fleshing out the initial
>>>>> implementation for this, but wanted to make sure that I was starting
>>>>> in the right place.
>>>>>
>>>>> Additionally, do you know of anything that might be similar to this
>>>>> within other sinks? I'm just trying to think of something to model
>>>>> this after. Once I get things started, I'll spin up a JIRA issue for
>>>>> it and go from there.
>>>>>
>>>>> Thanks so much for your help!
>>>>>
>>>>> Rion
>>>>>
>>>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:
>>>>>
>>>>>> Hi Rion,
>>>>>>
>>>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>>>>> (which ElasticsearchSink extends) for more details.
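>>>>>>
>>>>>> Applied to your POC (in Java, sketching against your own
>>>>>> buildSinkFromRoute from the snippet below), that would be:
>>>>>>
>>>>>> // When a child sink is first created inside invoke():
>>>>>> ElasticsearchSink<JsonObject> sink = buildSinkFromRoute(element);
>>>>>> // Hand the wrapper's runtime context to the child before open(),
>>>>>> // otherwise the child's invoke() fails with "The runtime context
>>>>>> // has not been initialized".
>>>>>> sink.setRuntimeContext(getRuntimeContext());
>>>>>> sink.open(configuration);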
>>>>>>
>>>>>> One more note: instead of starting with an integration test, I'd
>>>>>> recommend writing a unit test using the *operator test harness* [2]
>>>>>> first. This should help you to discover / debug many issues upfront.
>>>>>> You can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>>>> [3] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> Thanks again for the response; I believe I'm getting pretty close to
>>>>>>> at least a POC-level implementation of this. I'm currently working
>>>>>>> with JsonObject instances throughout the pipeline, so to try this
>>>>>>> out I simply stored the routing information within the element
>>>>>>> itself for simplicity's sake, giving it a shape that looks something
>>>>>>> like this:
>>>>>>>
>>>>>>> {
>>>>>>>   "route": {
>>>>>>>     "hosts": "...",
>>>>>>>     "index": "...",
>>>>>>>     ...
>>>>>>>   },
>>>>>>>   "all-other-fields-here"
>>>>>>> }
>>>>>>>
>>>>>>> I've also stripped back several of the layers of the routers (since
>>>>>>> I already have all of the information in the element at that point).
>>>>>>> I tried using something like this:
>>>>>>>
>>>>>>> class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>>>     private lateinit var configuration: Configuration
>>>>>>>
>>>>>>>     override fun open(parameters: Configuration) {
>>>>>>>         configuration = parameters
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>>>         val route = getHost(element)
>>>>>>>         // Check if we already have a sink for this cluster
>>>>>>>         var sink = sinkRoutes[route]
>>>>>>>         if (sink == null) {
>>>>>>>             // If not, create one
>>>>>>>             sink = buildSinkFromRoute(element)
>>>>>>>             sink.open(configuration)
>>>>>>>             sinkRoutes[route] = sink
>>>>>>>         }
>>>>>>>
>>>>>>>         sink.invoke(element, context)
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>>>         // No-op.
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>>>         // This is used only to flush pending writes.
>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>             sink.snapshotState(context)
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun close() {
>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>             sink.close()
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>>>             buildHostsFromElement(element),
>>>>>>>             ElasticsearchRoutingFunction()
>>>>>>>         )
>>>>>>>
>>>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>>>
>>>>>>>         // TODO: Configure authorization if available
>>>>>>>         // builder.setRestClientFactory { restClient ->
>>>>>>>         //     restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>>>>         //         override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>>>>         //             // Configure authorization here
>>>>>>>         //             val credentialsProvider = BasicCredentialsProvider().apply {
>>>>>>>         //                 setCredentials(
>>>>>>>         //                     AuthScope.ANY,
>>>>>>>         //                     UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>>>>         //                 )
>>>>>>>         //             }
>>>>>>>         //             return builder.setDefaultCredentialsProvider(credentialsProvider)
>>>>>>>         //         }
>>>>>>>         //     })
>>>>>>>         // }
>>>>>>>
>>>>>>>         return builder.build()
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
>>>>>>>         val transportAddresses = element
>>>>>>>             .get("route").asJsonObject
>>>>>>>             .get("hosts").asString
>>>>>>>
>>>>>>>         // If there are multiple, they should be comma-delimited
>>>>>>>         val addresses = transportAddresses.split(",")
>>>>>>>         return addresses
>>>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>>>             .map { address -> HttpHost.create(address) }
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun getHost(element: JsonObject): String {
>>>>>>>         return element
>>>>>>>             .get("route").asJsonObject
>>>>>>>             .get("hosts").asString
>>>>>>>     }
>>>>>>>
>>>>>>>     private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
>>>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>>>             indexer.add(request(element))
>>>>>>>         }
>>>>>>>
>>>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>>>             // Access the routing information
>>>>>>>             val index = element
>>>>>>>                 .get("route").asJsonObject
>>>>>>>                 .get("index").asString
>>>>>>>
>>>>>>>             // Strip off the routing information
>>>>>>>             element.remove("route")
>>>>>>>
>>>>>>>             // Build the request
>>>>>>>             return Requests.indexRequest()
>>>>>>>                 .index(index)
>>>>>>>                 .type("_doc")
>>>>>>>                 .source(mapOf(
>>>>>>>                     "data" to "$element"
>>>>>>>                 ))
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> After running an integration test, I keep running into the following
>>>>>>> error during the invocation of the child sink:
>>>>>>>
>>>>>>> // The runtime context has not been initialized.
>>>>>>> sink.invoke(element, context)
>>>>>>>
>>>>>>> I can see the underlying sink getting initialized, the open call
>>>>>>> being made, etc., however for some reason there seems to be an issue
>>>>>>> with the context during the invoke call, namely *"The runtime
>>>>>>> context has not been initialized"*. I had assumed this would be
>>>>>>> alright since the context for the "wrapper" should have already been
>>>>>>> initialized, but maybe there's something that I'm missing.
>>>>>>>
>>>>>>> Also, please forgive any hastily written or nasty code, as this is
>>>>>>> purely a POC to see if I could get this to work using a single
>>>>>>> object. I hope to clean it up and genericize it once I'm confident
>>>>>>> that it actually works.
>>>>>>>
>>>>>>> Thanks so much again,
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Rion,
>>>>>>>>
>>>>>>>> Sorry for the late reply, I missed your previous message. Thanks
>>>>>>>> Arvid for the reminder <3.
>>>>>>>>
>>>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>>>>> those elements to the sink, which would create the tenant-specific
>>>>>>>>> Elastic connection from the ConfigurationT element and handle
>>>>>>>>> caching it and then just grab the element and send it on its way?
>>>>>>>>
>>>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>>>> overhead, as the sink can easily be chained with your join
>>>>>>>> (KeyedCoProcessFunction) function.
>>>>>>>>
>>>>>>>>> The shape of the elements being evicted from the process function
>>>>>>>>> (Is a simple wrapper with the configuration for the sink enough
>>>>>>>>> here? Do I need to explicitly initialize the sink within this
>>>>>>>>> function? Etc.)
>>>>>>>>
>>>>>>>> To write an element you need a configuration for the destination
>>>>>>>> and the element itself, so a tuple of *(ElasticConfiguration,
>>>>>>>> Element)* should be enough (that's basically your
>>>>>>>> MessageWrapper<ElementT, ConfigurationT> class).
>>>>>>>>
>>>>>>>>> The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>>>>>> just be something like an *.addSink(DynamicElasticSearchSink<String,
>>>>>>>>> Configuration>())* or perhaps something else entirely?)
>>>>>>>>
>>>>>>>> I guess it could look something like the snippet below. It would
>>>>>>>> definitely be good to play around with the *DynamicElasticSearchSink*
>>>>>>>> API and make it more meaningful / user-friendly (the gist I've
>>>>>>>> shared was just a very rough prototype to showcase the idea).
>>>>>>>>
>>>>>>>> static class Destination {
>>>>>>>>
>>>>>>>>     private final List<HttpHost> httpHosts;
>>>>>>>>
>>>>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>>>>         this.httpHosts = httpHosts;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>>>> toWrite.addSink(
>>>>>>>>         new DynamicElasticsearchSink<>(
>>>>>>>>                 new SinkRouter<
>>>>>>>>                         Tuple2<Destination, String>,
>>>>>>>>                         String,
>>>>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>>>
>>>>>>>>                     @Override
>>>>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>>>>                         // Construct a deterministic unique caching key for the
>>>>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>>>>                         return element.f0.httpHosts.stream()
>>>>>>>>                                 .map(HttpHost::toHostString)
>>>>>>>>                                 .collect(Collectors.joining(","));
>>>>>>>>                     }
>>>>>>>>
>>>>>>>>                     @Override
>>>>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>>>>                                         element.f0.httpHosts,
>>>>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>>>                                                 (el, ctx, indexer) -> {
>>>>>>>>                                                     // Construct index request.
>>>>>>>>                                                     final IndexRequest request = ...;
>>>>>>>>                                                     indexer.add(request);
>>>>>>>>                                                 })
>>>>>>>>                                 .build();
>>>>>>>>                     }
>>>>>>>>                 }));
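>>>>>>>>
>>>>>>>> The elided index request could be as simple as the following (the
>>>>>>>> index name and payload shape here are only placeholders):
>>>>>>>>
>>>>>>>> final IndexRequest request =
>>>>>>>>         Requests.indexRequest()
>>>>>>>>                 .index("tenant-index") // e.g. derived from the Destination
>>>>>>>>                 .source(Collections.singletonMap("data", el.f1));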
>>>>>>>>
>>>>>>>> I hope this helps ;)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> D.
>>>>>>>>
>>>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>>>
>>>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>>>> separate stream, I'm guessing something like the following would
>>>>>>>>> be roughly the avenue to continue down:
>>>>>>>>>
>>>>>>>>> fun main(args: Array<String>) {
>>>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>>>
>>>>>>>>>     // Get the stream of tenant-specific Elastic configurations
>>>>>>>>>     val connectionStream = stream
>>>>>>>>>         .fromSource(
>>>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>             "elastic-configs"
>>>>>>>>>         )
>>>>>>>>>
>>>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>>>     stream
>>>>>>>>>         .fromSource(
>>>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>             "messages"
>>>>>>>>>         )
>>>>>>>>>         .keyBy { message ->
>>>>>>>>>             // Key by the tenant in the message
>>>>>>>>>             message.getTenant()
>>>>>>>>>         }
>>>>>>>>>         .connect(
>>>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>>>             connectionStream
>>>>>>>>>         )
>>>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>>>             // For this key, we need to store all of the previous messages in
>>>>>>>>>             // state for the case where we don't have a mapping for this tenant yet
>>>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>>>
>>>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>>>                 super.open(parameters)
>>>>>>>>>                 // Initialize the states
>>>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             // When an element is received
>>>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>>>                 // Check if we have a mapping
>>>>>>>>>                 if (configState.value() == null) {
>>>>>>>>>                     // We don't have a mapping for this tenant yet; store
>>>>>>>>>                     // messages until we get it
>>>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>>>                 } else {
>>>>>>>>>                     // Output the record with some indicator of the route?
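>>>>>>>>>                     // (Per the discussion below, what would actually be
>>>>>>>>>                     // collected here is a wrapper pairing the message with
>>>>>>>>>                     // the tenant's config from configState, e.g. a
>>>>>>>>>                     // MessageWrapper<ElementT, ConfigurationT>.)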
>>>>>>>>>                     out.collect(message)
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>>>                 // If this mapping is for this specific tenant, store it
>>>>>>>>>                 // and flush the pending records in state
>>>>>>>>>                 if (config.getTenant() == context.currentKey) {
>>>>>>>>>                     configState.update(config)
>>>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>>>                         out.collect(message)
>>>>>>>>>                     }
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             // State descriptors
>>>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>>>                 "messages-awaiting-config",
>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>             )
>>>>>>>>>
>>>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>>>                 "elastic-config",
>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>             )
>>>>>>>>>         })
>>>>>>>>>
>>>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Basically: connect my tenant-specific configuration stream with my
>>>>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>>>> corresponding configuration (to avoid race conditions). I'm
>>>>>>>>> guessing that rather than directly outputting the messages from
>>>>>>>>> this process function, I'd construct some type of wrapper here
>>>>>>>>> with the necessary routing/configuration for the message (obtained
>>>>>>>>> via the configuration stream) along with the element, which might
>>>>>>>>> be something like a MessageWrapper<ElementT, ConfigurationT>, and
>>>>>>>>> pass those elements to the sink, which would create the
>>>>>>>>> tenant-specific Elastic connection from the ConfigurationT
>>>>>>>>> element, handle caching it, and then just grab the element and
>>>>>>>>> send it on its way?
>>>>>>>>>
>>>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>>>
>>>>>>>>> 1. The shape of the elements being evicted from the process
>>>>>>>>> function (Is a simple wrapper with the configuration for the sink
>>>>>>>>> enough here? Do I need to explicitly initialize the sink within
>>>>>>>>> this function? Etc.)
>>>>>>>>> 2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>>>>>> just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>>>> Configuration>()) or perhaps something else entirely?)
>>>>>>>>>
>>>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>>>> appreciated.
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>>>> something like this
>>>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
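>>>>>>>>>>
>>>>>>>>>> The core of it is just lazily creating and caching a child sink
>>>>>>>>>> per route; roughly this (simplified from the gist):
>>>>>>>>>>
>>>>>>>>>> private final Map<RouteT, SinkT> sinkRoutes = new HashMap<>();
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public void invoke(ElementT element, Context context) throws Exception {
>>>>>>>>>>     final RouteT route = sinkRouter.getRoute(element);
>>>>>>>>>>     SinkT sink = sinkRoutes.get(route);
>>>>>>>>>>     if (sink == null) {
>>>>>>>>>>         sink = sinkRouter.createSink(route, element);
>>>>>>>>>>         // The child is a rich function, so it needs the wrapper's
>>>>>>>>>>         // runtime context before it can be opened.
>>>>>>>>>>         sink.setRuntimeContext(getRuntimeContext());
>>>>>>>>>>         sink.open(configuration);
>>>>>>>>>>         sinkRoutes.put(route, sink);
>>>>>>>>>>     }
>>>>>>>>>>     sink.invoke(element, context);
>>>>>>>>>> }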
>>>>>>>>>>
>>>>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi David,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your response! I think there are currently quite a
>>>>>>>>>>> few unknowns on my end in terms of what production loads will
>>>>>>>>>>> look like, but I think the number of clusters shouldn't be too
>>>>>>>>>>> large (and will either rarely change or have new entries come in
>>>>>>>>>>> at runtime, but it needs to support that).
>>>>>>>>>>>
>>>>>>>>>>> I think the dynamic approach might be a good route to explore,
>>>>>>>>>>> with actual changes to the Elasticsearch sink as a longer-term
>>>>>>>>>>> option. I'm not sure what the dynamic one would look like at the
>>>>>>>>>>> moment though; perhaps that's something you'd be able to advise
>>>>>>>>>>> on?
>>>>>>>>>>>
>>>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>>>> would have the mappings stored in state, is it possible within
>>>>>>>>>>> the open() function for this dynamic route to access the state
>>>>>>>>>>> and initialize the client there? Or maybe there's some other
>>>>>>>>>>> approach (such as grouping by clusters and dynamically handling
>>>>>>>>>>> indices)?
>>>>>>>>>>>
>>>>>>>>>>> I'd be happy to take a shot at making the appropriate changes to
>>>>>>>>>>> the sink as well, although I'm far from an Elastic expert. If
>>>>>>>>>>> you point me in the right direction, I may be able to help out.
>>>>>>>>>>>
>>>>>>>>>>> Thanks much!
>>>>>>>>>>>
>>>>>>>>>>> Rion
>>>>>>>>>>>
>>>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Rion,
>>>>>>>>>>>
>>>>>>>>>>> As you probably already know, for dynamic indices you can simply
>>>>>>>>>>> implement your own ElasticsearchSinkFunction
>>>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>>>> [1], where you can create any request that the Elastic client
>>>>>>>>>>> supports.
>>>>>>>>>>>
>>>>>>>>>>> The tricky part is how to implement dynamic routing into
>>>>>>>>>>> multiple clusters.
>>>>>>>>>>>
>>>>>>>>>>> - If the Elastic clusters are known upfront (before submitting
>>>>>>>>>>> the job), you can easily create multiple Elastic sinks and
>>>>>>>>>>> prepend each with a simple filter (this is basically what the
>>>>>>>>>>> split operator does); see the sketch below.
>>>>>>>>>>> - If you discover Elastic clusters at runtime, this would
>>>>>>>>>>> require some changes to the current ElasticsearchSink
>>>>>>>>>>> implementation. I think this may actually be as simple as
>>>>>>>>>>> introducing something like a DynamicElasticsearchSink that could
>>>>>>>>>>> dynamically create and manage "child" sinks. This would probably
>>>>>>>>>>> require some thought about how to manage consumed resources
>>>>>>>>>>> (memory), because the number of child sinks could potentially be
>>>>>>>>>>> unbounded. This could of course be simplified if the underlying
>>>>>>>>>>> Elastic client already supports it, which I'm not aware of. If
>>>>>>>>>>> you'd like to take this path, it would definitely be a great
>>>>>>>>>>> contribution to Flink (I'm able to provide some guidance).
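>>>>>>>>>>>
>>>>>>>>>>> For the known-upfront case, the filter approach is only a few
>>>>>>>>>>> lines (the tenant values and the sink-building helper here are
>>>>>>>>>>> just for illustration):
>>>>>>>>>>>
>>>>>>>>>>> final DataStream<Event> events = ...;
>>>>>>>>>>> // One filter + sink pair per known cluster.
>>>>>>>>>>> events.filter(e -> e.getTenant().equals("tenant-a"))
>>>>>>>>>>>         .addSink(buildElasticsearchSinkFor(clusterA));
>>>>>>>>>>> events.filter(e -> e.getTenant().equals("tenant-b"))
>>>>>>>>>>>         .addSink(buildElasticsearchSinkFor(clusterB));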
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> D.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a use-case that I wanted to pose to the mailing list
>>>>>>>>>>>> first, as I'm not terribly familiar with the Elasticsearch
>>>>>>>>>>>> connector, to ensure I'm not going down the wrong path trying
>>>>>>>>>>>> to accomplish this in Flink (or whether something downstream
>>>>>>>>>>>> might be a better option).
>>>>>>>>>>>>
>>>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>>>
>>>>>>>>>>>> - A stream of tenant-specific events
>>>>>>>>>>>> - An HTTP endpoint containing mappings of tenant-specific
>>>>>>>>>>>> Elastic cluster information (as each tenant has its own
>>>>>>>>>>>> specific Elastic cluster/index)
>>>>>>>>>>>>
>>>>>>>>>>>> What I'm hoping to accomplish is the following:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>>>> store these cluster mappings in state (keyed by tenant, with
>>>>>>>>>>>> cluster info as the value).
>>>>>>>>>>>> 2. The event stream will be keyed by tenant and connected to
>>>>>>>>>>>> the cluster-mappings stream.
>>>>>>>>>>>> 3. I'll need an Elasticsearch sink that can route the
>>>>>>>>>>>> tenant-specific event data to its corresponding cluster/index
>>>>>>>>>>>> from the mapping source.
>>>>>>>>>>>>
>>>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>>>> indices, however I didn't know if it's possible to adjust the
>>>>>>>>>>>> cluster like I would need on a per-tenant basis, or if there's
>>>>>>>>>>>> a better approach here?
>>>>>>>>>>>>
>>>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Rion