To give you a better idea, at a high level I think it could look something
like this [1].

[1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
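
For anyone reading this without opening the gist, here is a minimal,
framework-free sketch of the "child sink" idea. It is not the code from the
gist and it uses no Flink or Elasticsearch classes: ChildSink,
DynamicSinkRouter and maxChildren are hypothetical names introduced purely for
illustration. It assumes child sinks can be created lazily per cluster key and
that closing the least recently used child is an acceptable way to keep
resource usage bounded. A real sink would likely also need to flush pending
requests before closing a child and take part in checkpointing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a per-cluster "child" sink; not a Flink or Elasticsearch API. */
interface ChildSink<T> extends AutoCloseable {
    void write(T record);

    @Override
    void close();
}

/**
 * Routes each record to a child sink selected by a cluster key. Children are created
 * lazily, and the least recently used child is closed once maxChildren is exceeded.
 */
final class DynamicSinkRouter<T> implements AutoCloseable {
    private final Function<T, String> clusterKeyOf;
    private final Function<String, ChildSink<T>> childFactory;
    private final int maxChildren;
    // Access-ordered map: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, ChildSink<T>> children =
            new LinkedHashMap<>(16, 0.75f, true);

    DynamicSinkRouter(Function<T, String> clusterKeyOf,
                      Function<String, ChildSink<T>> childFactory,
                      int maxChildren) {
        if (maxChildren < 1) {
            throw new IllegalArgumentException("maxChildren must be at least 1");
        }
        this.clusterKeyOf = clusterKeyOf;
        this.childFactory = childFactory;
        this.maxChildren = maxChildren;
    }

    /** Looks up (or lazily creates) the child for the record's cluster and writes to it. */
    void invoke(T record) {
        String key = clusterKeyOf.apply(record);
        ChildSink<T> child = children.get(key); // get() marks the entry as recently used
        if (child == null) {
            child = childFactory.apply(key);
            children.put(key, child);
            evictIfNeeded();
        }
        child.write(record);
    }

    /** Closes and removes least recently used children until the bound holds again. */
    private void evictIfNeeded() {
        while (children.size() > maxChildren) {
            Iterator<Map.Entry<String, ChildSink<T>>> it = children.entrySet().iterator();
            Map.Entry<String, ChildSink<T>> eldest = it.next();
            eldest.getValue().close();
            it.remove();
        }
    }

    int openChildren() {
        return children.size();
    }

    /** Closes every remaining child. */
    @Override
    public void close() {
        List<ChildSink<T>> open = new ArrayList<>(children.values());
        children.clear();
        for (ChildSink<T> child : open) {
            child.close();
        }
    }
}
```

In a real job the child factory would build an Elasticsearch client from the
tenant's cluster mapping, and the key function would read the tenant or cluster
from the event. The LRU bound is just one possible answer to the "unbounded
number of child sinks" concern; an idle timeout would be another.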

On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hi David,
>
> Thanks for your response! I think there are currently quite a few unknowns
> on my end in terms of what production loads look like, but I think the
> number of clusters shouldn’t be too large (and will either rarely change or
> have new entries come in at runtime, but it needs to support that).
>
> I think the dynamic approach might be a good route to explore with actual
> changes to the Elasticsearch sink as a longer term option. I’m not sure
> what the dynamic one would look like at the moment though, perhaps that’s
> something you’d be able to advise on?
>
> Given that all the records are keyed for a given tenant and I would have
> the mappings stored in state, is it possible that within the open()
> function for this dynamic route to access the state and initialize the
> client there? Or maybe there’s some other approach (such as grouping by
> clusters and dynamically handling indices)?
>
> I’d be happy to give a shot at making the appropriate changes to the sink
> as well, although I’m far from an Elastic expert. If you point me in the
> right direction, I may be able to help out.
>
> Thanks much!
>
> Rion
>
> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>
> Hi Rion,
>
> As you probably already know, for dynamic indices, you can simply
> implement your own ElasticsearchSinkFunction [1], where you can create any
> request that the Elastic client supports.
>
> The tricky part is how to implement dynamic routing into multiple
> clusters.
> - If the Elastic clusters are known upfront (before submitting the job), you
> can easily create multiple Elastic sinks and put a simple filter in front of
> each of them (this is basically what the split operator does).
> - If you discover Elastic clusters at runtime, this would require some
> changes to the current ElasticsearchSink implementation. I think this may
> actually be as simple as introducing something like
> DynamicElasticsearchSink, which could dynamically create and manage "child"
> sinks. This would probably require some thought about how to manage
> consumed resources (memory), because the number of child sinks could
> potentially be unbounded. This could of course be simplified if the underlying
> Elastic client already supports that, which I'm not aware of. If you'd like
> to take this path, it would definitely be a great contribution to Flink
> (I'm able to provide some guidance).
>
> [1]
> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>
> Best,
> D.
>
> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> I have a use-case that I wanted to initially pose to the mailing list as
>> I’m not terribly familiar with the Elasticsearch connector to ensure I’m
>> not going down the wrong path trying to accomplish this in Flink (or if
>> something downstream might be a better option).
>>
>> Basically, I have the following pieces to the puzzle:
>>
>>    - A stream of tenant-specific events
>>    - An HTTP endpoint containing mappings for tenant-specific Elastic
>>    cluster information (as each tenant has its own specific Elastic
>>    cluster/index)
>>
>> What I’m hoping to accomplish is the following:
>>
>>    1. One stream will periodically poll the HTTP endpoint and store
>>    these cluster mappings in state (keyed by tenant with cluster info as the
>>    value)
>>    2. The event stream will be keyed by tenant and connected to the
>>    cluster mappings stream.
>>    3. I’ll need an Elasticsearch sink that can route the
>>    tenant-specific event data to its corresponding cluster/index from the
>>    mapping source.
>>
>> I know that the existing Elasticsearch sink supports dynamic indices,
>> however I didn’t know if it’s possible to adjust the cluster like I would
>> need on a per-tenant basis or if there’s a better approach here?
>>
>> Any advice would be appreciated.
>>
>> Thanks,
>>
>> Rion
>>
>
