Hi again David et al,

I managed to push an initial pull request for the implementations for the
DynamicElasticsearchSink and related ElasticsearchSinkRouter last week
<https://github.com/apache/flink/pull/17061> and made some minor updates
today with regards to the Javadocs (included code examples, etc.) along
with a few tests that came to mind. I was hoping to get a few more eyes on
it and figure out what else might be worth adding/changing/documenting in
hopes of wrapping this feature up.

Thanks again to everyone in this incredible community for their assistance
with this, a local implementation of it for a project of mine is working
like a charm, so I'm hoping it's something that others will be able to
leverage for their own needs.

Rion

On Thu, Aug 26, 2021 at 11:45 AM David Morávek <d...@apache.org> wrote:

> Hi Rion,
>
> personally I'd start with unit test in the base module using a test sink
> implementation. There is already *DummyElasticsearchSink* that you may be
> able to reuse (just note that we're trying to get rid of Mockito based
> tests such as this one).
>
> I'm bit unsure that integration test would actually test anything extra
> that the unit test doesn't in this case, so I'd recommend it as the next
> step (I'm also bit concerned that this test would take a long time to
> execute / be resource intensive as it would need to spawn more elastic
> clusters?).
>
> Best,
> D.
>
> On Thu, Aug 26, 2021 at 5:47 PM Rion Williams <rionmons...@gmail.com>
> wrote:
>
>> Just chiming in on this again.
>>
>> I think I have the pieces in place regarding the implementation (both a
>> DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added
>> to the elasticsearch-base module. I noticed that HttpHost wasn't available
>> within that module/in the tests, so I'd suspect that I'd need to add a
>> dependency similar to those found within the specific ES implementations
>> (5/6/7). I'd also assume that it may be best to just provide a dummy sink
>> similar to the other patterning to handle writing the unit tests or would
>> you recommend separate Elasticsearch integration tests using a
>> TestContainer of each supported version (5/6/7) similar to those within the
>> ElasticsearchSinkITCase files under each module?
>>
>> Any advice / recommendations on this front would be helpful. I want to
>> write some tests surrounding this that demonstrate the most common
>> use-cases, but also don't want to go overkill.
>>
>> Thanks again for all of your help,
>>
>> Rion
>>
>> On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Thanks again David,
>>>
>>> I've spun up a JIRA issue for the ticket
>>> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
>>> getting things into the proper state. If someone with the
>>> appropriate privileges could assign it to me, I'd be appreciative. I'll
>>> likely need some assistance at a few points to ensure things look as
>>> expected, but I'm happy to help with this contribution.
>>>
>>> Rion
>>>
>>> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> AFAIK there are currently no other sources in Flink that can treat
>>>> "other sources" / "destination" as data. Most complete generic work on this
>>>> topic that I'm aware of are Splittable DoFn based IOs in Apache Beam.
>>>>
>>>> I think the best module for the contribution would be
>>>> "elasticsearch-base", because this could be easily reused for all ES
>>>> versions that we currently support.
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> That was perfect and it looks like this is working as I'd expected. I
>>>>> put together some larger integration tests for my specific use-case
>>>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>>>> that messages were being routed dynamically to the appropriate sinks. I
>>>>> forked the Flink repo last night and was trying to figure out the best
>>>>> place to start adding these classes in (I noticed that there were three
>>>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>>>> start fleshing the initial implementation for this, but wanted to make 
>>>>> sure
>>>>> that I was starting in the right place.
>>>>>
>>>>> Additionally, do you know of anything that might be similar to this
>>>>> even within other sinks? Just trying to think of something to model this
>>>>> after. Once I get things started, I'll spin up a JIRA issue for it and go
>>>>> from there.
>>>>>
>>>>> Thanks so much for your help!
>>>>>
>>>>> Rion
>>>>>
>>>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:
>>>>>
>>>>>> Hi Rion,
>>>>>>
>>>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>>>>> (that EleasticsearchSink extends) for more details.
>>>>>>
>>>>>> One more note, instead of starting with integration test, I'd
>>>>>> recommend writing a unit test using *operator test harness* [2]
>>>>>> first. This should help you to discover / debug many issues upfront. You
>>>>>> can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>>>> [3]
>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> Thanks again for the response, I believe that I'm getting pretty
>>>>>>> close for at least a POC-level implementation of this. Currently, I'm
>>>>>>> working with JsonObject instances throughout the pipeline, so I wanted 
>>>>>>> to
>>>>>>> try this out and simply stored the routing information within the 
>>>>>>> element
>>>>>>> itself for simplicity's sake right now, so it has a shape that looks
>>>>>>> something like this:
>>>>>>>
>>>>>>> {
>>>>>>>     "route": {
>>>>>>>         "hosts": "...",
>>>>>>>         "index": "...",
>>>>>>>         ...
>>>>>>>     },
>>>>>>>     "all-other-fields-here"
>>>>>>> }
>>>>>>>
>>>>>>> And I've stripped back several of the layers of the routers (since I
>>>>>>> already have all of the information in the element at that point). I 
>>>>>>> tried
>>>>>>> using something like this:
>>>>>>>
>>>>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), 
>>>>>>> CheckpointedFunction {
>>>>>>>     private val sinkRoutes: MutableMap<String, 
>>>>>>> ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>>>     private lateinit var configuration: Configuration
>>>>>>>
>>>>>>>     override fun open(parameters: Configuration) {
>>>>>>>         configuration = parameters
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun invoke(element: JsonObject, context: 
>>>>>>> SinkFunction.Context) {
>>>>>>>         val route = getHost(element)
>>>>>>>         // Check if we already have a router for this cluster
>>>>>>>         var sink = sinkRoutes[route]
>>>>>>>         if (sink == null) {
>>>>>>>             // If not, create one
>>>>>>>             sink = buildSinkFromRoute(element)
>>>>>>>             sink.open(configuration)
>>>>>>>             sinkRoutes[route] = sink
>>>>>>>         }
>>>>>>>
>>>>>>>         sink.invoke(element, context)
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun initializeState(context: 
>>>>>>> FunctionInitializationContext) {
>>>>>>>         // No-op.
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>>>         // This is used only to flush pending writes.
>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>             sink.snapshotState(context)
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     override fun close() {
>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>             sink.close()
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun buildSinkFromRoute(element: JsonObject, ho): 
>>>>>>> ElasticsearchSink<JsonObject> {
>>>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>>>             buildHostsFromElement(element),
>>>>>>>             ElasticsearchRoutingFunction()
>>>>>>>         )
>>>>>>>
>>>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>>>
>>>>>>>         // TODO: Configure authorization if available
>>>>>>> //        builder.setRestClientFactory { restClient ->
>>>>>>> //            restClient.setHttpClientConfigCallback(object : 
>>>>>>> RestClientBuilder.HttpClientConfigCallback {
>>>>>>> //                override fun customizeHttpClient(builder: 
>>>>>>> HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>>>> //                    // Configure authorization here
>>>>>>> //                    val credentialsProvider = 
>>>>>>> BasicCredentialsProvider().apply {
>>>>>>> //                        setCredentials(
>>>>>>> //                            AuthScope.ANY,
>>>>>>> //                            UsernamePasswordCredentials("$USERNAME", 
>>>>>>> "$PASSWORD")
>>>>>>> //                        )
>>>>>>> //                    }
>>>>>>> //
>>>>>>> //                    return 
>>>>>>> builder.setDefaultCredentialsProvider(credentialsProvider);
>>>>>>> //                }
>>>>>>> //            })
>>>>>>> //        }
>>>>>>>
>>>>>>>         return builder.build()
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun buildHostsFromElement(element: JsonObject): 
>>>>>>> List<HttpHost>{
>>>>>>>         val transportAddresses = element
>>>>>>>             .get("route").asJsonObject
>>>>>>>             .get("hosts").asString
>>>>>>>
>>>>>>>         // If there are multiple, they should be comma-delimited
>>>>>>>         val addresses = transportAddresses.split(",")
>>>>>>>         return addresses
>>>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>>>             .map { address ->
>>>>>>>                 HttpHost.create(address)
>>>>>>>             }
>>>>>>>     }
>>>>>>>
>>>>>>>     private fun getHost(element: JsonObject): String {
>>>>>>>         return element
>>>>>>>             .get("route").asJsonObject
>>>>>>>             .get("hosts").asString
>>>>>>>     }
>>>>>>>
>>>>>>>     private class ElasticsearchRoutingFunction: 
>>>>>>> ElasticsearchSinkFunction<JsonObject> {
>>>>>>>         override fun process(element: JsonObject, context: 
>>>>>>> RuntimeContext, indexer: RequestIndexer) {
>>>>>>>             indexer.add(request(element))
>>>>>>>         }
>>>>>>>
>>>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>>>             // Access routing information
>>>>>>>             val index = element
>>>>>>>                 .get("route").asJsonObject
>>>>>>>                 .get("index").asString
>>>>>>>
>>>>>>>             // Strip off routing information
>>>>>>>             element.remove("route")
>>>>>>>
>>>>>>>             // Send the request
>>>>>>>             return Requests.indexRequest()
>>>>>>>                 .index(index)
>>>>>>>                 .type("_doc")
>>>>>>>                 .source(mapOf(
>>>>>>>                     "data" to "$element"
>>>>>>>                 ))
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> After running an integration test, I keep encountering running into
>>>>>>> the following error during the invocation of the child sink:
>>>>>>>
>>>>>>> // The runtime context has not been initialized.
>>>>>>> sink.invoke(element, context)
>>>>>>>
>>>>>>> I can see the underlying sink getting initialized, the open call
>>>>>>> being made, etc. however for some reason it looks like there's an issue
>>>>>>> related to the context during the invoke call namely* "The runtime
>>>>>>> context has not been initialized". *I had assumed this would be
>>>>>>> alright since the context for the "wrapper" should have already been
>>>>>>> initialized, but maybe there's something that I'm missing.
>>>>>>>
>>>>>>> Also, please forgive any hastily written or nasty code as this is
>>>>>>> purely a POC to see if I could get this to work using a single object. I
>>>>>>> have the hopes of cleaning it up and genericizing it after I am 
>>>>>>> confident
>>>>>>> that it actually works.
>>>>>>>
>>>>>>> Thanks so much again,
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Rion,
>>>>>>>>
>>>>>>>> Sorry for late reply, I've missed your previous message. Thanks
>>>>>>>> Arvid for the reminder <3.
>>>>>>>>
>>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>>>>> those elements to the sink, which would create the tenant-specific 
>>>>>>>>> Elastic
>>>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>>>> and then just grab the element and send it on it's way?
>>>>>>>>
>>>>>>>>
>>>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>>>> overhead as sink can be easily chained with your join
>>>>>>>> (KeyedCoProcessFunction) function.
>>>>>>>>
>>>>>>>>    -
>>>>>>>>    -
>>>>>>>>>
>>>>>>>>>    The shape of the elements being evicted from the process
>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink 
>>>>>>>>> enough
>>>>>>>>>    here? Do I need to explicitly initialize the sink within this 
>>>>>>>>> function?
>>>>>>>>>    Etc.)
>>>>>>>>
>>>>>>>>    -
>>>>>>>>    - To write an element you need a configuration for the
>>>>>>>>    destination and the element itself, so a tuple of 
>>>>>>>> *(ElasticConfiguration,
>>>>>>>>    Element)* should be enough (that's basically your 
>>>>>>>> MessageWrapper<ElementT,
>>>>>>>>    ConfigurationT> class).
>>>>>>>>
>>>>>>>>
>>>>>>>>    -
>>>>>>>>    -
>>>>>>>>>
>>>>>>>>>    The actual use of the *DynamicElasticsearchSink* class (Would
>>>>>>>>>    it just be something like an *.addSink(*
>>>>>>>>>    *DynamicElasticSearchSink<**String, Configuration>())* or
>>>>>>>>>    perhaps something else entirely?)
>>>>>>>>
>>>>>>>>    -
>>>>>>>>
>>>>>>>> I guess it could look something like the snippet below. It would be
>>>>>>>> definitely good to play around with the *DynamicElasticSearchSink*
>>>>>>>> API and make it more meaningful / user friendly (the gist I've shared 
>>>>>>>> was
>>>>>>>> just a very rough prototype to showcase the idea).
>>>>>>>>
>>>>>>>>
>>>>>>>>    - static class Destination {
>>>>>>>>
>>>>>>>>        private final List<HttpHost> httpHosts;
>>>>>>>>
>>>>>>>>        Destination(List<HttpHost> httpHosts) {
>>>>>>>>            this.httpHosts = httpHosts;
>>>>>>>>        }
>>>>>>>>    }
>>>>>>>>    -
>>>>>>>>    - final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>>>>    toWrite.addSink(
>>>>>>>>            new DynamicElasticsearchSink<>(
>>>>>>>>                    new SinkRouter<
>>>>>>>>                            Tuple2<Destination, String>,
>>>>>>>>                            String,
>>>>>>>>                            ElasticsearchSink<Tuple2<Destination,
>>>>>>>>    String>>>() {
>>>>>>>>
>>>>>>>>                        @Override
>>>>>>>>                        public String getRoute(Tuple2<Destination,
>>>>>>>>    String> element) {
>>>>>>>>    -                         // Construct a deterministic unique
>>>>>>>>    caching key for the destination... (this could be cheaper if you 
>>>>>>>> know the
>>>>>>>>    data)
>>>>>>>>                            return element.f0.httpHosts.stream()
>>>>>>>>                                    .map(HttpHost::toHostString)
>>>>>>>>
>>>>>>>>    .collect(Collectors.joining(","));
>>>>>>>>                        }
>>>>>>>>
>>>>>>>>                        @Override
>>>>>>>>                        public
>>>>>>>>    ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>>>                                String cacheKey,
>>>>>>>>    Tuple2<Destination, String> element) {
>>>>>>>>                            return new ElasticsearchSink.Builder<>(
>>>>>>>>                                            element.f0.httpHosts,
>>>>>>>>
>>>>>>>>    (ElasticsearchSinkFunction<
>>>>>>>>
>>>>>>>>    Tuple2<Destination, String>>)
>>>>>>>>                                                    (el, ctx,
>>>>>>>>    indexer) -> {
>>>>>>>>                                                        //
>>>>>>>>    Construct index request.
>>>>>>>>                                                        final
>>>>>>>>    IndexRequest request = ...;
>>>>>>>>
>>>>>>>>    indexer.add(request);
>>>>>>>>                                                    })
>>>>>>>>                                    .build();
>>>>>>>>                        }
>>>>>>>>                    }));
>>>>>>>>
>>>>>>>>
>>>>>>>> I hope this helps ;)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> D.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <
>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>>>
>>>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>>>> roughly the avenue to continue down:
>>>>>>>>>
>>>>>>>>> fun main(args: Array<String>) {
>>>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>>>
>>>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>>>     val connectionStream = stream
>>>>>>>>>         .fromSource(
>>>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>             "elastic-configs"
>>>>>>>>>         )
>>>>>>>>>
>>>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>>>     stream
>>>>>>>>>         .fromSource(
>>>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>             "messages"
>>>>>>>>>         )
>>>>>>>>>         .keyBy { message ->
>>>>>>>>>             // Key by the tenant in the message
>>>>>>>>>             message.getTenant()
>>>>>>>>>         }
>>>>>>>>>         .connect(
>>>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>>>             connectionStream
>>>>>>>>>         )
>>>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, 
>>>>>>>>> String, String>() {
>>>>>>>>>             // For this key, we need to store all of the previous 
>>>>>>>>> messages in state
>>>>>>>>>             // in the case where we don't have a given mapping for 
>>>>>>>>> this tenant yet
>>>>>>>>>             lateinit var messagesAwaitingConfigState: 
>>>>>>>>> ListState<String>
>>>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>>>
>>>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>>>                 super.open(parameters)
>>>>>>>>>                 // Initialize the states
>>>>>>>>>                 messagesAwaitingConfigState = 
>>>>>>>>> runtimeContext.getListState(awaitingStateDesc)
>>>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             // When an element is received
>>>>>>>>>             override fun processElement1(message: String, context: 
>>>>>>>>> Context, out: Collector<String>) {
>>>>>>>>>                 // Check if we have a mapping
>>>>>>>>>                 if (configState.value() == null){
>>>>>>>>>                     // We don't have a mapping for this tenant, store 
>>>>>>>>> messages until we get it
>>>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>>>                 }
>>>>>>>>>                 else {
>>>>>>>>>                     // Output the record with some indicator of the 
>>>>>>>>> route?
>>>>>>>>>                     out.collect(message)
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             override fun processElement2(config: String, context: 
>>>>>>>>> Context, out: Collector<String>) {
>>>>>>>>>                 // If this mapping is for this specific tenant, store 
>>>>>>>>> it and flush the pending
>>>>>>>>>                 // records in state
>>>>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>>>>                     configState.update(config)
>>>>>>>>>                     val messagesToFlush = 
>>>>>>>>> messagesAwaitingConfigState.get()
>>>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>>>                         out.collect(message)
>>>>>>>>>                     }
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             // State descriptors
>>>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>>>                 "messages-awaiting-config",
>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>             )
>>>>>>>>>
>>>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>>>                 "elastic-config",
>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>             )
>>>>>>>>>         })
>>>>>>>>>
>>>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>>>>>> guessing what will happen here is rather than directly outputting the
>>>>>>>>> messages from this process function, I'd construct some type of 
>>>>>>>>> wrapper
>>>>>>>>> here with the necessary routing/configuration for the message 
>>>>>>>>> (obtained via
>>>>>>>>> the configuration stream) along with the element, which might be 
>>>>>>>>> something
>>>>>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>>>>> connection from the ConfigurationT element and handle caching it
>>>>>>>>> and then just grab the element and send it on it's way?
>>>>>>>>>
>>>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>>>
>>>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink 
>>>>>>>>> enough
>>>>>>>>>    here? Do I need to explicitly initialize the sink within this 
>>>>>>>>> function?
>>>>>>>>>    Etc.)
>>>>>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would
>>>>>>>>>    it just be something like an 
>>>>>>>>> .addSink(DynamicElasticSearchSink<String,
>>>>>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>>>>>
>>>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>>>> appreciated.
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> To give you a better idea, in high-level I think could look
>>>>>>>>>> something like this
>>>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>>>>>>>>>> [1].
>>>>>>>>>>
>>>>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi David,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your response! I think there are currently quite a
>>>>>>>>>>> few unknowns in my end in terms of what a production loads look 
>>>>>>>>>>> like but I
>>>>>>>>>>> think the number of clusters shouldn’t be too large (and will 
>>>>>>>>>>> either rarely
>>>>>>>>>>> change or have new entries come in at runtime, but it needs to 
>>>>>>>>>>> support
>>>>>>>>>>> that).
>>>>>>>>>>>
>>>>>>>>>>> I think the dynamic approach might be a good route to explore
>>>>>>>>>>> with actual changes to the Elasticsearch sink as a longer term 
>>>>>>>>>>> option. I’m
>>>>>>>>>>> not sure what the dynamic one would look like at the moment though, 
>>>>>>>>>>> perhaps
>>>>>>>>>>> that’s something you’d be able to advise on?
>>>>>>>>>>>
>>>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>>>> would have the mappings stored in state, is it possible that within 
>>>>>>>>>>> the
>>>>>>>>>>> open() function for this dynamic route to access the state and 
>>>>>>>>>>> initialize
>>>>>>>>>>> the client there? Or maybe there’s some other approach (such as 
>>>>>>>>>>> grouping by
>>>>>>>>>>> clusters and dynamically handling indices)?
>>>>>>>>>>>
>>>>>>>>>>> I’d be happy to give a shot at making the appropriate changes to
>>>>>>>>>>> the sink as well, although I’m far from an Elastic expert. If you 
>>>>>>>>>>> point me
>>>>>>>>>>> in the right direction, I may be able to help out.
>>>>>>>>>>>
>>>>>>>>>>> Thanks much!
>>>>>>>>>>>
>>>>>>>>>>> Rion
>>>>>>>>>>>
>>>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> 
>>>>>>>>>>> Hi Rion,
>>>>>>>>>>>
>>>>>>>>>>> As you probably already know, for dynamic indices, you can
>>>>>>>>>>> simply implement your own ElasticsearchSinkFunction
>>>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>>>> [1], where you can create any request that elastic client supports.
>>>>>>>>>>>
>>>>>>>>>>> The tricky part is how to implement dynamic routing into
>>>>>>>>>>> multiple clusters.
>>>>>>>>>>> - If the elastic clusters are known upfront (before submitting
>>>>>>>>>>> job), you can easily create multiple elastic sinks and prepend them 
>>>>>>>>>>> with a
>>>>>>>>>>> simple filter (this is basically what split operator does).
>>>>>>>>>>> - If you discover elastics clusters at runtime, this would
>>>>>>>>>>> require some changes of the current ElasticsearchSink 
>>>>>>>>>>> implementation. I
>>>>>>>>>>> think this may be actually as simple as introducing something like
>>>>>>>>>>> DynamicElasticsearchSink, that could dynamically create and managed 
>>>>>>>>>>> "child"
>>>>>>>>>>> sinks. This would probably require some thoughts about how to manage
>>>>>>>>>>> consumed resources (memory), because number of child sink could be
>>>>>>>>>>> potentially unbounded. This could be of course simplified if 
>>>>>>>>>>> underlying
>>>>>>>>>>> elastic client already supports that, which I'm not aware of. If 
>>>>>>>>>>> you'd like
>>>>>>>>>>> to take this path, it would definitely be a great contribution to 
>>>>>>>>>>> Flink
>>>>>>>>>>> (I'm able to provide some guidance).
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> D.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a use-case that I wanted to initially pose to the
>>>>>>>>>>>> mailing list as I’m not terribly familiar with the Elasticsearch 
>>>>>>>>>>>> connector
>>>>>>>>>>>> to ensure I’m not going down the wrong path trying to accomplish 
>>>>>>>>>>>> this in
>>>>>>>>>>>> Flink (or if something downstream might be a better option).
>>>>>>>>>>>>
>>>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>>>
>>>>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>>>>    Elastic cluster information (as each tenant has its own 
>>>>>>>>>>>> specific Elastic
>>>>>>>>>>>>    cluster/index)
>>>>>>>>>>>>
>>>>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>>>>
>>>>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>>>>    store these cluster mappings in state (keyed by tenant with 
>>>>>>>>>>>> cluster info as
>>>>>>>>>>>>    the value)
>>>>>>>>>>>>    2. The event stream will be keyed by tenant and connected
>>>>>>>>>>>>    to the cluster mappings stream.
>>>>>>>>>>>>    3. I’ll need to an Elasticsearch sink that can route the
>>>>>>>>>>>>    tenant-specific event data to its corresponding cluster/index 
>>>>>>>>>>>> from the
>>>>>>>>>>>>    mapping source.
>>>>>>>>>>>>
>>>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>>>> indices, however I didn’t know if it’s possible to adjust the 
>>>>>>>>>>>> cluster like
>>>>>>>>>>>> I would need on a per-tenant basis or if there’s a better approach 
>>>>>>>>>>>> here?
>>>>>>>>>>>>
>>>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Rion
>>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to