Hi Rion,

thanks for opening the PR. I'll take a look at it this week. I'd also like
to pull Arvid into this thread to see whether he has any comments.

Best,
D.

On Sat, Sep 4, 2021 at 9:10 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hi again David et al,
>
> I managed to push an initial pull request for the implementations for the
> DynamicElasticsearchSink and related ElasticsearchSinkRouter last week
> <https://github.com/apache/flink/pull/17061> and made some minor updates
> today with regards to the Javadocs (included code examples, etc.) along
> with a few tests that came to mind. I was hoping to get a few more eyes on
> it and figure out what else might be worth adding/changing/documenting in
> hopes of wrapping this feature up.
>
> Thanks again to everyone in this incredible community for their assistance
> with this. A local implementation of it for a project of mine is working
> like a charm, so I'm hoping it's something that others will be able to
> leverage for their own needs.
>
> Rion
>
> On Thu, Aug 26, 2021 at 11:45 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Rion,
>>
>> personally I'd start with a unit test in the base module using a test sink
>> implementation. There is already *DummyElasticsearchSink* that you may
>> be able to reuse (just note that we're trying to get rid of Mockito based
>> tests such as this one).
>>
>> I'm a bit unsure that an integration test would actually test anything
>> extra that the unit test doesn't in this case, so I'd recommend it as the
>> next step (I'm also a bit concerned that this test would take a long time
>> to execute / be resource intensive as it would need to spawn more Elastic
>> clusters?).
>>
>> Best,
>> D.
>>
>> On Thu, Aug 26, 2021 at 5:47 PM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Just chiming in on this again.
>>>
>>> I think I have the pieces in place regarding the implementation (both a
>>> DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added
>>> to the elasticsearch-base module. I noticed that HttpHost wasn't available
>>> within that module/in the tests, so I'd suspect that I'd need to add a
>>> dependency similar to those found within the specific ES implementations
>>> (5/6/7). I'd also assume that it may be best to just provide a dummy sink,
>>> following the existing test pattern, to handle writing the unit tests. Or
>>> would you recommend separate Elasticsearch integration tests using a
>>> TestContainer of each supported version (5/6/7) similar to those within the
>>> ElasticsearchSinkITCase files under each module?
>>>
>>> Any advice / recommendations on this front would be helpful. I want to
>>> write some tests surrounding this that demonstrate the most common
>>> use-cases, but also don't want to go overkill.
>>>
>>> Thanks again for all of your help,
>>>
>>> Rion
>>>
>>> On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <rionmons...@gmail.com>
>>> wrote:
>>>
>>>> Thanks again David,
>>>>
>>>> I've spun up a JIRA issue for the ticket
>>>> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
>>>> getting things into the proper state. If someone with the
>>>> appropriate privileges could assign it to me, I'd be appreciative. I'll
>>>> likely need some assistance at a few points to ensure things look as
>>>> expected, but I'm happy to help with this contribution.
>>>>
>>>> Rion
>>>>
>>>> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <d...@apache.org> wrote:
>>>>
>>>>> AFAIK there are currently no other sources in Flink that can treat
>>>>> "other sources" / "destination" as data. The most complete generic work
>>>>> on this topic that I'm aware of is the Splittable DoFn based IOs in
>>>>> Apache Beam.
>>>>>
>>>>> I think the best module for the contribution would be
>>>>> "elasticsearch-base", because this could be easily reused for all ES
>>>>> versions that we currently support.
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> That was perfect and it looks like this is working as I'd expected. I
>>>>>> put together some larger integration tests for my specific use-case
>>>>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>>>>> that messages were being routed dynamically to the appropriate sinks. I
>>>>>> forked the Flink repo last night and was trying to figure out the best
>>>>>> place to start adding these classes in (I noticed that there were three
>>>>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>>>>> start fleshing out the initial implementation for this, but wanted to make
>>>>>> sure that I was starting in the right place.
>>>>>>
>>>>>> Additionally, do you know of anything that might be similar to this
>>>>>> even within other sinks? Just trying to think of something to model this
>>>>>> after. Once I get things started, I'll spin up a JIRA issue for it and go
>>>>>> from there.
>>>>>>
>>>>>> Thanks so much for your help!
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Rion,
>>>>>>>
>>>>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>>>>> before opening the child sink. Please see *AbstractRichFunction*
>>>>>>> [1] (that ElasticsearchSink extends) for more details.
>>>>>>>
>>>>>>> One more note, instead of starting with integration test, I'd
>>>>>>> recommend writing a unit test using *operator test harness* [2]
>>>>>>> first. This should help you to discover / debug many issues upfront. You
>>>>>>> can use *ElasticsearchSinkBaseTest* [3] as an example.
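
A minimal, Flink-free sketch of the ordering described above: the wrapper
hands its own runtime context to each child sink before opening it. ChildSink
and RoutingSink are hypothetical stand-ins written only for illustration (they
are not Flink classes); in Flink the corresponding calls on the child would be
setRuntimeContext(...) followed by open(...).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a rich sink function (not a Flink class).
class ChildSink {
    private Object runtimeContext;
    private boolean opened;
    final List<String> written = new ArrayList<>();

    void setRuntimeContext(Object runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    // Mirrors the failure reported in this thread when no context was set.
    private Object getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    void open() {
        opened = true;
    }

    void invoke(String element) {
        if (!opened) {
            throw new IllegalStateException("The sink has not been opened.");
        }
        getRuntimeContext(); // any use of the context fails if it was never set
        written.add(element);
    }
}

// Hypothetical wrapper that lazily creates one child sink per route.
class RoutingSink {
    private final Object runtimeContext;
    private final Map<String, ChildSink> children = new HashMap<>();

    RoutingSink(Object runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    void invoke(String route, String element) {
        ChildSink sink = children.get(route);
        if (sink == null) {
            sink = new ChildSink();
            sink.setRuntimeContext(runtimeContext); // hand over the context first
            sink.open();                            // then open the child
            children.put(route, sink);
        }
        sink.invoke(element);
    }

    ChildSink child(String route) {
        return children.get(route);
    }
}
```

A child that is opened without a context still fails on its first invoke,
which is the symptom to look for when the setRuntimeContext call is missing.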
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>>>>> [3]
>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <
>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> Thanks again for the response. I believe that I'm getting pretty
>>>>>>>> close to at least a POC-level implementation of this. Currently, I'm
>>>>>>>> working with JsonObject instances throughout the pipeline, so to try
>>>>>>>> this out I simply stored the routing information within the element
>>>>>>>> itself for simplicity's sake. It has a shape that looks something like
>>>>>>>> this:
>>>>>>>>
>>>>>>>> {
>>>>>>>>     "route": {
>>>>>>>>         "hosts": "...",
>>>>>>>>         "index": "...",
>>>>>>>>         ...
>>>>>>>>     },
>>>>>>>>     "all-other-fields-here"
>>>>>>>> }
>>>>>>>>
>>>>>>>> And I've stripped back several of the layers of the routers (since
>>>>>>>> I already have all of the information in the element at that point). I
>>>>>>>> tried using something like this:
>>>>>>>>
>>>>>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>>>>     private lateinit var configuration: Configuration
>>>>>>>>
>>>>>>>>     override fun open(parameters: Configuration) {
>>>>>>>>         configuration = parameters
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>>>>         val route = getHost(element)
>>>>>>>>         // Check if we already have a router for this cluster
>>>>>>>>         var sink = sinkRoutes[route]
>>>>>>>>         if (sink == null) {
>>>>>>>>             // If not, create one
>>>>>>>>             sink = buildSinkFromRoute(element)
>>>>>>>>             sink.open(configuration)
>>>>>>>>             sinkRoutes[route] = sink
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         sink.invoke(element, context)
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>>>>         // No-op.
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>>>>         // This is used only to flush pending writes.
>>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>>             sink.snapshotState(context)
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     override fun close() {
>>>>>>>>         for (sink in sinkRoutes.values) {
>>>>>>>>             sink.close()
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>>>>             buildHostsFromElement(element),
>>>>>>>>             ElasticsearchRoutingFunction()
>>>>>>>>         )
>>>>>>>>
>>>>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>>>>
>>>>>>>>         // TODO: Configure authorization if available
>>>>>>>> //        builder.setRestClientFactory { restClient ->
>>>>>>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>>>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>>>>> //                    // Configure authorization here
>>>>>>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>>>>>>> //                        setCredentials(
>>>>>>>> //                            AuthScope.ANY,
>>>>>>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>>>>> //                        )
>>>>>>>> //                    }
>>>>>>>> //
>>>>>>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
>>>>>>>> //                }
>>>>>>>> //            })
>>>>>>>> //        }
>>>>>>>>
>>>>>>>>         return builder.build()
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
>>>>>>>>         val transportAddresses = element
>>>>>>>>             .get("route").asJsonObject
>>>>>>>>             .get("hosts").asString
>>>>>>>>
>>>>>>>>         // If there are multiple, they should be comma-delimited
>>>>>>>>         val addresses = transportAddresses.split(",")
>>>>>>>>         return addresses
>>>>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>>>>             .map { address ->
>>>>>>>>                 HttpHost.create(address)
>>>>>>>>             }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private fun getHost(element: JsonObject): String {
>>>>>>>>         return element
>>>>>>>>             .get("route").asJsonObject
>>>>>>>>             .get("hosts").asString
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>>>>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>>>>             indexer.add(request(element))
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>>>>             // Access routing information
>>>>>>>>             val index = element
>>>>>>>>                 .get("route").asJsonObject
>>>>>>>>                 .get("index").asString
>>>>>>>>
>>>>>>>>             // Strip off routing information
>>>>>>>>             element.remove("route")
>>>>>>>>
>>>>>>>>             // Send the request
>>>>>>>>             return Requests.indexRequest()
>>>>>>>>                 .index(index)
>>>>>>>>                 .type("_doc")
>>>>>>>>                 .source(mapOf(
>>>>>>>>                     "data" to "$element"
>>>>>>>>                 ))
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> After running an integration test, I keep running into the following
>>>>>>>> error during the invocation of the child sink:
>>>>>>>>
>>>>>>>> // The runtime context has not been initialized.
>>>>>>>> sink.invoke(element, context)
>>>>>>>>
>>>>>>>> I can see the underlying sink getting initialized, the open call
>>>>>>>> being made, etc. However, for some reason it looks like there's an issue
>>>>>>>> related to the context during the invoke call, namely *"The runtime
>>>>>>>> context has not been initialized"*. I had assumed this would be
>>>>>>>> alright since the context for the "wrapper" should have already been
>>>>>>>> initialized, but maybe there's something that I'm missing.
>>>>>>>>
>>>>>>>> Also, please forgive any hastily written or nasty code as this is
>>>>>>>> purely a POC to see if I could get this to work using a single object.
>>>>>>>> I hope to clean it up and make it generic once I am confident that it
>>>>>>>> actually works.
>>>>>>>>
>>>>>>>> Thanks so much again,
>>>>>>>>
>>>>>>>> Rion
>>>>>>>>
>>>>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Rion,
>>>>>>>>>
>>>>>>>>> Sorry for the late reply, I missed your previous message. Thanks
>>>>>>>>> Arvid for the reminder <3.
>>>>>>>>>
>>>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and
>>>>>>>>>> pass those elements to the sink, which would create the tenant-specific
>>>>>>>>>> Elastic connection from the ConfigurationT element and handle
>>>>>>>>>> caching it and then just grab the element and send it on its way?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>>>>> overhead, as the sink can be easily chained with your join
>>>>>>>>> (KeyedCoProcessFunction) function.
>>>>>>>>>
>>>>>>>>>>    The shape of the elements being evicted from the process
>>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink
>>>>>>>>>>    enough here? Do I need to explicitly initialize the sink within this
>>>>>>>>>>    function? Etc.)
>>>>>>>>>
>>>>>>>>>    - To write an element you need a configuration for the
>>>>>>>>>    destination and the element itself, so a tuple of
>>>>>>>>>    *(ElasticConfiguration, Element)* should be enough (that's basically
>>>>>>>>>    your MessageWrapper<ElementT, ConfigurationT> class).
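
A minimal sketch of what such a pair could look like, with no Flink or
Elasticsearch dependencies. The field names (hosts, index) and the cacheKey()
helper are assumptions made for illustration; sorting the hosts is just one
way to keep the key deterministic regardless of the order in which hosts are
listed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical destination description; the field names are assumptions.
final class ElasticConfiguration {
    final List<String> hosts;
    final String index;

    ElasticConfiguration(List<String> hosts, String index) {
        this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts));
        this.index = index;
    }

    // Deterministic cache key: sorting makes it independent of host order.
    String cacheKey() {
        List<String> sorted = new ArrayList<>(hosts);
        Collections.sort(sorted);
        return String.join(",", sorted);
    }
}

// Pairs an element with the configuration of the cluster it should go to.
final class MessageWrapper<ElementT, ConfigurationT> {
    final ElementT element;
    final ConfigurationT configuration;

    MessageWrapper(ElementT element, ConfigurationT configuration) {
        this.element = element;
        this.configuration = configuration;
    }
}
```

The sink would then only need the wrapper: the configuration part selects (or
creates) the child sink, and the element part is what actually gets written.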
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>    The actual use of the *DynamicElasticsearchSink* class (Would
>>>>>>>>>>    it just be something like an
>>>>>>>>>>    *.addSink(DynamicElasticSearchSink<String, Configuration>())* or
>>>>>>>>>>    perhaps something else entirely?)
>>>>>>>>>
>>>>>>>>> I guess it could look something like the snippet below. It would
>>>>>>>>> definitely be good to play around with the
>>>>>>>>> *DynamicElasticSearchSink* API and make it more meaningful / user
>>>>>>>>> friendly (the gist I've shared was just a very rough prototype to
>>>>>>>>> showcase the idea).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    static class Destination {
>>>>>>>>>
>>>>>>>>>        private final List<HttpHost> httpHosts;
>>>>>>>>>
>>>>>>>>>        Destination(List<HttpHost> httpHosts) {
>>>>>>>>>            this.httpHosts = httpHosts;
>>>>>>>>>        }
>>>>>>>>>    }
>>>>>>>>>
>>>>>>>>>    final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>>>>>    toWrite.addSink(
>>>>>>>>>            new DynamicElasticsearchSink<>(
>>>>>>>>>                    new SinkRouter<
>>>>>>>>>                            Tuple2<Destination, String>,
>>>>>>>>>                            String,
>>>>>>>>>                            ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>>>>
>>>>>>>>>                        @Override
>>>>>>>>>                        public String getRoute(Tuple2<Destination, String> element) {
>>>>>>>>>                            // Construct a deterministic unique caching key for the
>>>>>>>>>                            // destination... (this could be cheaper if you know the data)
>>>>>>>>>                            return element.f0.httpHosts.stream()
>>>>>>>>>                                    .map(HttpHost::toHostString)
>>>>>>>>>                                    .collect(Collectors.joining(","));
>>>>>>>>>                        }
>>>>>>>>>
>>>>>>>>>                        @Override
>>>>>>>>>                        public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>>>>                                String cacheKey, Tuple2<Destination, String> element) {
>>>>>>>>>                            return new ElasticsearchSink.Builder<>(
>>>>>>>>>                                            element.f0.httpHosts,
>>>>>>>>>                                            (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>>>>                                                    (el, ctx, indexer) -> {
>>>>>>>>>                                                        // Construct index request.
>>>>>>>>>                                                        final IndexRequest request = ...;
>>>>>>>>>                                                        indexer.add(request);
>>>>>>>>>                                                    })
>>>>>>>>>                                    .build();
>>>>>>>>>                        }
>>>>>>>>>                    }));
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I hope this helps ;)
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <
>>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>>>>
>>>>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>>>>> roughly the avenue to continue down:
>>>>>>>>>>
>>>>>>>>>> fun main(args: Array<String>) {
>>>>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>>>>
>>>>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>>>>     val connectionStream = stream
>>>>>>>>>>         .fromSource(
>>>>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>>             "elastic-configs"
>>>>>>>>>>         )
>>>>>>>>>>
>>>>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>>>>     stream
>>>>>>>>>>         .fromSource(
>>>>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>>>>             "messages"
>>>>>>>>>>         )
>>>>>>>>>>         .keyBy { message ->
>>>>>>>>>>             // Key by the tenant in the message
>>>>>>>>>>             message.getTenant()
>>>>>>>>>>         }
>>>>>>>>>>         .connect(
>>>>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>>>>             connectionStream
>>>>>>>>>>         )
>>>>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>>>>
>>>>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>>>>                 super.open(parameters)
>>>>>>>>>>                 // Initialize the states
>>>>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             // When an element is received
>>>>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>>>>                 // Check if we have a mapping
>>>>>>>>>>                 if (configState.value() == null) {
>>>>>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>>>>                 }
>>>>>>>>>>                 else {
>>>>>>>>>>                     // Output the record with some indicator of the route?
>>>>>>>>>>                     out.collect(message)
>>>>>>>>>>                 }
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>>>>>>                 // records in state
>>>>>>>>>>                 if (config.getTenant() == context.currentKey) {
>>>>>>>>>>                     configState.update(config)
>>>>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>>>>                         out.collect(message)
>>>>>>>>>>                     }
>>>>>>>>>>                     // Clear the buffer so these records are not emitted again
>>>>>>>>>>                     messagesAwaitingConfigState.clear()
>>>>>>>>>>                 }
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             // State descriptors
>>>>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>>>>                 "messages-awaiting-config",
>>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>>             )
>>>>>>>>>>
>>>>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>>>>                 "elastic-config",
>>>>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>>>>             )
>>>>>>>>>>         })
>>>>>>>>>>
>>>>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Basically, connect my tenant-specific configuration stream with
>>>>>>>>>> my incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>>>>> corresponding configuration (to avoid race conditions). However, I'm
>>>>>>>>>> guessing that rather than directly outputting the messages from this
>>>>>>>>>> process function, I'd construct some type of wrapper with the
>>>>>>>>>> necessary routing/configuration for the message (obtained via the
>>>>>>>>>> configuration stream) along with the element, which might be something
>>>>>>>>>> like a MessageWrapper<ElementT, ConfigurationT>, and pass those
>>>>>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>>>>>> connection from the ConfigurationT element, handle caching it, and
>>>>>>>>>> then just grab the element and send it on its way?
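
The buffering idea in the paragraph above can be modelled without Flink as a
rough sketch. TenantBuffer and Routed are hypothetical names, and plain maps
stand in for keyed state; this only illustrates the "hold back until a
configuration arrives, then flush in order" behaviour and is not a substitute
for a real KeyedCoProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical output record: a message paired with its tenant's configuration.
final class Routed {
    final String config;
    final String message;

    Routed(String config, String message) {
        this.config = config;
        this.message = message;
    }
}

// Plain-Java model of the per-tenant buffering; maps stand in for keyed state.
class TenantBuffer {
    private final Map<String, String> configByTenant = new HashMap<>();
    private final Map<String, List<String>> pendingByTenant = new HashMap<>();

    // Called for each incoming message (the processElement1 side).
    List<Routed> onMessage(String tenant, String message) {
        List<Routed> out = new ArrayList<>();
        String config = configByTenant.get(tenant);
        if (config == null) {
            // No configuration yet: hold the message back.
            pendingByTenant.computeIfAbsent(tenant, t -> new ArrayList<>()).add(message);
        } else {
            out.add(new Routed(config, message));
        }
        return out;
    }

    // Called for each configuration update (the processElement2 side).
    List<Routed> onConfig(String tenant, String config) {
        configByTenant.put(tenant, config);
        List<Routed> out = new ArrayList<>();
        // Flush in arrival order and drop the buffer so nothing is emitted twice.
        List<String> pending = pendingByTenant.remove(tenant);
        if (pending != null) {
            for (String message : pending) {
                out.add(new Routed(config, message));
            }
        }
        return out;
    }
}
```

Emitting the configuration together with each message is what lets a
downstream sink pick the right cluster without any state of its own.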
>>>>>>>>>>
>>>>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>>>>
>>>>>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>>>>>    function (Is a simple wrapper with the configuration for the sink
>>>>>>>>>>    enough here? Do I need to explicitly initialize the sink within this
>>>>>>>>>>    function? Etc.)
>>>>>>>>>>    2. The actual use of the DynamicElasticsearchSink class
>>>>>>>>>>    (Would it just be something like an
>>>>>>>>>>    .addSink(DynamicElasticSearchSink<String, Configuration>()) or
>>>>>>>>>>    perhaps something else entirely?)
>>>>>>>>>>
>>>>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>>>>> appreciated.
>>>>>>>>>>
>>>>>>>>>> Rion
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>>>>> something like this
>>>>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>>>>>>>>>>> [1].
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your response! I think there are currently quite a
>>>>>>>>>>>> few unknowns on my end in terms of what production loads look like,
>>>>>>>>>>>> but I think the number of clusters shouldn’t be too large (and will
>>>>>>>>>>>> either rarely change or have new entries come in at runtime, but it
>>>>>>>>>>>> needs to support that).
>>>>>>>>>>>>
>>>>>>>>>>>> I think the dynamic approach might be a good route to explore
>>>>>>>>>>>> with actual changes to the Elasticsearch sink as a longer term
>>>>>>>>>>>> option. I’m not sure what the dynamic one would look like at the
>>>>>>>>>>>> moment though, perhaps that’s something you’d be able to advise on?
>>>>>>>>>>>>
>>>>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>>>>> would have the mappings stored in state, is it possible within the
>>>>>>>>>>>> open() function for this dynamic route to access the state and
>>>>>>>>>>>> initialize the client there? Or maybe there’s some other approach
>>>>>>>>>>>> (such as grouping by clusters and dynamically handling indices)?
>>>>>>>>>>>>
>>>>>>>>>>>> I’d be happy to give a shot at making the appropriate changes
>>>>>>>>>>>> to the sink as well, although I’m far from an Elastic expert. If you
>>>>>>>>>>>> point me in the right direction, I may be able to help out.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks much!
>>>>>>>>>>>>
>>>>>>>>>>>> Rion
>>>>>>>>>>>>
>>>>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Rion,
>>>>>>>>>>>>
>>>>>>>>>>>> As you probably already know, for dynamic indices, you can
>>>>>>>>>>>> simply implement your own ElasticsearchSinkFunction
>>>>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>>>>> [1], where you can create any request that the Elastic client supports.
>>>>>>>>>>>>
>>>>>>>>>>>> The tricky part is how to implement dynamic routing into
>>>>>>>>>>>> multiple clusters.
>>>>>>>>>>>> - If the Elastic clusters are known upfront (before submitting the
>>>>>>>>>>>> job), you can easily create multiple Elastic sinks and prepend each
>>>>>>>>>>>> of them with a simple filter (this is basically what the split
>>>>>>>>>>>> operator does).
>>>>>>>>>>>> - If you discover Elastic clusters at runtime, this would
>>>>>>>>>>>> require some changes to the current ElasticsearchSink implementation.
>>>>>>>>>>>> I think this may actually be as simple as introducing something like
>>>>>>>>>>>> a DynamicElasticsearchSink that could dynamically create and manage
>>>>>>>>>>>> "child" sinks. This would probably require some thought about how to
>>>>>>>>>>>> manage consumed resources (memory), because the number of child sinks
>>>>>>>>>>>> could potentially be unbounded. This could of course be simplified if
>>>>>>>>>>>> the underlying Elastic client already supports that, which I'm not
>>>>>>>>>>>> aware of. If you'd like to take this path, it would definitely be a
>>>>>>>>>>>> great contribution to Flink (I'm able to provide some guidance).
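
One possible way to keep the number of child sinks bounded, sketched without
any Flink dependency: cache the children in an access-ordered LinkedHashMap
and close the least recently used one once a limit is exceeded. The class
name BoundedSinkCache, the size limit, and the LRU policy itself are
assumptions for illustration; a real sink would also have to flush pending
requests before an evicted child is closed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical LRU cache of child sinks that closes whatever it evicts.
class BoundedSinkCache<S extends AutoCloseable> {
    private final Function<String, S> factory;
    private final LinkedHashMap<String, S> sinks;

    BoundedSinkCache(final int maxSinks, Function<String, S> factory) {
        this.factory = factory;
        // accessOrder = true keeps the least recently used entry first.
        this.sinks = new LinkedHashMap<String, S>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, S> eldest) {
                if (size() <= maxSinks) {
                    return false;
                }
                closeSink(eldest.getValue());
                return true;
            }
        };
    }

    // Returns the cached sink for the route, creating it on first use.
    S get(String route) {
        S sink = sinks.get(route);
        if (sink == null) {
            sink = factory.apply(route);
            sinks.put(route, sink); // may evict and close the eldest entry
        }
        return sink;
    }

    int size() {
        return sinks.size();
    }

    private static void closeSink(AutoCloseable sink) {
        try {
            sink.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close evicted sink", e);
        }
    }
}
```

If a route is seen again after its sink was evicted, the factory simply
creates a fresh child, so the limit trades memory for reconnect cost.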
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> D.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a use-case that I wanted to pose to the mailing list first,
>>>>>>>>>>>>> as I’m not terribly familiar with the Elasticsearch connector and
>>>>>>>>>>>>> want to ensure I’m not going down the wrong path trying to
>>>>>>>>>>>>> accomplish this in Flink (or if something downstream might be a
>>>>>>>>>>>>> better option).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>>>>>    Elastic cluster information (as each tenant has its own specific
>>>>>>>>>>>>>    Elastic cluster/index)
>>>>>>>>>>>>>
>>>>>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>>>>>    store these cluster mappings in state (keyed by tenant with
>>>>>>>>>>>>>    cluster info as the value)
>>>>>>>>>>>>>    2. The event stream will be keyed by tenant and connected
>>>>>>>>>>>>>    to the cluster mappings stream.
>>>>>>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>>>>>>    tenant-specific event data to its corresponding cluster/index
>>>>>>>>>>>>>    from the mapping source.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>>>>> indices. However, I didn’t know if it’s possible to adjust the
>>>>>>>>>>>>> cluster like I would need on a per-tenant basis, or if there’s a
>>>>>>>>>>>>> better approach here?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Rion
>>>>>>>>>>>>>
>>>>>>>>>>>>
