Hi Rion, thanks for opening the PR. I'll take a look at it this week. I'd also like to pull Arvid into this topic to see whether he has any comments.
Best,
D.

On Sat, Sep 4, 2021 at 9:10 PM Rion Williams <rionmons...@gmail.com> wrote:

Hi again David et al,

I managed to push an initial pull request with the implementations of the DynamicElasticsearchSink and related ElasticsearchSinkRouter last week <https://github.com/apache/flink/pull/17061> and made some minor updates today to the Javadocs (included code examples, etc.), along with a few tests that came to mind. I was hoping to get a few more eyes on it and figure out what else might be worth adding/changing/documenting, in hopes of wrapping this feature up.

Thanks again to everyone in this incredible community for their assistance with this. A local implementation of it for a project of mine is working like a charm, so I'm hoping it's something that others will be able to leverage for their own needs.

Rion

On Thu, Aug 26, 2021 at 11:45 AM David Morávek <d...@apache.org> wrote:

Hi Rion,

Personally, I'd start with a unit test in the base module using a test sink implementation. There is already a DummyElasticsearchSink that you may be able to reuse (just note that we're trying to get rid of Mockito-based tests such as this one).

I'm a bit unsure that an integration test would actually cover anything extra that the unit test doesn't in this case, so I'd recommend it only as a next step (I'm also a bit concerned that such a test would take a long time to execute / be resource-intensive, as it would need to spawn multiple Elastic clusters?).

Best,
D.

On Thu, Aug 26, 2021 at 5:47 PM Rion Williams <rionmons...@gmail.com> wrote:

Just chiming in on this again.

I think I have the pieces in place for the implementation (both a DynamicElasticsearchSink class and an ElasticsearchSinkRouter interface) added to the elasticsearch-base module. I noticed that HttpHost wasn't available within that module / in the tests, so I suspect I'd need to add a dependency similar to those found within the specific ES implementations (5/6/7). Would it be best to just provide a dummy sink, following the existing pattern, for writing the unit tests, or would you recommend separate Elasticsearch integration tests using a TestContainer for each supported version (5/6/7), similar to the ElasticsearchSinkITCase files under each module?

Any advice / recommendations on this front would be helpful. I want to write some tests surrounding this that demonstrate the most common use-cases, but I also don't want to go overkill.
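For the integration route, this is roughly what I had in mind (an untested sketch using the testcontainers Elasticsearch module; the image tag is just an example):

import org.apache.http.HttpHost
import org.testcontainers.elasticsearch.ElasticsearchContainer

// Untested sketch: two independent clusters to route between.
val image = "docker.elastic.co/elasticsearch/elasticsearch:7.10.2"
val clusterA = ElasticsearchContainer(image)
val clusterB = ElasticsearchContainer(image)

fun example() {
    clusterA.start()
    clusterB.start()
    try {
        // Hosts the sink router would resolve per tenant
        val hostA = HttpHost.create(clusterA.httpHostAddress)
        val hostB = HttpHost.create(clusterB.httpHostAddress)
        // ...run the job with routes pointing at hostA / hostB, then assert
        // on which documents landed in which cluster
    } finally {
        clusterA.stop()
        clusterB.stop()
    }
}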
Thanks again for all of your help,

Rion

On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <rionmons...@gmail.com> wrote:

Thanks again David,

I've spun up a JIRA issue for the ticket <https://issues.apache.org/jira/browse/FLINK-23977> while I work on getting things into the proper state. If someone with the appropriate privileges could assign it to me, I'd be appreciative. I'll likely need some assistance at a few points to ensure things look as expected, but I'm happy to help with this contribution.

Rion

On Wed, Aug 25, 2021 at 11:37 AM David Morávek <d...@apache.org> wrote:

AFAIK there are currently no other sources in Flink that can treat "other sources" / "destinations" as data. The most complete generic work on this topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.

I think the best module for the contribution would be "elasticsearch-base", because this could be easily reused for all ES versions that we currently support.

Best,
D.

On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com> wrote:

Hi David,

That was perfect, and it looks like this is working as I'd expected. I put together some larger integration tests for my specific use-case (multiple Elasticsearch clusters running in TestContainers) and verified that messages were being routed dynamically to the appropriate sinks. I forked the Flink repo last night and was trying to figure out the best place to start adding these classes (I noticed that there were three separate ES packages targeting 5/6/7 respectively). I was going to start fleshing out the initial implementation for this, but wanted to make sure that I was starting in the right place.

Additionally, do you know of anything that might be similar to this, even within other sinks? Just trying to think of something to model this after. Once I get things started, I'll spin up a JIRA issue for it and go from there.

Thanks so much for your help!

Rion

On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:

Hi Rion,

You just need to call sink.setRuntimeContext(getRuntimeContext()) before opening the child sink. Please see AbstractRichFunction [1] (which ElasticsearchSink extends) for more details.

One more note: instead of starting with an integration test, I'd recommend writing a unit test using the operator test harness [2] first. This should help you discover / debug many issues upfront. You can use ElasticsearchSinkBaseTest [3] as an example.
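A rough skeleton of what I mean, in Kotlin since that's what you're working with (untested; DynamicElasticsearchSink and buildTestElement are placeholders for your wrapper sink and a routed test element):

import com.google.gson.JsonObject
import org.apache.flink.streaming.api.operators.StreamSink
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness

fun `sink opens and routes elements to a child sink`() {
    val sink = DynamicElasticsearchSink()
    val harness = OneInputStreamOperatorTestHarness<JsonObject, Any>(StreamSink(sink))
    harness.open()                                               // open() on the wrapper
    harness.processElement(StreamRecord(buildTestElement(), 0L)) // drives invoke()
    harness.snapshot(0L, 0L)                                     // drives snapshotState()
    harness.close()
    // ...assert against a dummy/test child sink rather than a real cluster
}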
[1] https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
[3] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java

Best,
D.

On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com> wrote:

Hi David,

Thanks again for the response. I believe I'm getting pretty close to at least a POC-level implementation of this. Currently I'm working with JsonObject instances throughout the pipeline, so I wanted to try this out and simply stored the routing information within the element itself for simplicity's sake for now, so it has a shape that looks something like this:

{
    "route": {
        "hosts": "...",
        "index": "...",
        ...
    },
    "all-other-fields-here"
}

And I've stripped back several of the layers of the routers (since I already have all of the information in the element at that point). I tried using something like this:

class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
    private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
    private lateinit var configuration: Configuration

    override fun open(parameters: Configuration) {
        configuration = parameters
    }

    override fun invoke(element: JsonObject, context: SinkFunction.Context) {
        val route = getHost(element)
        // Check if we already have a router for this cluster
        var sink = sinkRoutes[route]
        if (sink == null) {
            // If not, create one
            sink = buildSinkFromRoute(element)
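            // NOTE (see David's reply above): ElasticsearchSink extends
            // AbstractRichFunction, so the child sink also needs the wrapper's
            // runtime context before open(), e.g.
            //     sink.setRuntimeContext(runtimeContext)
            // Without it, invoke() fails with the "runtime context has not
            // been initialized" error described below.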
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        sink.invoke(element, context)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        // No-op.
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        // This is used only to flush pending writes.
        for (sink in sinkRoutes.values) {
            sink.snapshotState(context)
        }
    }

    override fun close() {
        for (sink in sinkRoutes.values) {
            sink.close()
        }
    }

    private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
        val builder = ElasticsearchSink.Builder<JsonObject>(
            buildHostsFromElement(element),
            ElasticsearchRoutingFunction()
        )

        builder.setBulkFlushMaxActions(1)

        // TODO: Configure authorization if available
        // builder.setRestClientFactory { restClient ->
        //     restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
        //         override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
        //             // Configure authorization here
        //             val credentialsProvider = BasicCredentialsProvider().apply {
        //                 setCredentials(
        //                     AuthScope.ANY,
        //                     UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
        //                 )
        //             }
        //             return builder.setDefaultCredentialsProvider(credentialsProvider)
        //         }
        //     })
        // }

        return builder.build()
    }

    private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
        val transportAddresses = element
            .get("route").asJsonObject
            .get("hosts").asString

        // If there are multiple, they should be comma-delimited
        val addresses = transportAddresses.split(",")
        return addresses
            .filter { address -> address.isNotEmpty() }
            .map { address -> HttpHost.create(address) }
    }

    private fun getHost(element: JsonObject): String {
        return element
            .get("route").asJsonObject
            .get("hosts").asString
    }

    private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
        override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
            indexer.add(request(element))
        }

        private fun request(element: JsonObject): IndexRequest {
            // Access routing information
            val index = element
                .get("route").asJsonObject
                .get("index").asString

            // Strip off routing information
            element.remove("route")

            // Send the request
            return Requests.indexRequest()
                .index(index)
                .type("_doc")
                .source(mapOf(
                    "data" to "$element"
                ))
        }
    }
}

After running an integration test, I keep running into the following error during the invocation of the child sink:

// The runtime context has not been initialized.
sink.invoke(element, context)

I can see the underlying sink getting initialized, the open call being made, etc. However, for some reason it looks like there's an issue with the context during the invoke call, namely "The runtime context has not been initialized". I had assumed this would be alright since the context for the "wrapper" should have already been initialized, but maybe there's something I'm missing.

Also, please forgive any hastily written or nasty code, as this is purely a POC to see if I could get this to work using a single object. I have hopes of cleaning it up and genericizing it once I'm confident that it actually works.

Thanks so much again,

Rion

On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:

Hi Rion,

Sorry for the late reply, I missed your previous message. Thanks Arvid for the reminder <3.

> something like a MessageWrapper<ElementT, ConfigurationT> and pass those elements to the sink, which would create the tenant-specific Elastic connection from the ConfigurationT element and handle caching it and then just grab the element and send it on its way?

Yes, this is exactly what I had in mind. There should be almost no overhead, as the sink can be easily chained with your join (KeyedCoProcessFunction) function.

> The shape of the elements being evicted from the process function (Is a simple wrapper with the configuration for the sink enough here? Do I need to explicitly initialize the sink within this function? Etc.)

To write an element you need a configuration for the destination and the element itself, so a tuple of (ElasticConfiguration, Element) should be enough (that's basically your MessageWrapper<ElementT, ConfigurationT> class).
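For example, something as small as this would do (a Kotlin sketch; ConfigT stands for whatever per-tenant connection info you resolve):

// Minimal carrier type: the element plus the destination it should go to.
data class MessageWrapper<ElementT, ConfigT>(
    val element: ElementT,
    val config: ConfigT
)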
> The actual use of the DynamicElasticsearchSink class (Would it just be something like an .addSink(DynamicElasticSearchSink<String, Configuration>()) or perhaps something else entirely?)

I guess it could look something like the snippet below. It would definitely be good to play around with the DynamicElasticSearchSink API and make it more meaningful / user-friendly (the gist I've shared was just a very rough prototype to showcase the idea).

static class Destination {

    private final List<HttpHost> httpHosts;

    Destination(List<HttpHost> httpHosts) {
        this.httpHosts = httpHosts;
    }
}

final DataStream<Tuple2<Destination, String>> toWrite = ...;
toWrite.addSink(
        new DynamicElasticsearchSink<>(
                new SinkRouter<
                        Tuple2<Destination, String>,
                        String,
                        ElasticsearchSink<Tuple2<Destination, String>>>() {

                    @Override
                    public String getRoute(Tuple2<Destination, String> element) {
                        // Construct a deterministic unique caching key for the
                        // destination... (this could be cheaper if you know the data)
                        return element.f0.httpHosts.stream()
                                .map(HttpHost::toHostString)
                                .collect(Collectors.joining(","));
                    }

                    @Override
                    public ElasticsearchSink<Tuple2<Destination, String>> createSink(
                            String cacheKey, Tuple2<Destination, String> element) {
                        return new ElasticsearchSink.Builder<>(
                                        element.f0.httpHosts,
                                        (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
                                                (el, ctx, indexer) -> {
                                                    // Construct index request.
                                                    final IndexRequest request = ...;
                                                    indexer.add(request);
                                                })
                                .build();
                    }
                }));

I hope this helps ;)

Best,
D.

On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com> wrote:

Thanks for this suggestion David, it's extremely helpful.
Since this will vary depending on the elements retrieved from a separate stream, I'm guessing something like the following would be roughly the avenue to continue down:

fun main(args: Array<String>) {
    val parameters = mergeParametersFromProperties(args)
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()

    // Get the stream for tenant-specific Elastic configurations
    val connectionStream = stream
        .fromSource(
            KafkaSource.of(parameters, listOf("elastic-configs")),
            WatermarkStrategy.noWatermarks(),
            "elastic-configs"
        )

    // Get the stream of incoming messages to be routed to Elastic
    stream
        .fromSource(
            KafkaSource.of(parameters, listOf("messages")),
            WatermarkStrategy.noWatermarks(),
            "messages"
        )
        .keyBy { message ->
            // Key by the tenant in the message
            message.getTenant()
        }
        .connect(
            // Connect the messages stream with the configurations
            connectionStream
        )
        .process(object : KeyedCoProcessFunction<String, String, String, String>() {
            // For this key, we need to store all of the previous messages in state
            // in the case where we don't have a given mapping for this tenant yet
            lateinit var messagesAwaitingConfigState: ListState<String>
            lateinit var configState: ValueState<String>

            override fun open(parameters: Configuration) {
                super.open(parameters)
                // Initialize the states
                messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
                configState = runtimeContext.getState(configStateDesc)
            }

            // When an element is received
            override fun processElement1(message: String, context: Context, out: Collector<String>) {
                // Check if we have a mapping
                if (configState.value() == null) {
                    // We don't have a mapping for this tenant, store messages until we get it
                    messagesAwaitingConfigState.add(message)
                } else {
                    // Output the record with some indicator of the route?
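                    // (Thinking out loud: perhaps wrap the record with its routing
                    // config, e.g. out.collect(MessageWrapper(configState.value(), message)),
                    // so the sink downstream can build/cache the tenant-specific client.)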
                    out.collect(message)
                }
            }

            override fun processElement2(config: String, context: Context, out: Collector<String>) {
                // If this mapping is for this specific tenant, store it and flush the pending
                // records in state
                if (config.getTenant() == context.currentKey) {
                    configState.update(config)
                    val messagesToFlush = messagesAwaitingConfigState.get()
                    messagesToFlush.forEach { message ->
                        out.collect(message)
                    }
                }
            }

            // State descriptors
            val awaitingStateDesc = ListStateDescriptor(
                "messages-awaiting-config",
                TypeInformation.of(String::class.java)
            )

            val configStateDesc = ValueStateDescriptor(
                "elastic-config",
                TypeInformation.of(String::class.java)
            )
        })

    stream.executeAsync("$APPLICATION_NAME-job")
}

Basically: connect my tenant-specific configuration stream with my incoming messages (keyed by tenant) and buffer them until I have a corresponding configuration (to avoid race conditions). However, I'm guessing that rather than directly outputting the messages from this process function, I'd construct some type of wrapper here with the necessary routing/configuration for the message (obtained via the configuration stream) along with the element, which might be something like a MessageWrapper<ElementT, ConfigurationT>, and pass those elements to the sink, which would create the tenant-specific Elastic connection from the ConfigurationT element, handle caching it, and then just grab the element and send it on its way?

Those are really the only bits I'm stuck on at the moment:

1. The shape of the elements being evicted from the process function (Is a simple wrapper with the configuration for the sink enough here? Do I need to explicitly initialize the sink within this function? Etc.)
2. The actual use of the DynamicElasticsearchSink class (Would it just be something like an .addSink(DynamicElasticSearchSink<String, Configuration>()) or perhaps something else entirely?)

Thanks again so much for the advice thus far David, it's greatly appreciated.

Rion

On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:

To give you a better idea, at a high level I think this could look something like this <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].

[1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8

On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com> wrote:

Hi David,

Thanks for your response!
I think there are currently quite a >>>>>>>>>>>> few unknowns in my end in terms of what a production loads look >>>>>>>>>>>> like but I >>>>>>>>>>>> think the number of clusters shouldn’t be too large (and will >>>>>>>>>>>> either rarely >>>>>>>>>>>> change or have new entries come in at runtime, but it needs to >>>>>>>>>>>> support >>>>>>>>>>>> that). >>>>>>>>>>>> >>>>>>>>>>>> I think the dynamic approach might be a good route to explore >>>>>>>>>>>> with actual changes to the Elasticsearch sink as a longer term >>>>>>>>>>>> option. I’m >>>>>>>>>>>> not sure what the dynamic one would look like at the moment >>>>>>>>>>>> though, perhaps >>>>>>>>>>>> that’s something you’d be able to advise on? >>>>>>>>>>>> >>>>>>>>>>>> Given that all the records are keyed for a given tenant and I >>>>>>>>>>>> would have the mappings stored in state, is it possible that >>>>>>>>>>>> within the >>>>>>>>>>>> open() function for this dynamic route to access the state and >>>>>>>>>>>> initialize >>>>>>>>>>>> the client there? Or maybe there’s some other approach (such as >>>>>>>>>>>> grouping by >>>>>>>>>>>> clusters and dynamically handling indices)? >>>>>>>>>>>> >>>>>>>>>>>> I’d be happy to give a shot at making the appropriate changes >>>>>>>>>>>> to the sink as well, although I’m far from an Elastic expert. If >>>>>>>>>>>> you point >>>>>>>>>>>> me in the right direction, I may be able to help out. >>>>>>>>>>>> >>>>>>>>>>>> Thanks much! >>>>>>>>>>>> >>>>>>>>>>>> Rion >>>>>>>>>>>> >>>>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Hi Rion, >>>>>>>>>>>> >>>>>>>>>>>> As you probably already know, for dynamic indices, you can >>>>>>>>>>>> simply implement your own ElasticsearchSinkFunction >>>>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java> >>>>>>>>>>>> [1], where you can create any request that elastic client supports. >>>>>>>>>>>> >>>>>>>>>>>> The tricky part is how to implement dynamic routing into >>>>>>>>>>>> multiple clusters. >>>>>>>>>>>> - If the elastic clusters are known upfront (before submitting >>>>>>>>>>>> job), you can easily create multiple elastic sinks and prepend >>>>>>>>>>>> them with a >>>>>>>>>>>> simple filter (this is basically what split operator does). >>>>>>>>>>>> - If you discover elastics clusters at runtime, this would >>>>>>>>>>>> require some changes of the current ElasticsearchSink >>>>>>>>>>>> implementation. I >>>>>>>>>>>> think this may be actually as simple as introducing something like >>>>>>>>>>>> DynamicElasticsearchSink, that could dynamically create and >>>>>>>>>>>> managed "child" >>>>>>>>>>>> sinks. This would probably require some thoughts about how to >>>>>>>>>>>> manage >>>>>>>>>>>> consumed resources (memory), because number of child sink could be >>>>>>>>>>>> potentially unbounded. This could be of course simplified if >>>>>>>>>>>> underlying >>>>>>>>>>>> elastic client already supports that, which I'm not aware of. If >>>>>>>>>>>> you'd like >>>>>>>>>>>> to take this path, it would definitely be a great contribution to >>>>>>>>>>>> Flink >>>>>>>>>>>> (I'm able to provide some guidance). 
[1] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java

Best,
D.

On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com> wrote:

Hi folks,

I have a use-case that I wanted to initially pose to the mailing list, as I'm not terribly familiar with the Elasticsearch connector, to make sure I'm not going down the wrong path trying to accomplish this in Flink (or whether something downstream might be a better option).

Basically, I have the following pieces to the puzzle:

- A stream of tenant-specific events
- An HTTP endpoint containing mappings for tenant-specific Elastic cluster information (as each tenant has its own specific Elastic cluster/index)

What I'm hoping to accomplish is the following:

1. One stream will periodically poll the HTTP endpoint and store these cluster mappings in state (keyed by tenant, with cluster info as the value).
2. The event stream will be keyed by tenant and connected to the cluster mappings stream.
3. I'll need an Elasticsearch sink that can route the tenant-specific event data to its corresponding cluster/index from the mapping source.

I know that the existing Elasticsearch sink supports dynamic indices, but I didn't know if it's possible to adjust the cluster like I would need on a per-tenant basis, or if there's a better approach here?

Any advice would be appreciated.

Thanks,

Rion