Hi Rion,

Sorry for the late reply, I missed your previous message. Thanks Arvid for the reminder <3.
> something like a MessageWrapper<ElementT, ConfigurationT> and pass those
> elements to the sink, which would create the tenant-specific Elastic
> connection from the ConfigurationT element and handle caching it and then
> just grab the element and send it on its way?

Yes, this is exactly what I had in mind. There should be almost no overhead, as the sink can easily be chained with your join (KeyedCoProcessFunction) function.

> The shape of the elements being evicted from the process function (Is a
> simple wrapper with the configuration for the sink enough here? Do I need
> to explicitly initialize the sink within this function? Etc.)

To write an element you need a configuration for the destination and the element itself, so a tuple of (ElasticConfiguration, Element) should be enough (that's basically your MessageWrapper<ElementT, ConfigurationT> class).

> The actual use of the DynamicElasticsearchSink class (Would it just be
> something like an .addSink(DynamicElasticSearchSink<String,
> Configuration>()) or perhaps something else entirely?)

I guess it could look something like the snippet below. It would definitely be good to play around with the DynamicElasticSearchSink API and make it more meaningful / user-friendly (the gist I've shared was just a very rough prototype to showcase the idea).

static class Destination {
    private final List<HttpHost> httpHosts;

    Destination(List<HttpHost> httpHosts) {
        this.httpHosts = httpHosts;
    }
}

final DataStream<Tuple2<Destination, String>> toWrite = ...;
toWrite.addSink(
    new DynamicElasticsearchSink<>(
        new SinkRouter<
                Tuple2<Destination, String>,
                String,
                ElasticsearchSink<Tuple2<Destination, String>>>() {

            @Override
            public String getRoute(Tuple2<Destination, String> element) {
                // Construct a deterministic unique caching key for the
                // destination... (this could be cheaper if you know the data)
                return element.f0.httpHosts.stream()
                        .map(HttpHost::toHostString)
                        .collect(Collectors.joining(","));
            }

            @Override
            public ElasticsearchSink<Tuple2<Destination, String>> createSink(
                    String cacheKey, Tuple2<Destination, String> element) {
                return new ElasticsearchSink.Builder<>(
                        element.f0.httpHosts,
                        (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
                                (el, ctx, indexer) -> {
                                    // Construct index request.
                                    final IndexRequest request = ...;
                                    indexer.add(request);
                                })
                        .build();
            }
        }));
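If you prefer a dedicated wrapper class over the raw Tuple2 used above, a minimal sketch of the MessageWrapper<ElementT, ConfigurationT> you mentioned could be as simple as the class below (field and accessor names are just placeholders):

// Minimal sketch of the wrapper emitted by the KeyedCoProcessFunction; field and
// accessor names are placeholders. Note that without a no-arg constructor and
// non-final fields Flink will not treat this as a POJO and will fall back to
// Kryo serialization, which works but is slower.
public class MessageWrapper<ElementT, ConfigurationT> {

    private final ElementT element;
    private final ConfigurationT configuration;

    public MessageWrapper(ElementT element, ConfigurationT configuration) {
        this.element = element;
        this.configuration = configuration;
    }

    public ElementT getElement() {
        return element;
    }

    public ConfigurationT getConfiguration() {
        return configuration;
    }
}

The SinkRouter would then derive the cache key and the child sink from getConfiguration(), and build the IndexRequest from getElement().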
I hope this helps ;)

Best,
D.

On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com> wrote:

> Thanks for this suggestion David, it's extremely helpful.
>
> Since this will vary depending on the elements retrieved from a separate
> stream, I'm guessing something like the following would be roughly the
> avenue to continue down:
>
> fun main(args: Array<String>) {
>     val parameters = mergeParametersFromProperties(args)
>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>     // Get the stream for tenant-specific Elastic configurations
>     val connectionStream = stream
>         .fromSource(
>             KafkaSource.of(parameters, listOf("elastic-configs")),
>             WatermarkStrategy.noWatermarks(),
>             "elastic-configs"
>         )
>
>     // Get the stream of incoming messages to be routed to Elastic
>     stream
>         .fromSource(
>             KafkaSource.of(parameters, listOf("messages")),
>             WatermarkStrategy.noWatermarks(),
>             "messages"
>         )
>         .keyBy { message ->
>             // Key by the tenant in the message
>             message.getTenant()
>         }
>         .connect(
>             // Connect the messages stream with the configurations
>             connectionStream
>         )
>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>             // For this key, we need to store all of the previous messages in state
>             // in the case where we don't have a given mapping for this tenant yet
>             lateinit var messagesAwaitingConfigState: ListState<String>
>             lateinit var configState: ValueState<String>
>
>             override fun open(parameters: Configuration) {
>                 super.open(parameters)
>                 // Initialize the states
>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>                 configState = runtimeContext.getState(configStateDesc)
>             }
>
>             // When an element is received
>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>                 // Check if we have a mapping
>                 if (configState.value() == null) {
>                     // We don't have a mapping for this tenant, store messages until we get it
>                     messagesAwaitingConfigState.add(message)
>                 } else {
>                     // Output the record with some indicator of the route?
>                     out.collect(message)
>                 }
>             }
>
>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>                 // If this mapping is for this specific tenant, store it and flush the pending
>                 // records in state
>                 if (config.getTenant() == context.currentKey) {
>                     configState.update(config)
>                     val messagesToFlush = messagesAwaitingConfigState.get()
>                     messagesToFlush.forEach { message ->
>                         out.collect(message)
>                     }
>                 }
>             }
>
>             // State descriptors
>             val awaitingStateDesc = ListStateDescriptor(
>                 "messages-awaiting-config",
>                 TypeInformation.of(String::class.java)
>             )
>
>             val configStateDesc = ValueStateDescriptor(
>                 "elastic-config",
>                 TypeInformation.of(String::class.java)
>             )
>         })
>
>     stream.executeAsync("$APPLICATION_NAME-job")
> }
>
> Basically, connect my tenant-specific configuration stream with my
> incoming messages (keyed by tenant) and buffer them until I have a
> corresponding configuration (to avoid race conditions). However, I'm
> guessing what will happen here is rather than directly outputting the
> messages from this process function, I'd construct some type of wrapper
> here with the necessary routing/configuration for the message (obtained via
> the configuration stream) along with the element, which might be something
> like a MessageWrapper<ElementT, ConfigurationT> and pass those elements
> to the sink, which would create the tenant-specific Elastic connection from
> the ConfigurationT element and handle caching it and then just grab the
> element and send it on its way?
>
> Those are really the only bits I'm stuck on at the moment:
>
>    1. The shape of the elements being evicted from the process function
>    (Is a simple wrapper with the configuration for the sink enough here? Do I
>    need to explicitly initialize the sink within this function? Etc.)
>    2. The actual use of the DynamicElasticsearchSink class (Would it just
>    be something like an .addSink(DynamicElasticSearchSink<String,
>    Configuration>()) or perhaps something else entirely?)
>
> Thanks again so much for the advice thus far David, it's greatly
> appreciated.
>
> Rion
>
> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org> wrote:
>
>> To give you a better idea, at a high level I think it could look something
>> like this <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>> [1].
>>
>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>
>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks for your response! I think there are currently quite a few
>>> unknowns on my end in terms of what production loads look like, but I
>>> think the number of clusters shouldn't be too large (and will either
>>> rarely change or have new entries come in at runtime, but it needs to
>>> support that).
>>>
>>> I think the dynamic approach might be a good route to explore, with
>>> actual changes to the Elasticsearch sink as a longer-term option. I'm not
>>> sure what the dynamic one would look like at the moment though, perhaps
>>> that's something you'd be able to advise on?
>>>
>>> Given that all the records are keyed for a given tenant and I would have
>>> the mappings stored in state, is it possible, within the open() function
>>> for this dynamic route, to access the state and initialize the client
>>> there? Or maybe there's some other approach (such as grouping by
>>> clusters and dynamically handling indices)?
>>>
>>> I'd be happy to give a shot at making the appropriate changes to the
>>> sink as well, although I'm far from an Elastic expert. If you point me in
>>> the right direction, I may be able to help out.
>>>
>>> Thanks much!
>>>
>>> Rion
>>>
>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>
>>> Hi Rion,
>>>
>>> As you probably already know, for dynamic indices, you can simply
>>> implement your own ElasticsearchSinkFunction
>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>> [1], where you can create any request that the Elastic client supports.
>>>
>>> The tricky part is how to implement dynamic routing into multiple
>>> clusters:
>>> - If the Elastic clusters are known upfront (before submitting the job),
>>> you can easily create multiple Elasticsearch sinks and prepend them with a
>>> simple filter (this is basically what the split operator does).
>>> - If you discover Elastic clusters at runtime, this would require some
>>> changes to the current ElasticsearchSink implementation. I think this may
>>> actually be as simple as introducing something like a
>>> DynamicElasticsearchSink that could dynamically create and manage "child"
>>> sinks. This would probably require some thought about how to manage
>>> consumed resources (memory), because the number of child sinks could
>>> potentially be unbounded. This could of course be simplified if the
>>> underlying Elastic client already supported that, which I'm not aware of.
>>> If you'd like to take this path, it would definitely be a great
>>> contribution to Flink (I'm able to provide some guidance).
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>
>>> Best,
>>> D.
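(To make the dynamic-index case in the quoted message above a bit more concrete: the ElasticsearchSinkFunction receives each element, so the target index can be derived from the element itself. The sketch below assumes the elements are (tenant, jsonPayload) tuples and the "events-<tenant>" naming is just a placeholder; note that only the index is dynamic here, the cluster is still fixed per sink, which is exactly where the DynamicElasticsearchSink idea comes in.)

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

// Routes each (tenant, jsonPayload) element to a tenant-specific index on the
// single cluster this sink is connected to; only the index is dynamic here.
public class TenantIndexEmitter implements ElasticsearchSinkFunction<Tuple2<String, String>> {

    @Override
    public void process(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) {
        final IndexRequest request =
                Requests.indexRequest()
                        .index("events-" + element.f0)          // e.g. one index per tenant
                        .source(element.f1, XContentType.JSON); // the element's JSON payload
        indexer.add(request);
    }
}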
>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>>> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I have a use-case that I wanted to initially pose to the mailing list,
>>>> as I'm not terribly familiar with the Elasticsearch connector, to ensure
>>>> I'm not going down the wrong path trying to accomplish this in Flink (or
>>>> if something downstream might be a better option).
>>>>
>>>> Basically, I have the following pieces to the puzzle:
>>>>
>>>>    - A stream of tenant-specific events
>>>>    - An HTTP endpoint containing mappings for tenant-specific Elastic
>>>>    cluster information (as each tenant has its own specific Elastic
>>>>    cluster/index)
>>>>
>>>> What I'm hoping to accomplish is the following:
>>>>
>>>>    1. One stream will periodically poll the HTTP endpoint and store
>>>>    these cluster mappings in state (keyed by tenant with cluster info as
>>>>    the value)
>>>>    2. The event stream will be keyed by tenant and connected to the
>>>>    cluster mappings stream.
>>>>    3. I'll need an Elasticsearch sink that can route the
>>>>    tenant-specific event data to its corresponding cluster/index from the
>>>>    mapping source.
>>>>
>>>> I know that the existing Elasticsearch sink supports dynamic indices,
>>>> however I didn't know if it's possible to adjust the cluster like I would
>>>> need on a per-tenant basis, or if there's a better approach here?
>>>>
>>>> Any advice would be appreciated.
>>>>
>>>> Thanks,
>>>>
>>>> Rion
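(A side note on step 1 of the quoted plan above: one way to get the mappings into a stream is a simple source that periodically polls the HTTP endpoint and emits the raw response body, which a downstream operator then parses into tenant-to-cluster mappings. The sketch below is only a rough illustration; the endpoint URL, poll interval, and emitting the body as a plain String are all assumptions to adapt.)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Periodically polls an HTTP endpoint and emits the raw response body; a
// downstream operator would parse it into tenant -> Elastic cluster mappings.
public class ClusterMappingSource extends RichSourceFunction<String> {

    private final String endpointUrl;   // placeholder, e.g. "https://config.example.com/mappings"
    private final long pollIntervalMs;  // placeholder poll interval
    private volatile boolean running = true;

    public ClusterMappingSource(String endpointUrl, long pollIntervalMs) {
        this.endpointUrl = endpointUrl;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        final HttpClient client = HttpClient.newHttpClient();
        final HttpRequest request =
                HttpRequest.newBuilder(URI.create(endpointUrl)).GET().build();
        while (running) {
            final HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            // Hold the checkpoint lock while emitting, as required for SourceFunction.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(response.body());
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}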