To give you a better idea, at a high level I think it could look something like this: <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
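For readers without the gist handy, the general shape of the idea can be sketched in plain Java with no Flink dependencies. Everything below is illustrative (the class name, the `ChildSink` interface, the routing function) and is not the actual gist or connector API: a parent sink lazily creates and caches one child sink per route (e.g. per Elastic cluster) and forwards each event to the matching child.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, Flink-free sketch of the "dynamic parent sink" idea:
// one lazily created child sink per route, cached in a map.
class DynamicSinkSketch<EventT, RouteT> {

    /** Minimal stand-in for a per-cluster child sink. */
    interface ChildSink<E> {
        void invoke(E event);
        void close();
    }

    private final Function<EventT, RouteT> router;                  // extracts the route (cluster) from an event
    private final Function<RouteT, ChildSink<EventT>> sinkFactory;  // builds a child sink for a new route
    private final Map<RouteT, ChildSink<EventT>> children = new ConcurrentHashMap<>();

    DynamicSinkSketch(Function<EventT, RouteT> router,
                      Function<RouteT, ChildSink<EventT>> sinkFactory) {
        this.router = router;
        this.sinkFactory = sinkFactory;
    }

    /** Routes each event to its (lazily created) child sink. */
    void invoke(EventT event) {
        RouteT route = router.apply(event);
        children.computeIfAbsent(route, sinkFactory).invoke(event);
    }

    /** Number of live child sinks (relevant for resource accounting). */
    int childCount() {
        return children.size();
    }

    void close() {
        children.values().forEach(ChildSink::close);
        children.clear();
    }
}
```

The unbounded `children` map is exactly where the resource concern raised later in the thread shows up; a production version would need an upper bound or eviction policy for child sinks.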
[1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8

On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hi David,
>
> Thanks for your response! I think there are currently quite a few
> unknowns on my end in terms of what production loads will look like, but
> I think the number of clusters shouldn't be too large (and they will
> either rarely change or have new entries come in at runtime, though it
> needs to support that).
>
> I think the dynamic approach might be a good route to explore, with
> actual changes to the Elasticsearch sink as a longer-term option. I'm not
> sure what the dynamic one would look like at the moment, though; perhaps
> that's something you'd be able to advise on?
>
> Given that all the records are keyed for a given tenant and I would have
> the mappings stored in state, is it possible, within the open() function
> for this dynamic route, to access the state and initialize the client
> there? Or maybe there's some other approach (such as grouping by clusters
> and dynamically handling indices)?
>
> I'd be happy to take a shot at making the appropriate changes to the sink
> as well, although I'm far from an Elastic expert. If you point me in the
> right direction, I may be able to help out.
>
> Thanks much!
>
> Rion
>
> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>
> Hi Rion,
>
> As you probably already know, for dynamic indices you can simply
> implement your own ElasticsearchSinkFunction
> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
> [1], where you can create any request that the Elastic client supports.
>
> The tricky part is how to implement dynamic routing into multiple
> clusters.
> - If the Elastic clusters are known upfront (before submitting the job),
> you can easily create multiple Elastic sinks and prepend each with a
> simple filter (this is basically what a split operator does).
> - If you discover Elastic clusters at runtime, this would require some
> changes to the current ElasticsearchSink implementation. I think this may
> actually be as simple as introducing something like
> DynamicElasticsearchSink, which could dynamically create and manage
> "child" sinks. This would probably require some thought about how to
> manage consumed resources (memory), because the number of child sinks
> could potentially be unbounded. This could of course be simplified if the
> underlying Elastic client already supported it, which I'm not aware of.
> If you'd like to take this path, it would definitely be a great
> contribution to Flink (I'm able to provide some guidance).
>
> [1]
> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>
> Best,
> D.
>
> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> I have a use case that I wanted to pose to the mailing list first, as
>> I'm not terribly familiar with the Elasticsearch connector, to make sure
>> I'm not going down the wrong path trying to accomplish this in Flink (or
>> in case something downstream might be a better option).
>>
>> Basically, I have the following pieces to the puzzle:
>>
>> - A stream of tenant-specific events
>> - An HTTP endpoint containing mappings for tenant-specific Elastic
>> cluster information (as each tenant has its own specific Elastic
>> cluster/index)
>>
>> What I'm hoping to accomplish is the following:
>>
>> 1. One stream will periodically poll the HTTP endpoint and store
>> these cluster mappings in state (keyed by tenant, with cluster info as
>> the value).
>> 2. The event stream will be keyed by tenant and connected to the
>> cluster mappings stream.
>> 3. I'll need an Elasticsearch sink that can route the
>> tenant-specific event data to its corresponding cluster/index from the
>> mapping source.
>>
>> I know that the existing Elasticsearch sink supports dynamic indices,
>> but I didn't know if it's possible to adjust the cluster like I would
>> need on a per-tenant basis, or if there's a better approach here.
>>
>> Any advice would be appreciated.
>>
>> Thanks,
>>
>> Rion
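The first option David describes (clusters known before job submission, each sink guarded by a filter) can also be modeled without any Flink types. This is only a conceptual sketch; `FilteredSinkChain` and its methods are made-up names, standing in for a chain of filter-then-sink operators:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, Flink-free model of the "known upfront" option:
// one sink per cluster, each prepended with a simple filter.
class FilteredSinkChain<E> {

    private final List<Predicate<E>> filters = new ArrayList<>();
    private final List<Consumer<E>> sinks = new ArrayList<>();

    /** Registers a (filter, sink) pair for one known cluster. */
    void addRoute(Predicate<E> filter, Consumer<E> sink) {
        filters.add(filter);
        sinks.add(sink);
    }

    /** Offers each event to every route; only matching sinks receive it. */
    void process(E event) {
        for (int i = 0; i < filters.size(); i++) {
            if (filters.get(i).test(event)) {
                sinks.get(i).accept(event);
            }
        }
    }
}
```

In an actual job the equivalent would be one filtered stream per cluster, each feeding its own pre-configured Elasticsearch sink; the trade-off is that the set of routes is fixed at submission time.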
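The tenant-to-cluster mapping state from steps 1–3 of the original question can likewise be sketched as a small Flink-free registry. `ClusterInfo` and both method names are illustrative stand-ins for whatever the HTTP endpoint actually returns; in a real job the map would live in keyed or broadcast state rather than a plain `ConcurrentHashMap`:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry modeling the per-tenant routing state: one stream
// refreshes the mappings from the HTTP endpoint, the keyed event stream
// consults them to find the target Elastic cluster/index.
class TenantClusterRegistry {

    /** Minimal stand-in for tenant-specific Elastic cluster/index info. */
    static final class ClusterInfo {
        final String host;
        final int port;
        final String index;

        ClusterInfo(String host, int port, String index) {
            this.host = host;
            this.port = port;
            this.index = index;
        }
    }

    private final Map<String, ClusterInfo> mappings = new ConcurrentHashMap<>();

    /** Called when the polling stream sees a new or updated mapping. */
    void upsert(String tenant, ClusterInfo info) {
        mappings.put(tenant, info);
    }

    /** Called per event on the keyed stream to find the target cluster. */
    Optional<ClusterInfo> lookup(String tenant) {
        return Optional.ofNullable(mappings.get(tenant));
    }
}
```

The open question from the thread remains how a sink learns about a tenant whose mapping arrives only at runtime, which is what the dynamic child-sink approach is meant to address.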