Hi Rion,

Sorry for the late reply, I missed your previous message. Thanks, Arvid, for
the reminder <3.

> something like a MessageWrapper<ElementT, ConfigurationT> and pass those
> elements to the sink, which would create the tenant-specific Elastic
> connection from the ConfigurationT element and handle caching it and then
> just grab the element and send it on it's way?


Yes, this is exactly what I had in mind. There should be almost no overhead,
as the sink can easily be chained with your join (KeyedCoProcessFunction)
function.
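
Roughly, the wiring could look like the sketch below (just a sketch; names
such as messages, configs, TenantConfigJoinFunction, MessageWrapper and
ElasticConfiguration are placeholders for your own types, not an existing
API):

// Rough sketch: the sink is attached directly to the output of your
// KeyedCoProcessFunction; with the same parallelism and no shuffle in
// between, Flink chains both operators into a single task, so the hand-off
// is just a method call.
DataStream<MessageWrapper<String, ElasticConfiguration>> joined =
        messages
                .keyBy(message -> message.getTenant())
                .connect(configs)
                .process(new TenantConfigJoinFunction());

joined.addSink(new DynamicElasticsearchSink<>(/* SinkRouter, see below */));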
>
> The shape of the elements being evicted from the process function (Is a
> simple wrapper with the configuration for the sink enough here? Do I need
> to explicitly initialize the sink within this function? Etc.)

To write an element you need a configuration for the destination and the
element itself, so a tuple of *(ElasticConfiguration, Element)* should be
enough (that's basically your MessageWrapper<ElementT, ConfigurationT>
class).
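
For illustration, such a wrapper could be as simple as the POJO below (a
sketch only; the field and accessor names are placeholders, and a Tuple2
works just as well):

// Minimal wrapper carrying an element together with the destination
// configuration it should be written to.
public class MessageWrapper<ElementT, ConfigurationT> {

    private final ElementT element;
    private final ConfigurationT configuration;

    public MessageWrapper(ElementT element, ConfigurationT configuration) {
        this.element = element;
        this.configuration = configuration;
    }

    public ElementT getElement() {
        return element;
    }

    public ConfigurationT getConfiguration() {
        return configuration;
    }
}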
>
> The actual use of the *DynamicElasticsearchSink* class (Would it just be
> something like an *.addSink(**DynamicElasticSearchSink<**String,
> Configuration>())* or perhaps something else entirely?)

I guess it could look something like the snippet below. It would definitely
be good to play around with the *DynamicElasticsearchSink* API and make it
more meaningful / user-friendly (the gist I've shared was just a very rough
prototype to showcase the idea).

static class Destination {

    private final List<HttpHost> httpHosts;

    Destination(List<HttpHost> httpHosts) {
        this.httpHosts = httpHosts;
    }
}

final DataStream<Tuple2<Destination, String>> toWrite = ...;
toWrite.addSink(
        new DynamicElasticsearchSink<>(
                new SinkRouter<
                        Tuple2<Destination, String>,
                        String,
                        ElasticsearchSink<Tuple2<Destination, String>>>() {

                    @Override
                    public String getRoute(Tuple2<Destination, String> element) {
                        // Construct a deterministic, unique caching key for the
                        // destination... (this could be cheaper if you know the data)
                        return element.f0.httpHosts.stream()
                                .map(HttpHost::toHostString)
                                .collect(Collectors.joining(","));
                    }

                    @Override
                    public ElasticsearchSink<Tuple2<Destination, String>> createSink(
                            String cacheKey, Tuple2<Destination, String> element) {
                        return new ElasticsearchSink.Builder<>(
                                        element.f0.httpHosts,
                                        (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
                                                (el, ctx, indexer) -> {
                                                    // Construct the index request.
                                                    final IndexRequest request = ...;
                                                    indexer.add(request);
                                                })
                                .build();
                    }
                }));

I hope this helps ;)

Best,
D.


On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com> wrote:

> Thanks for this suggestion David, it's extremely helpful.
>
> Since this will vary depending on the elements retrieved from a separate
> stream, I'm guessing something like the following would be roughly the
> avenue to continue down:
>
> fun main(args: Array<String>) {
>     val parameters = mergeParametersFromProperties(args)
>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>     // Get the stream for tenant-specific Elastic configurations
>     val connectionStream = stream
>         .fromSource(
>             KafkaSource.of(parameters, listOf("elastic-configs")),
>             WatermarkStrategy.noWatermarks(),
>             "elastic-configs"
>         )
>
>     // Get the stream of incoming messages to be routed to Elastic
>     stream
>         .fromSource(
>             KafkaSource.of(parameters, listOf("messages")),
>             WatermarkStrategy.noWatermarks(),
>             "messages"
>         )
>         .keyBy { message ->
>             // Key by the tenant in the message
>             message.getTenant()
>         }
>         .connect(
>             // Connect the messages stream with the configurations
>             connectionStream
>         )
>         .process(object : KeyedCoProcessFunction<String, String, String, 
> String>() {
>             // For this key, we need to store all of the previous messages in 
> state
>             // in the case where we don't have a given mapping for this 
> tenant yet
>             lateinit var messagesAwaitingConfigState: ListState<String>
>             lateinit var configState: ValueState<String>
>
>             override fun open(parameters: Configuration) {
>                 super.open(parameters)
>                 // Initialize the states
>                 messagesAwaitingConfigState = 
> runtimeContext.getListState(awaitingStateDesc)
>                 configState = runtimeContext.getState(configStateDesc)
>             }
>
>             // When an element is received
>             override fun processElement1(message: String, context: Context, 
> out: Collector<String>) {
>                 // Check if we have a mapping
>                 if (configState.value() == null){
>                     // We don't have a mapping for this tenant, store 
> messages until we get it
>                     messagesAwaitingConfigState.add(message)
>                 }
>                 else {
>                     // Output the record with some indicator of the route?
>                     out.collect(message)
>                 }
>             }
>
>             override fun processElement2(config: String, context: Context, 
> out: Collector<String>) {
>                 // If this mapping is for this specific tenant, store it and 
> flush the pending
>                 // records in state
>                 if (config.getTenant() == context.currentKey){
>                     configState.update(config)
>                     val messagesToFlush = messagesAwaitingConfigState.get()
>                     messagesToFlush.forEach { message ->
>                         out.collect(message)
>                     }
>                 }
>             }
>
>             // State descriptors
>             val awaitingStateDesc = ListStateDescriptor(
>                 "messages-awaiting-config",
>                 TypeInformation.of(String::class.java)
>             )
>
>             val configStateDesc = ValueStateDescriptor(
>                 "elastic-config",
>                 TypeInformation.of(String::class.java)
>             )
>         })
>
>     stream.executeAsync("$APPLICATION_NAME-job")
> }
>
> Basically, connect my tenant-specific configuration stream with my
> incoming messages (keyed by tenant) and buffer them until I have a
> corresponding configuration (to avoid race-conditions). However, I'm
> guessing what will happen here is rather than directly outputting the
> messages from this process function, I'd construct some type of wrapper
> here with the necessary routing/configuration for the message (obtained via
> the configuration stream) along with the element, which might be something
> like a MessageWrapper<ElementT, ConfigurationT> and pass those elements
> to the sink, which would create the tenant-specific Elastic connection from
> the ConfigurationT element and handle caching it and then just grab the
> element and send it on it's way?
>
> Those are really the only bits I'm stuck on at the moment:
>
>    1. The shape of the elements being evicted from the process function
>    (Is a simple wrapper with the configuration for the sink enough here? Do I
>    need to explicitly initialize the sink within this function? Etc.)
>    2. The actual use of the DynamicElasticsearchSink class (Would it just
>    be something like an .addSink(DynamicElasticSearchSink<String,
>    Configuration>()) or perhaps something else entirely?)
>
> Thanks again so much for the advice thus far David, it's greatly
> appreciated.
>
> Rion
>
> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>
>> To give you a better idea, in high-level I think could look something
>> like this <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>> [1].
>>
>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>
>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks for your response! I think there are currently quite a few
>>> unknowns in my end in terms of what a production loads look like but I
>>> think the number of clusters shouldn’t be too large (and will either rarely
>>> change or have new entries come in at runtime, but it needs to support
>>> that).
>>>
>>> I think the dynamic approach might be a good route to explore with
>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>> sure what the dynamic one would look like at the moment though, perhaps
>>> that’s something you’d be able to advise on?
>>>
>>> Given that all the records are keyed for a given tenant and I would have
>>> the mappings stored in state, is it possible that within the open()
>>> function for this dynamic route to access the state and initialize the
>>> client there? Or maybe there’s some other approach (such as grouping by
>>> clusters and dynamically handling indices)?
>>>
>>> I’d be happy to give a shot at making the appropriate changes to the
>>> sink as well, although I’m far from an Elastic expert. If you point me in
>>> the right direction, I may be able to help out.
>>>
>>> Thanks much!
>>>
>>> Rion
>>>
>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>
>>> 
>>> Hi Rion,
>>>
>>> As you probably already know, for dynamic indices, you can simply
>>> implement your own ElasticsearchSinkFunction
>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>> [1], where you can create any request that elastic client supports.
>>>
>>> The tricky part is how to implement dynamic routing into multiple
>>> clusters.
>>> - If the elastic clusters are known upfront (before submitting job), you
>>> can easily create multiple elastic sinks and prepend them with a simple
>>> filter (this is basically what split operator does).
>>> - If you discover elastics clusters at runtime, this would require some
>>> changes of the current ElasticsearchSink implementation. I think this may
>>> be actually as simple as introducing something like
>>> DynamicElasticsearchSink, that could dynamically create and managed "child"
>>> sinks. This would probably require some thoughts about how to manage
>>> consumed resources (memory), because number of child sink could be
>>> potentially unbounded. This could be of course simplified if underlying
>>> elastic client already supports that, which I'm not aware of. If you'd like
>>> to take this path, it would definitely be a great contribution to Flink
>>> (I'm able to provide some guidance).
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>
>>> Best,
>>> D.
>>>
>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>>> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I have a use-case that I wanted to initially pose to the mailing list
>>>> as I’m not terribly familiar with the Elasticsearch connector to ensure I’m
>>>> not going down the wrong path trying to accomplish this in Flink (or if
>>>> something downstream might be a better option).
>>>>
>>>> Basically, I have the following pieces to the puzzle:
>>>>
>>>>    - A stream of tenant-specific events
>>>>    - An HTTP endpoint containing mappings for tenant-specific Elastic
>>>>    cluster information (as each tenant has its own specific Elastic
>>>>    cluster/index)
>>>>
>>>> What I’m hoping to accomplish is the following:
>>>>
>>>>    1. One stream will periodically poll the HTTP endpoint and store
>>>>    these cluster mappings in state (keyed by tenant with cluster info as 
>>>> the
>>>>    value)
>>>>    2. The event stream will be keyed by tenant and connected to the
>>>>    cluster mappings stream.
>>>>    3. I’ll need to an Elasticsearch sink that can route the
>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>    mapping source.
>>>>
>>>> I know that the existing Elasticsearch sink supports dynamic indices,
>>>> however I didn’t know if it’s possible to adjust the cluster like I would
>>>> need on a per-tenant basis or if there’s a better approach here?
>>>>
>>>> Any advice would be appreciated.
>>>>
>>>> Thanks,
>>>>
>>>> Rion
>>>>
>>>
