Thanks again David, I've spun up a JIRA issue for this work <https://issues.apache.org/jira/browse/FLINK-23977> while I get things into the proper state. If someone with the appropriate privileges could assign it to me, I'd be appreciative. I'll likely need some assistance at a few points to ensure things look as expected, but I'm happy to help drive this contribution.
Rion

On Wed, Aug 25, 2021 at 11:37 AM David Morávek <d...@apache.org> wrote:

> AFAIK there are currently no other sources in Flink that can treat "other
> sources" / "destinations" as data. The most complete generic work on this
> topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.
>
> I think the best module for the contribution would be
> "elasticsearch-base", because this could easily be reused for all ES
> versions that we currently support.
>
> Best,
> D.
>
> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com> wrote:
>
>> Hi David,
>>
>> That was perfect, and it looks like this is working as I'd expected. I
>> put together some larger integration tests for my specific use-case
>> (multiple Elasticsearch clusters running in TestContainers) and verified
>> that messages were being routed dynamically to the appropriate sinks.
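>> Roughly along these lines, heavily simplified (a sketch of the setup
>> rather than the actual test code; the image tag and tenant keys are
>> placeholders):
>>
>> // org.testcontainers:elasticsearch — one container per "cluster"
>> val tenantACluster =
>>     ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.10.2")
>> val tenantBCluster =
>>     ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.10.2")
>> tenantACluster.start()
>> tenantBCluster.start()
>>
>> // Each container exposes a distinct host:port, which becomes the
>> // "hosts" value that the sink routes on
>> val routes = mapOf(
>>     "tenant-a" to tenantACluster.httpHostAddress,
>>     "tenant-b" to tenantBCluster.httpHostAddress
>> )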
>>
>> I forked the Flink repo last night and was trying to figure out the best
>> place to start adding these classes (I noticed that there are three
>> separate ES packages targeting 5/6/7 respectively). I was going to start
>> fleshing out the initial implementation, but wanted to make sure that I
>> was starting in the right place.
>>
>> Additionally, do you know of anything that might be similar to this
>> within other sinks? I'm just trying to think of something to model this
>> after. Once I get things started, I'll spin up a JIRA issue for it and go
>> from there.
>>
>> Thanks so much for your help!
>>
>> Rion
>>
>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:
>>
>>> Hi Rion,
>>>
>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>> (which ElasticsearchSink extends) for more details.
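>>>
>>> In your invoke() that would look roughly like this (an untested sketch
>>> using the names from your snippet):
>>>
>>> var sink = sinkRoutes[route]
>>> if (sink == null) {
>>>     sink = buildSinkFromRoute(element)
>>>     // Hand the wrapper's runtime context to the child sink *before*
>>>     // open(), otherwise its invoke() will fail with "The runtime
>>>     // context has not been initialized".
>>>     sink.setRuntimeContext(runtimeContext)
>>>     sink.open(configuration)
>>>     sinkRoutes[route] = sink
>>> }
>>> sink.invoke(element, context)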
>>>
>>> One more note: instead of starting with an integration test, I'd
>>> recommend writing a unit test using the *operator test harness* [2]
>>> first. This should help you to discover / debug many issues upfront. You
>>> can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>
>>> [1] https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>> [3] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com> wrote:
>>>
>>>> Hi David,
>>>>
>>>> Thanks again for the response, I believe that I'm getting pretty close
>>>> to at least a POC-level implementation of this. Currently, I'm working
>>>> with JsonObject instances throughout the pipeline, so I wanted to try
>>>> this out and simply stored the routing information within the element
>>>> itself for simplicity's sake right now, so it has a shape that looks
>>>> something like this:
>>>>
>>>> {
>>>>     "route": {
>>>>         "hosts": "...",
>>>>         "index": "...",
>>>>         ...
>>>>     },
>>>>     "all-other-fields-here"
>>>> }
>>>>
>>>> And I've stripped back several of the layers of the routers (since I
>>>> already have all of the information in the element at that point). I
>>>> tried using something like this:
>>>>
>>>> class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(),
>>>>     CheckpointedFunction {
>>>>
>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> =
>>>>         ConcurrentHashMap()
>>>>     private lateinit var configuration: Configuration
>>>>
>>>>     override fun open(parameters: Configuration) {
>>>>         configuration = parameters
>>>>     }
>>>>
>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>         val route = getHost(element)
>>>>         // Check if we already have a sink for this cluster
>>>>         var sink = sinkRoutes[route]
>>>>         if (sink == null) {
>>>>             // If not, create one
>>>>             sink = buildSinkFromRoute(element)
>>>>             sink.open(configuration)
>>>>             sinkRoutes[route] = sink
>>>>         }
>>>>
>>>>         sink.invoke(element, context)
>>>>     }
>>>>
>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>         // No-op.
>>>>     }
>>>>
>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>         // This is used only to flush pending writes.
>>>>         for (sink in sinkRoutes.values) {
>>>>             sink.snapshotState(context)
>>>>         }
>>>>     }
>>>>
>>>>     override fun close() {
>>>>         for (sink in sinkRoutes.values) {
>>>>             sink.close()
>>>>         }
>>>>     }
>>>>
>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>             buildHostsFromElement(element),
>>>>             ElasticsearchRoutingFunction()
>>>>         )
>>>>
>>>>         builder.setBulkFlushMaxActions(1)
>>>>
>>>>         // TODO: Configure authorization if available
>>>>         // builder.setRestClientFactory { restClient ->
>>>>         //     restClient.setHttpClientConfigCallback(object :
>>>>         //             RestClientBuilder.HttpClientConfigCallback {
>>>>         //         override fun customizeHttpClient(
>>>>         //             builder: HttpAsyncClientBuilder
>>>>         //         ): HttpAsyncClientBuilder {
>>>>         //             // Configure authorization here
>>>>         //             val credentialsProvider = BasicCredentialsProvider().apply {
>>>>         //                 setCredentials(
>>>>         //                     AuthScope.ANY,
>>>>         //                     UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>         //                 )
>>>>         //             }
>>>>         //             return builder.setDefaultCredentialsProvider(credentialsProvider)
>>>>         //         }
>>>>         //     })
>>>>         // }
>>>>
>>>>         return builder.build()
>>>>     }
>>>>
>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
>>>>         val transportAddresses = element
>>>>             .get("route").asJsonObject
>>>>             .get("hosts").asString
>>>>
>>>>         // If there are multiple, they should be comma-delimited
>>>>         val addresses = transportAddresses.split(",")
>>>>         return addresses
>>>>             .filter { address -> address.isNotEmpty() }
>>>>             .map { address -> HttpHost.create(address) }
>>>>     }
>>>>
>>>>     private fun getHost(element: JsonObject): String {
>>>>         return element
>>>>             .get("route").asJsonObject
>>>>             .get("hosts").asString
>>>>     }
>>>>
>>>>     private class ElasticsearchRoutingFunction :
>>>>         ElasticsearchSinkFunction<JsonObject> {
>>>>
>>>>         override fun process(element: JsonObject, context: RuntimeContext,
>>>>             indexer: RequestIndexer) {
>>>>             indexer.add(request(element))
>>>>         }
>>>>
>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>             // Access the routing information
>>>>             val index = element
>>>>                 .get("route").asJsonObject
>>>>                 .get("index").asString
>>>>
>>>>             // Strip off the routing information
>>>>             element.remove("route")
>>>>
>>>>             // Send the request
>>>>             return Requests.indexRequest()
>>>>                 .index(index)
>>>>                 .type("_doc")
>>>>                 .source(mapOf(
>>>>                     "data" to "$element"
>>>>                 ))
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> After running an integration test, I keep running into the following
>>>> error during the invocation of the child sink:
>>>>
>>>> // The runtime context has not been initialized.
>>>> sink.invoke(element, context)
>>>>
>>>> I can see the underlying sink getting initialized, the open call being
>>>> made, etc.; however, for some reason it looks like there's an issue
>>>> related to the context during the invoke call, namely *"The runtime
>>>> context has not been initialized"*. I had assumed this would be alright
>>>> since the context for the "wrapper" should have already been
>>>> initialized, but maybe there's something that I'm missing.
>>>>
>>>> Also, please forgive any hastily written or nasty code, as this is
>>>> purely a POC to see if I could get this to work using a single object.
>>>> I hope to clean it up and genericize it once I'm confident that it
>>>> actually works.
>>>>
>>>> Thanks so much again,
>>>>
>>>> Rion
>>>>
>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:
>>>>
>>>>> Hi Rion,
>>>>>
>>>>> Sorry for the late reply, I've missed your previous message. Thanks
>>>>> Arvid for the reminder <3.
>>>>>
>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>> those elements to the sink, which would create the tenant-specific
>>>>>> Elastic connection from the ConfigurationT element and handle caching
>>>>>> it and then just grab the element and send it on its way?
>>>>>
>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>> overhead, as the sink can be easily chained with your join
>>>>> (KeyedCoProcessFunction) function.
>>>>>
>>>>>> The shape of the elements being evicted from the process function (Is
>>>>>> a simple wrapper with the configuration for the sink enough here? Do
>>>>>> I need to explicitly initialize the sink within this function? Etc.)
>>>>>
>>>>> To write an element you need a configuration for the destination and
>>>>> the element itself, so a tuple of *(ElasticConfiguration, Element)*
>>>>> should be enough (that's basically your MessageWrapper<ElementT,
>>>>> ConfigurationT> class).
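>>>>>
>>>>> In Kotlin that could be as small as the following (just a sketch, the
>>>>> names are illustrative):
>>>>>
>>>>> // Pairs each record with the destination it should be written to; the
>>>>> // sink derives (and caches) the Elastic connection from `config`.
>>>>> data class MessageWrapper<ElementT, ConfigT>(
>>>>>     val element: ElementT,
>>>>>     val config: ConfigT
>>>>> )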
>>>>>
>>>>>> The actual use of the *DynamicElasticsearchSink* class (Would it just
>>>>>> be something like an *.addSink(DynamicElasticSearchSink<String,
>>>>>> Configuration>())* or perhaps something else entirely?)
>>>>>
>>>>> I guess it could look something like the snippet below. It would
>>>>> definitely be good to play around with the *DynamicElasticsearchSink*
>>>>> API and make it more meaningful / user friendly (the gist I've shared
>>>>> was just a very rough prototype to showcase the idea).
>>>>>
>>>>> static class Destination {
>>>>>
>>>>>     private final List<HttpHost> httpHosts;
>>>>>
>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>         this.httpHosts = httpHosts;
>>>>>     }
>>>>> }
>>>>>
>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>> toWrite.addSink(
>>>>>         new DynamicElasticsearchSink<>(
>>>>>                 new SinkRouter<
>>>>>                         Tuple2<Destination, String>,
>>>>>                         String,
>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>
>>>>>                     @Override
>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>                         // Construct a deterministic unique caching key
>>>>>                         // for the destination... (this could be cheaper
>>>>>                         // if you know the data)
>>>>>                         return element.f0.httpHosts.stream()
>>>>>                                 .map(HttpHost::toHostString)
>>>>>                                 .collect(Collectors.joining(","));
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>                                         element.f0.httpHosts,
>>>>>                                         (ElasticsearchSinkFunction<
>>>>>                                                 Tuple2<Destination, String>>)
>>>>>                                                 (el, ctx, indexer) -> {
>>>>>                                                     // Construct index request.
>>>>>                                                     final IndexRequest request = ...;
>>>>>                                                     indexer.add(request);
>>>>>                                                 })
>>>>>                                 .build();
>>>>>                     }
>>>>>                 }));
>>>>>
>>>>> I hope this helps ;)
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>
>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>> separate stream, I'm guessing something like the following would be
>>>>>> roughly the avenue to continue down:
>>>>>>
>>>>>> fun main(args: Array<String>) {
>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>
>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>     val connectionStream = stream
>>>>>>         .fromSource(
>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>             "elastic-configs"
>>>>>>         )
>>>>>>
>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>     stream
>>>>>>         .fromSource(
>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>             "messages"
>>>>>>         )
>>>>>>         .keyBy { message ->
>>>>>>             // Key by the tenant in the message
>>>>>>             message.getTenant()
>>>>>>         }
>>>>>>         .connect(
>>>>>>             // Connect the messages stream with the configurations
>>>>>>             connectionStream
>>>>>>         )
>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>             // For this key, we need to store all of the previous
>>>>>>             // messages in state in the case where we don't have a
>>>>>>             // given mapping for this tenant yet
>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>             lateinit var configState: ValueState<String>
>>>>>>
>>>>>>             override fun open(parameters: Configuration) {
>>>>>>                 super.open(parameters)
>>>>>>                 // Initialize the states
>>>>>>                 messagesAwaitingConfigState =
>>>>>>                     runtimeContext.getListState(awaitingStateDesc)
>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>             }
>>>>>>
>>>>>>             // When an element is received
>>>>>>             override fun processElement1(message: String, context: Context,
>>>>>>                 out: Collector<String>) {
>>>>>>                 // Check if we have a mapping
>>>>>>                 if (configState.value() == null) {
>>>>>>                     // We don't have a mapping for this tenant, store
>>>>>>                     // messages until we get it
>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>                 } else {
>>>>>>                     // Output the record with some indicator of the route?
>>>>>>                     out.collect(message)
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>>             override fun processElement2(config: String, context: Context,
>>>>>>                 out: Collector<String>) {
>>>>>>                 // If this mapping is for this specific tenant, store it
>>>>>>                 // and flush the pending records in state
>>>>>>                 if (config.getTenant() == context.currentKey) {
>>>>>>                     configState.update(config)
>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>                         out.collect(message)
>>>>>>                     }
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>>             // State descriptors
>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>                 "messages-awaiting-config",
>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>             )
>>>>>>
>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>                 "elastic-config",
>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>             )
>>>>>>         })
>>>>>>
>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>> }
>>>>>>
>>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>> corresponding configuration (to avoid race conditions). However, I'm
>>>>>> guessing that rather than directly outputting the messages from this
>>>>>> process function, I'd construct some type of wrapper here with the
>>>>>> necessary routing/configuration for the message (obtained via the
>>>>>> configuration stream) along with the element, which might be
>>>>>> something like a MessageWrapper<ElementT, ConfigurationT>, and pass
>>>>>> those elements to the sink, which would create the tenant-specific
>>>>>> Elastic connection from the ConfigurationT element and handle caching
>>>>>> it and then just grab the element and send it on its way?
>>>>>>
>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>
>>>>>> 1. The shape of the elements being evicted from the process function
>>>>>> (Is a simple wrapper with the configuration for the sink enough here?
>>>>>> Do I need to explicitly initialize the sink within this function?
>>>>>> Etc.)
>>>>>> 2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>>> just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>> Configuration>()) or perhaps something else entirely?)
>>>>>>
>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>> appreciated.
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>>>>>>
>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>> something like this
>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>>>>
>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>
>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>>> unknowns on my end in terms of what production loads will look
>>>>>>>> like, but I think the number of clusters shouldn't be too large
>>>>>>>> (and will either rarely change or have new entries come in at
>>>>>>>> runtime, but it needs to support that).
>>>>>>>>
>>>>>>>> I think the dynamic approach might be a good route to explore, with
>>>>>>>> actual changes to the Elasticsearch sink as a longer-term option.
>>>>>>>> I'm not sure what the dynamic one would look like at the moment
>>>>>>>> though, perhaps that's something you'd be able to advise on?
>>>>>>>>
>>>>>>>> Given that all the records are keyed for a given tenant and I would
>>>>>>>> have the mappings stored in state, is it possible within the open()
>>>>>>>> function for this dynamic route to access the state and initialize
>>>>>>>> the client there? Or maybe there's some other approach (such as
>>>>>>>> grouping by clusters and dynamically handling indices)?
>>>>>>>>
>>>>>>>> I'd be happy to take a shot at making the appropriate changes to
>>>>>>>> the sink as well, although I'm far from an Elastic expert. If you
>>>>>>>> point me in the right direction, I may be able to help out.
>>>>>>>>
>>>>>>>> Thanks much!
>>>>>>>>
>>>>>>>> Rion
>>>>>>>>
>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>>>>>
>>>>>>>> Hi Rion,
>>>>>>>>
>>>>>>>> As you probably already know, for dynamic indices you can simply
>>>>>>>> implement your own ElasticsearchSinkFunction [1], where you can
>>>>>>>> create any request that the elastic client supports.
>>>>>>>>
>>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>>> clusters.
>>>>>>>> - If the elastic clusters are known upfront (before submitting the
>>>>>>>> job), you can easily create multiple elastic sinks and prepend them
>>>>>>>> with a simple filter (this is basically what the split operator
>>>>>>>> does); see the sketch below.
>>>>>>>> - If you discover elastic clusters at runtime, this would require
>>>>>>>> some changes to the current ElasticsearchSink implementation. I
>>>>>>>> think this may actually be as simple as introducing something like
>>>>>>>> a DynamicElasticsearchSink that could dynamically create and manage
>>>>>>>> "child" sinks. This would probably require some thought about how
>>>>>>>> to manage consumed resources (memory), because the number of child
>>>>>>>> sinks could potentially be unbounded. This could of course be
>>>>>>>> simplified if the underlying elastic client already supports it,
>>>>>>>> which I'm not aware of. If you'd like to take this path, it would
>>>>>>>> definitely be a great contribution to Flink (I'm able to provide
>>>>>>>> some guidance).
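>>>>>>>>
>>>>>>>> For the known-upfront case, roughly (a sketch; the tenant field and
>>>>>>>> the buildElasticsearchSink(hosts) helper wrapping
>>>>>>>> ElasticsearchSink.Builder are hypothetical):
>>>>>>>>
>>>>>>>> val messages: DataStream<Message> = ...
>>>>>>>>
>>>>>>>> // One pre-built sink per known cluster, each fed by a filtered
>>>>>>>> // copy of the stream (the "split" pattern).
>>>>>>>> messages
>>>>>>>>     .filter { it.tenant == "tenant-a" }
>>>>>>>>     .addSink(buildElasticsearchSink(tenantAHosts))
>>>>>>>>
>>>>>>>> messages
>>>>>>>>     .filter { it.tenant == "tenant-b" }
>>>>>>>>     .addSink(buildElasticsearchSink(tenantBHosts))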
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> D.
>>>>>>>>
>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi folks,
>>>>>>>>>
>>>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>>>> list, as I'm not terribly familiar with the Elasticsearch
>>>>>>>>> connector and want to ensure I'm not going down the wrong path
>>>>>>>>> trying to accomplish this in Flink (or if something downstream
>>>>>>>>> might be a better option).
>>>>>>>>>
>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>
>>>>>>>>> - A stream of tenant-specific events
>>>>>>>>> - An HTTP endpoint containing mappings for tenant-specific Elastic
>>>>>>>>> cluster information (as each tenant has its own specific Elastic
>>>>>>>>> cluster/index)
>>>>>>>>>
>>>>>>>>> What I'm hoping to accomplish is the following:
>>>>>>>>>
>>>>>>>>> 1. One stream will periodically poll the HTTP endpoint and store
>>>>>>>>> these cluster mappings in state (keyed by tenant, with cluster
>>>>>>>>> info as the value).
>>>>>>>>> 2. The event stream will be keyed by tenant and connected to the
>>>>>>>>> cluster mappings stream.
>>>>>>>>> 3. I'll need an Elasticsearch sink that can route the
>>>>>>>>> tenant-specific event data to its corresponding cluster/index from
>>>>>>>>> the mapping source.
>>>>>>>>>
>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>> indices, however I didn't know if it's possible to adjust the
>>>>>>>>> cluster like I would need on a per-tenant basis, or if there's a
>>>>>>>>> better approach here?
>>>>>>>>>
>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Rion