Hi Rion, you just need to call *sink.setRuntimeContext(getRuntimeContext())* before opening the child sink. Please see *AbstractRichFunction* [1] (which ElasticsearchSink extends) for more details.
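To make this concrete, here is a minimal sketch of what the invoke() method of your DynamicElasticsearchSink (quoted below) could look like with that call added; sinkRoutes, buildSinkFromRoute and getHost are the helpers from your own snippet:

    override fun invoke(element: JsonObject, context: SinkFunction.Context) {
        val route = getHost(element)
        var sink = sinkRoutes[route]
        if (sink == null) {
            sink = buildSinkFromRoute(element)
            // Hand the wrapper's runtime context to the child sink *before* open(),
            // otherwise the child's invoke() fails with
            // "The runtime context has not been initialized."
            sink.setRuntimeContext(runtimeContext)
            sink.open(configuration)
            sinkRoutes[route] = sink
        }
        sink.invoke(element, context)
    }

The same applies to any other rich function you manage manually like this: set the runtime context, then open, then invoke.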
One more note: instead of starting with an integration test, I'd recommend writing a unit test using the *operator test harness* [2] first. This should help you discover and debug many issues upfront. You can use *ElasticsearchSinkBaseTest* [3] as an example (there is also a rough sketch of the harness setup at the very end of this message).

[1] https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
[3] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java

Best,
D.

On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com> wrote:

> Hi David,
>
> Thanks again for the response, I believe that I'm getting pretty close to at least
> a POC-level implementation of this. Currently, I'm working with JsonObject instances
> throughout the pipeline, so I wanted to try this out and simply stored the routing
> information within the element itself for simplicity's sake right now, so it has a
> shape that looks something like this:
>
> {
>   "route": {
>     "hosts": "...",
>     "index": "...",
>     ...
>   },
>   "all-other-fields-here"
> }
>
> And I've stripped back several of the layers of the routers (since I already have
> all of the information in the element at that point). I tried using something like
> this:
>
> class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>     private lateinit var configuration: Configuration
>
>     override fun open(parameters: Configuration) {
>         configuration = parameters
>     }
>
>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>         val route = getHost(element)
>         // Check if we already have a router for this cluster
>         var sink = sinkRoutes[route]
>         if (sink == null) {
>             // If not, create one
>             sink = buildSinkFromRoute(element)
>             sink.open(configuration)
>             sinkRoutes[route] = sink
>         }
>
>         sink.invoke(element, context)
>     }
>
>     override fun initializeState(context: FunctionInitializationContext) {
>         // No-op.
>     }
>
>     override fun snapshotState(context: FunctionSnapshotContext) {
>         // This is used only to flush pending writes.
>         for (sink in sinkRoutes.values) {
>             sink.snapshotState(context)
>         }
>     }
>
>     override fun close() {
>         for (sink in sinkRoutes.values) {
>             sink.close()
>         }
>     }
>
>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>         val builder = ElasticsearchSink.Builder<JsonObject>(
>             buildHostsFromElement(element),
>             ElasticsearchRoutingFunction()
>         )
>
>         builder.setBulkFlushMaxActions(1)
>
>         // TODO: Configure authorization if available
>         // builder.setRestClientFactory { restClient ->
>         //     restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>         //         override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>         //             // Configure authorization here
>         //             val credentialsProvider = BasicCredentialsProvider().apply {
>         //                 setCredentials(
>         //                     AuthScope.ANY,
>         //                     UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>         //                 )
>         //             }
>         //
>         //             return builder.setDefaultCredentialsProvider(credentialsProvider)
>         //         }
>         //     })
>         // }
>
>         return builder.build()
>     }
>
>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
>         val transportAddresses = element
>             .get("route").asJsonObject
>             .get("hosts").asString
>
>         // If there are multiple, they should be comma-delimited
>         val addresses = transportAddresses.split(",")
>         return addresses
>             .filter { address -> address.isNotEmpty() }
>             .map { address -> HttpHost.create(address) }
>     }
>
>     private fun getHost(element: JsonObject): String {
>         return element
>             .get("route").asJsonObject
>             .get("hosts").asString
>     }
>
>     private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>             indexer.add(request(element))
>         }
>
>         private fun request(element: JsonObject): IndexRequest {
>             // Access routing information
>             val index = element
>                 .get("route").asJsonObject
>                 .get("index").asString
>
>             // Strip off routing information
>             element.remove("route")
>
>             // Send the request
>             return Requests.indexRequest()
>                 .index(index)
>                 .type("_doc")
>                 .source(mapOf(
>                     "data" to "$element"
>                 ))
>         }
>     }
> }
>
> After running an integration test, I keep running into the following error during
> the invocation of the child sink:
>
>     // The runtime context has not been initialized.
>     sink.invoke(element, context)
>
> I can see the underlying sink getting initialized, the open call being made, etc.,
> however for some reason it looks like there's an issue related to the context during
> the invoke call, namely *"The runtime context has not been initialized"*. I had
> assumed this would be alright since the context for the "wrapper" should have
> already been initialized, but maybe there's something that I'm missing.
>
> Also, please forgive any hastily written or nasty code as this is purely a POC to
> see if I could get this to work using a single object. I have hopes of cleaning it
> up and genericizing it after I am confident that it actually works.
>
> Thanks so much again,
>
> Rion
>
> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Rion,
>>
>> Sorry for the late reply, I've missed your previous message. Thanks Arvid for the
>> reminder <3.
>>
>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass those elements
>>> to the sink, which would create the tenant-specific Elastic connection from the
>>> ConfigurationT element and handle caching it and then just grab the element and
>>> send it on its way?
>>
>> Yes, this is exactly what I had in mind. There should be almost no overhead, as the
>> sink can be easily chained with your join (KeyedCoProcessFunction) function.
>>
>>> The shape of the elements being evicted from the process function (Is a simple
>>> wrapper with the configuration for the sink enough here? Do I need to explicitly
>>> initialize the sink within this function? Etc.)
>>
>> To write an element you need a configuration for the destination and the element
>> itself, so a tuple of *(ElasticConfiguration, Element)* should be enough (that's
>> basically your MessageWrapper<ElementT, ConfigurationT> class).
>>
>>> The actual use of the *DynamicElasticsearchSink* class (Would it just be something
>>> like an *.addSink(DynamicElasticSearchSink<String, Configuration>())* or perhaps
>>> something else entirely?)
>>
>> I guess it could look something like the snippet below. It would definitely be good
>> to play around with the *DynamicElasticSearchSink* API and make it more meaningful /
>> user friendly (the gist I've shared was just a very rough prototype to showcase the
>> idea).
>>
>> static class Destination {
>>
>>     private final List<HttpHost> httpHosts;
>>
>>     Destination(List<HttpHost> httpHosts) {
>>         this.httpHosts = httpHosts;
>>     }
>> }
>>
>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>> toWrite.addSink(
>>         new DynamicElasticsearchSink<>(
>>                 new SinkRouter<
>>                         Tuple2<Destination, String>,
>>                         String,
>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>
>>                     @Override
>>                     public String getRoute(Tuple2<Destination, String> element) {
>>                         // Construct a deterministic unique caching key for the
>>                         // destination... (this could be cheaper if you know the data)
>>                         return element.f0.httpHosts.stream()
>>                                 .map(HttpHost::toHostString)
>>                                 .collect(Collectors.joining(","));
>>                     }
>>
>>                     @Override
>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>                             String cacheKey, Tuple2<Destination, String> element) {
>>                         return new ElasticsearchSink.Builder<>(
>>                                         element.f0.httpHosts,
>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>                                                 (el, ctx, indexer) -> {
>>                                                     // Construct index request.
>>                                                     final IndexRequest request = ...;
>>                                                     indexer.add(request);
>>                                                 })
>>                                 .build();
>>                     }
>>                 }));
>>
>> I hope this helps ;)
>>
>> Best,
>> D.
>>
>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com> wrote:
>>
>>> Thanks for this suggestion David, it's extremely helpful.
>>>
>>> Since this will vary depending on the elements retrieved from a separate stream,
>>> I'm guessing something like the following would be roughly the avenue to continue
>>> down:
>>>
>>> fun main(args: Array<String>) {
>>>     val parameters = mergeParametersFromProperties(args)
>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>
>>>     // Get the stream for tenant-specific Elastic configurations
>>>     val connectionStream = stream
>>>         .fromSource(
>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>             WatermarkStrategy.noWatermarks(),
>>>             "elastic-configs"
>>>         )
>>>
>>>     // Get the stream of incoming messages to be routed to Elastic
>>>     stream
>>>         .fromSource(
>>>             KafkaSource.of(parameters, listOf("messages")),
>>>             WatermarkStrategy.noWatermarks(),
>>>             "messages"
>>>         )
>>>         .keyBy { message ->
>>>             // Key by the tenant in the message
>>>             message.getTenant()
>>>         }
>>>         .connect(
>>>             // Connect the messages stream with the configurations
>>>             connectionStream
>>>         )
>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>             // For this key, we need to store all of the previous messages in state
>>>             // in the case where we don't have a given mapping for this tenant yet
>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>             lateinit var configState: ValueState<String>
>>>
>>>             override fun open(parameters: Configuration) {
>>>                 super.open(parameters)
>>>                 // Initialize the states
>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>                 configState = runtimeContext.getState(configStateDesc)
>>>             }
>>>
>>>             // When an element is received
>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>                 // Check if we have a mapping
>>>                 if (configState.value() == null) {
>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>                     messagesAwaitingConfigState.add(message)
>>>                 } else {
>>>                     // Output the record with some indicator of the route?
>>>                     out.collect(message)
>>>                 }
>>>             }
>>>
>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>                 // If this mapping is for this specific tenant, store it and flush
>>>                 // the pending records in state
>>>                 if (config.getTenant() == context.currentKey) {
>>>                     configState.update(config)
>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>                     messagesToFlush.forEach { message ->
>>>                         out.collect(message)
>>>                     }
>>>                 }
>>>             }
>>>
>>>             // State descriptors
>>>             val awaitingStateDesc = ListStateDescriptor(
>>>                 "messages-awaiting-config",
>>>                 TypeInformation.of(String::class.java)
>>>             )
>>>
>>>             val configStateDesc = ValueStateDescriptor(
>>>                 "elastic-config",
>>>                 TypeInformation.of(String::class.java)
>>>             )
>>>         })
>>>
>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>> }
>>>
>>> Basically, connect my tenant-specific configuration stream with my incoming
>>> messages (keyed by tenant) and buffer them until I have a corresponding
>>> configuration (to avoid race conditions). However, I'm guessing what will happen
>>> here is rather than directly outputting the messages from this process function,
>>> I'd construct some type of wrapper here with the necessary routing/configuration
>>> for the message (obtained via the configuration stream) along with the element,
>>> which might be something like a MessageWrapper<ElementT, ConfigurationT>, and pass
>>> those elements to the sink, which would create the tenant-specific Elastic
>>> connection from the ConfigurationT element and handle caching it and then just
>>> grab the element and send it on its way?
>>>
>>> Those are really the only bits I'm stuck on at the moment:
>>>
>>> 1. The shape of the elements being evicted from the process function (Is a simple
>>>    wrapper with the configuration for the sink enough here? Do I need to
>>>    explicitly initialize the sink within this function? Etc.)
>>> 2. The actual use of the DynamicElasticsearchSink class (Would it just be
>>>    something like an .addSink(DynamicElasticSearchSink<String, Configuration>())
>>>    or perhaps something else entirely?)
>>>
>>> Thanks again so much for the advice thus far David, it's greatly appreciated.
>>>
>>> Rion
>>>
>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> To give you a better idea, at a high level I think it could look something like
>>>> this <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>
>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>
>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> Thanks for your response! I think there are currently quite a few unknowns on
>>>>> my end in terms of what production loads look like, but I think the number of
>>>>> clusters shouldn't be too large (and will either rarely change or have new
>>>>> entries come in at runtime, but it needs to support that).
>>>>>
>>>>> I think the dynamic approach might be a good route to explore, with actual
>>>>> changes to the Elasticsearch sink as a longer-term option. I'm not sure what
>>>>> the dynamic one would look like at the moment though, perhaps that's something
>>>>> you'd be able to advise on?
>>>>>
>>>>> Given that all the records are keyed for a given tenant and I would have the
>>>>> mappings stored in state, is it possible within the open() function for this
>>>>> dynamic route to access the state and initialize the client there? Or maybe
>>>>> there's some other approach (such as grouping by clusters and dynamically
>>>>> handling indices)?
>>>>>
>>>>> I'd be happy to take a shot at making the appropriate changes to the sink as
>>>>> well, although I'm far from an Elastic expert. If you point me in the right
>>>>> direction, I may be able to help out.
>>>>>
>>>>> Thanks much!
>>>>>
>>>>> Rion
>>>>>
>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>>
>>>>> Hi Rion,
>>>>>
>>>>> As you probably already know, for dynamic indices you can simply implement your
>>>>> own ElasticsearchSinkFunction
>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>> [1], where you can create any request that the elastic client supports.
>>>>>
>>>>> The tricky part is how to implement dynamic routing into multiple clusters.
>>>>> - If the elastic clusters are known upfront (before submitting the job), you
>>>>>   can easily create multiple elastic sinks and prepend them with a simple
>>>>>   filter (this is basically what a split operator does).
>>>>> - If you discover elastic clusters at runtime, this would require some changes
>>>>>   to the current ElasticsearchSink implementation. I think this may actually be
>>>>>   as simple as introducing something like DynamicElasticsearchSink, which could
>>>>>   dynamically create and manage "child" sinks. This would probably require some
>>>>>   thought about how to manage consumed resources (memory), because the number
>>>>>   of child sinks could potentially be unbounded. This could of course be
>>>>>   simplified if the underlying elastic client already supports that, which I'm
>>>>>   not aware of. If you'd like to take this path, it would definitely be a great
>>>>>   contribution to Flink (I'm able to provide some guidance).
>>>>>
>>>>> [1] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>>
>>>>>> Hi folks,
>>>>>>
>>>>>> I have a use-case that I wanted to initially pose to the mailing list, as I'm
>>>>>> not terribly familiar with the Elasticsearch connector, to ensure I'm not
>>>>>> going down the wrong path trying to accomplish this in Flink (or if something
>>>>>> downstream might be a better option).
>>>>>>
>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>
>>>>>> - A stream of tenant-specific events
>>>>>> - An HTTP endpoint containing mappings for tenant-specific Elastic cluster
>>>>>>   information (as each tenant has its own specific Elastic cluster/index)
>>>>>>
>>>>>> What I'm hoping to accomplish is the following:
>>>>>>
>>>>>> 1. One stream will periodically poll the HTTP endpoint and store these cluster
>>>>>>    mappings in state (keyed by tenant, with cluster info as the value)
>>>>>> 2. The event stream will be keyed by tenant and connected to the cluster
>>>>>>    mappings stream.
>>>>>> 3. I'll need an Elasticsearch sink that can route the tenant-specific event
>>>>>>    data to its corresponding cluster/index from the mapping source.
>>>>>>
>>>>>> I know that the existing Elasticsearch sink supports dynamic indices, however
>>>>>> I didn't know if it's possible to adjust the cluster like I would need on a
>>>>>> per-tenant basis, or if there's a better approach here?
>>>>>>
>>>>>> Any advice would be appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Rion
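PS: regarding the operator test harness suggestion at the top of this message, a very rough sketch of the setup could look like the snippet below. It is only meant to illustrate the wiring, not a complete test: it assumes the DynamicElasticsearchSink and JsonObject types from the snippets above, and someTestElement() is just a placeholder for building a test record.

    import org.apache.flink.streaming.api.operators.StreamSink
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness

    fun testDynamicSinkRouting() {
        // Wrap the sink function in a StreamSink operator and drive it through the harness.
        val harness = OneInputStreamOperatorTestHarness<JsonObject, Any>(
            StreamSink(DynamicElasticsearchSink())
        )

        harness.open()                                               // sets up the runtime context and calls open()
        harness.processElement(StreamRecord(someTestElement(), 0L))  // exercises invoke() / routing (placeholder element)
        harness.snapshot(0L, 0L)                                     // exercises snapshotState() / flushing
        harness.close()                                              // closes the child sinks
    }

With the harness you can exercise the routing and lifecycle logic (open / snapshot / close of the child sinks) without standing up Kafka or an Elastic cluster.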