Hi Rion,

You just need to call *sink.setRuntimeContext(getRuntimeContext())* before
opening the child sink. Please see *AbstractRichFunction* [1] (which
ElasticsearchSink extends) for more details.
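
To illustrate why that call matters, here is a minimal, self-contained sketch
that does not use Flink itself. *RichFunctionStandIn* and
*RuntimeContextStandIn* are hypothetical stand-ins that only mimic the
runtime-context guard in *AbstractRichFunction*; *Wrapper* and *ChildSink*
are made-up names for this example. In the real sink, the equivalent change
would be calling sink.setRuntimeContext(getRuntimeContext()) right before
sink.open(configuration).

```java
import java.util.HashMap;
import java.util.Map;

public class ChildSinkLifecycleDemo {

    /** Hypothetical stand-in for Flink's RuntimeContext (not the real interface). */
    static final class RuntimeContextStandIn {
        final String taskName;

        RuntimeContextStandIn(String taskName) {
            this.taskName = taskName;
        }
    }

    /** Stand-in that mimics the runtime-context guard in AbstractRichFunction. */
    abstract static class RichFunctionStandIn {
        private RuntimeContextStandIn runtimeContext;

        void setRuntimeContext(RuntimeContextStandIn ctx) {
            this.runtimeContext = ctx;
        }

        RuntimeContextStandIn getRuntimeContext() {
            if (runtimeContext == null) {
                throw new IllegalStateException("The runtime context has not been initialized.");
            }
            return runtimeContext;
        }
    }

    /** Child sink that needs its own runtime context whenever it writes. */
    static final class ChildSink extends RichFunctionStandIn {
        String invoke(String element) {
            return getRuntimeContext().taskName + " <- " + element;
        }
    }

    /** Wrapper that lazily creates and caches one child sink per route. */
    static final class Wrapper extends RichFunctionStandIn {
        private final Map<String, ChildSink> children = new HashMap<>();
        private final boolean propagateContext;

        Wrapper(boolean propagateContext) {
            this.propagateContext = propagateContext;
        }

        String invoke(String route, String element) {
            ChildSink sink = children.get(route);
            if (sink == null) {
                sink = new ChildSink();
                if (propagateContext) {
                    // The fix: hand the wrapper's context to the child before using it.
                    sink.setRuntimeContext(getRuntimeContext());
                }
                children.put(route, sink);
            }
            return sink.invoke(element);
        }
    }

    public static void main(String[] args) {
        Wrapper fixed = new Wrapper(true);
        fixed.setRuntimeContext(new RuntimeContextStandIn("sink-task"));
        System.out.println(fixed.invoke("host-a", "doc-1"));

        // Same wrapper, but the child never receives a context.
        Wrapper broken = new Wrapper(false);
        broken.setRuntimeContext(new RuntimeContextStandIn("sink-task"));
        try {
            broken.invoke("host-a", "doc-1");
        } catch (IllegalStateException e) {
            System.out.println("Without propagation: " + e.getMessage());
        }
    }
}
```

The wrapper's own context being initialized is not enough: each child is a
separate rich function instance and keeps its own reference, so it has to be
set on every child you create.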

One more note: instead of starting with an integration test, I'd recommend
writing a unit test using the *operator test harness* [2] first. This should
help you discover and debug many issues upfront. You can use
*ElasticsearchSinkBaseTest* [3] as an example.

[1]
https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
[3]
https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java

Best,
D.

On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com>
wrote:

> Hi David,
>
> Thanks again for the response, I believe that I'm getting pretty close for
> at least a POC-level implementation of this. Currently, I'm working with
> JsonObject instances throughout the pipeline, so I wanted to try this out
> and simply stored the routing information within the element itself for
> simplicity's sake right now, so it has a shape that looks something like
> this:
>
> {
>     "route": {
>         "hosts": "...",
>         "index": "...",
>         ...
>     },
>     "all-other-fields-here"
> }
>
> And I've stripped back several of the layers of the routers (since I
> already have all of the information in the element at that point). I tried
> using something like this:
>
> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), CheckpointedFunction {
>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>     private lateinit var configuration: Configuration
>
>     override fun open(parameters: Configuration) {
>         configuration = parameters
>     }
>
>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>         val route = getHost(element)
>         // Check if we already have a router for this cluster
>         var sink = sinkRoutes[route]
>         if (sink == null) {
>             // If not, create one
>             sink = buildSinkFromRoute(element)
>             sink.open(configuration)
>             sinkRoutes[route] = sink
>         }
>
>         sink.invoke(element, context)
>     }
>
>     override fun initializeState(context: FunctionInitializationContext) {
>         // No-op.
>     }
>
>     override fun snapshotState(context: FunctionSnapshotContext) {
>         // This is used only to flush pending writes.
>         for (sink in sinkRoutes.values) {
>             sink.snapshotState(context)
>         }
>     }
>
>     override fun close() {
>         for (sink in sinkRoutes.values) {
>             sink.close()
>         }
>     }
>
>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>         val builder = ElasticsearchSink.Builder<JsonObject>(
>             buildHostsFromElement(element),
>             ElasticsearchRoutingFunction()
>         )
>
>         builder.setBulkFlushMaxActions(1)
>
>         // TODO: Configure authorization if available
> //        builder.setRestClientFactory { restClient ->
> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
> //                    // Configure authorization here
> //                    val credentialsProvider = BasicCredentialsProvider().apply {
> //                        setCredentials(
> //                            AuthScope.ANY,
> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
> //                        )
> //                    }
> //
> //                    return builder.setDefaultCredentialsProvider(credentialsProvider);
> //                }
> //            })
> //        }
>
>         return builder.build()
>     }
>
>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>         val transportAddresses = element
>             .get("route").asJsonObject
>             .get("hosts").asString
>
>         // If there are multiple, they should be comma-delimited
>         val addresses = transportAddresses.split(",")
>         return addresses
>             .filter { address -> address.isNotEmpty() }
>             .map { address ->
>                 HttpHost.create(address)
>             }
>     }
>
>     private fun getHost(element: JsonObject): String {
>         return element
>             .get("route").asJsonObject
>             .get("hosts").asString
>     }
>
>     private class ElasticsearchRoutingFunction: ElasticsearchSinkFunction<JsonObject> {
>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>             indexer.add(request(element))
>         }
>
>         private fun request(element: JsonObject): IndexRequest {
>             // Access routing information
>             val index = element
>                 .get("route").asJsonObject
>                 .get("index").asString
>
>             // Strip off routing information
>             element.remove("route")
>
>             // Send the request
>             return Requests.indexRequest()
>                 .index(index)
>                 .type("_doc")
>                 .source(mapOf(
>                     "data" to "$element"
>                 ))
>         }
>     }
> }
>
> After running an integration test, I keep running into the
> following error during the invocation of the child sink:
>
> // The runtime context has not been initialized.
> sink.invoke(element, context)
>
> I can see the underlying sink getting initialized, the open call being
> made, etc.; however, for some reason it looks like there's an issue related
> to the context during the invoke call, namely *"The runtime context has
> not been initialized"*. I had assumed this would be alright since the
> context for the "wrapper" should have already been initialized, but maybe
> there's something that I'm missing.
>
> Also, please forgive any hastily written or nasty code as this is purely a
> POC to see if I could get this to work using a single object. I have the
> hopes of cleaning it up and genericizing it after I am confident that it
> actually works.
>
> Thanks so much again,
>
> Rion
>
> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Rion,
>>
>> Sorry for the late reply, I missed your previous message. Thanks Arvid for
>> the reminder <3.
>>
>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>> elements to the sink, which would create the tenant-specific Elastic
>>> connection from the ConfigurationT element and handle caching it and
>>> then just grab the element and send it on its way?
>>
>>
>> Yes, this is exactly what I had in mind. There should be almost no
>> overhead, as the sink can easily be chained with your join
>> (KeyedCoProcessFunction) function.
>>
>>> The shape of the elements being evicted from the process function
>>> (Is a simple wrapper with the configuration for the sink enough here? Do I
>>> need to explicitly initialize the sink within this function? Etc.)
>>
>> To write an element you need a configuration for the destination
>> and the element itself, so a tuple of *(ElasticConfiguration,
>> Element)* should be enough (that's basically your MessageWrapper<ElementT,
>> ConfigurationT> class).
>>
>>> The actual use of the *DynamicElasticsearchSink* class (Would it
>>> just be something like an *.addSink(DynamicElasticSearchSink<String,
>>> Configuration>())* or perhaps something else entirely?)
>>
>> I guess it could look something like the snippet below. It would
>> definitely be good to play around with the *DynamicElasticSearchSink* API
>> and make it more meaningful / user friendly (the gist I've shared was just
>> a very rough prototype to showcase the idea).
>>
>>
>>    static class Destination {
>>
>>        private final List<HttpHost> httpHosts;
>>
>>        Destination(List<HttpHost> httpHosts) {
>>            this.httpHosts = httpHosts;
>>        }
>>    }
>>
>>    final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>    toWrite.addSink(
>>            new DynamicElasticsearchSink<>(
>>                    new SinkRouter<
>>                            Tuple2<Destination, String>,
>>                            String,
>>                            ElasticsearchSink<Tuple2<Destination, String>>>() {
>>
>>                        @Override
>>                        public String getRoute(Tuple2<Destination, String> element) {
>>                            // Construct a deterministic unique caching key for the
>>                            // destination... (this could be cheaper if you know the data)
>>                            return element.f0.httpHosts.stream()
>>                                    .map(HttpHost::toHostString)
>>                                    .collect(Collectors.joining(","));
>>                        }
>>
>>                        @Override
>>                        public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>                                String cacheKey, Tuple2<Destination, String> element) {
>>                            return new ElasticsearchSink.Builder<>(
>>                                            element.f0.httpHosts,
>>                                            (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>                                                    (el, ctx, indexer) -> {
>>                                                        // Construct index request.
>>                                                        final IndexRequest request = ...;
>>                                                        indexer.add(request);
>>                                                    })
>>                                    .build();
>>                        }
>>                    }));
>>
>>
>> I hope this helps ;)
>>
>> Best,
>> D.
>>
>>
>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Thanks for this suggestion David, it's extremely helpful.
>>>
>>> Since this will vary depending on the elements retrieved from a separate
>>> stream, I'm guessing something like the following would be roughly the
>>> avenue to continue down:
>>>
>>> fun main(args: Array<String>) {
>>>     val parameters = mergeParametersFromProperties(args)
>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>
>>>     // Get the stream for tenant-specific Elastic configurations
>>>     val connectionStream = stream
>>>         .fromSource(
>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>             WatermarkStrategy.noWatermarks(),
>>>             "elastic-configs"
>>>         )
>>>
>>>     // Get the stream of incoming messages to be routed to Elastic
>>>     stream
>>>         .fromSource(
>>>             KafkaSource.of(parameters, listOf("messages")),
>>>             WatermarkStrategy.noWatermarks(),
>>>             "messages"
>>>         )
>>>         .keyBy { message ->
>>>             // Key by the tenant in the message
>>>             message.getTenant()
>>>         }
>>>         .connect(
>>>             // Connect the messages stream with the configurations
>>>             connectionStream
>>>         )
>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>             // For this key, we need to store all of the previous messages in state
>>>             // in the case where we don't have a given mapping for this tenant yet
>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>             lateinit var configState: ValueState<String>
>>>
>>>             override fun open(parameters: Configuration) {
>>>                 super.open(parameters)
>>>                 // Initialize the states
>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>                 configState = runtimeContext.getState(configStateDesc)
>>>             }
>>>
>>>             // When an element is received
>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>                 // Check if we have a mapping
>>>                 if (configState.value() == null){
>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>                     messagesAwaitingConfigState.add(message)
>>>                 }
>>>                 else {
>>>                     // Output the record with some indicator of the route?
>>>                     out.collect(message)
>>>                 }
>>>             }
>>>
>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>                 // records in state
>>>                 if (config.getTenant() == context.currentKey){
>>>                     configState.update(config)
>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>                     messagesToFlush.forEach { message ->
>>>                         out.collect(message)
>>>                     }
>>>                 }
>>>             }
>>>
>>>             // State descriptors
>>>             val awaitingStateDesc = ListStateDescriptor(
>>>                 "messages-awaiting-config",
>>>                 TypeInformation.of(String::class.java)
>>>             )
>>>
>>>             val configStateDesc = ValueStateDescriptor(
>>>                 "elastic-config",
>>>                 TypeInformation.of(String::class.java)
>>>             )
>>>         })
>>>
>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>> }
>>>
>>> Basically, connect my tenant-specific configuration stream with my
>>> incoming messages (keyed by tenant) and buffer them until I have a
>>> corresponding configuration (to avoid race-conditions). However, I'm
>>> guessing what will happen here is rather than directly outputting the
>>> messages from this process function, I'd construct some type of wrapper
>>> here with the necessary routing/configuration for the message (obtained via
>>> the configuration stream) along with the element, which might be something
>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those elements
>>> to the sink, which would create the tenant-specific Elastic connection from
>>> the ConfigurationT element and handle caching it and then just grab the
>>> element and send it on its way?
>>>
>>> Those are really the only bits I'm stuck on at the moment:
>>>
>>>    1. The shape of the elements being evicted from the process function
>>>    (Is a simple wrapper with the configuration for the sink enough here? Do I
>>>    need to explicitly initialize the sink within this function? Etc.)
>>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>>    Configuration>()) or perhaps something else entirely?)
>>>
>>> Thanks again so much for the advice thus far David, it's greatly
>>> appreciated.
>>>
>>> Rion
>>>
>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> To give you a better idea, at a high level I think it could look something
>>>> like this
>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>
>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>
>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> Thanks for your response! I think there are currently quite a few
>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>> think the number of clusters shouldn’t be too large (and will either rarely
>>>>> change or have new entries come in at runtime, but it needs to support
>>>>> that).
>>>>>
>>>>> I think the dynamic approach might be a good route to explore with
>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m not
>>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>>> that’s something you’d be able to advise on?
>>>>>
>>>>> Given that all the records are keyed for a given tenant and I would
>>>>> have the mappings stored in state, is it possible that within the open()
>>>>> function for this dynamic route to access the state and initialize the
>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>> clusters and dynamically handling indices)?
>>>>>
>>>>> I’d be happy to give a shot at making the appropriate changes to the
>>>>> sink as well, although I’m far from an Elastic expert. If you point me in
>>>>> the right direction, I may be able to help out.
>>>>>
>>>>> Thanks much!
>>>>>
>>>>> Rion
>>>>>
>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>>
>>>>> 
>>>>> Hi Rion,
>>>>>
>>>>> As you probably already know, for dynamic indices, you can simply
>>>>> implement your own ElasticsearchSinkFunction
>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>> [1], where you can create any request that elastic client supports.
>>>>>
>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>> clusters.
>>>>> - If the Elastic clusters are known upfront (before submitting the job),
>>>>> you can easily create multiple Elastic sinks and prepend them with a simple
>>>>> filter (this is basically what the split operator does).
>>>>> - If you discover Elastic clusters at runtime, this would require
>>>>> some changes to the current ElasticsearchSink implementation. I think this
>>>>> may actually be as simple as introducing something like
>>>>> DynamicElasticsearchSink, which could dynamically create and manage "child"
>>>>> sinks. This would probably require some thought about how to manage
>>>>> consumed resources (memory), because the number of child sinks could be
>>>>> potentially unbounded. This could of course be simplified if the underlying
>>>>> Elastic client already supports that, which I'm not aware of. If you'd like
>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>> (I'm able to provide some guidance).
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi folks,
>>>>>>
>>>>>> I have a use-case that I wanted to initially pose to the mailing list
>>>>>> as I’m not terribly familiar with the Elasticsearch connector, to ensure I’m
>>>>>> not going down the wrong path trying to accomplish this in Flink (or if
>>>>>> something downstream might be a better option).
>>>>>>
>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>
>>>>>>    - A stream of tenant-specific events
>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>    Elastic cluster information (as each tenant has its own specific Elastic
>>>>>>    cluster/index)
>>>>>>
>>>>>> What I’m hoping to accomplish is the following:
>>>>>>
>>>>>>    1. One stream will periodically poll the HTTP endpoint and store
>>>>>>    these cluster mappings in state (keyed by tenant with cluster info as the
>>>>>>    value)
>>>>>>    2. The event stream will be keyed by tenant and connected to the
>>>>>>    cluster mappings stream.
>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>>>    mapping source.
>>>>>>
>>>>>> I know that the existing Elasticsearch sink supports dynamic indices,
>>>>>> however I didn’t know if it’s possible to adjust the cluster like I would
>>>>>> need on a per-tenant basis or if there’s a better approach here?
>>>>>>
>>>>>> Any advice would be appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>
