Hi Julien,

sorry for my misunderstanding before. For now, a window can only be defined on a KeyedStream or on an ordinary DataStream, and in the latter case (windowAll) only with parallelism = 1. I'd like to offer three options for your scenario:
1. If your external data is static and fits into memory, you can use managed keyed state <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state> to cache it and avoid the querying problem altogether.

2. Or you can use a custom partitioner <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning> to distribute your alert data manually and simulate the window operation yourself in a ProcessFunction (a rough sketch is appended after the quoted thread below).

3. You may also use an external system, e.g. an in-memory store, as a cache for your queries.

Best,
Xingcan

> On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com> wrote:
>
> Hi Xingcan,
>
> Thanks for your answer. Yes, I understand that point: if I have 100 resource IDs with a parallelism of 4, then each operator instance will handle about 25 keys.
>
> The issue I have is that I want, on a given operator instance, to group those 25 keys together in order to do only one query to the external system per operator instance: on a given operator instance, I will do 1 query for my 25 keys, so with the 4 operator instances, I will do 4 queries in parallel (with about 25 keys per query). I do not know how I can do that.
>
> If I define a window on my keyed stream (for example stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))), then my understanding is that the window is "associated" with the key. So in this case, on a given operator instance, I will have 25 of those windows (one per key) and I will do 25 queries (instead of 1).
>
> Do you understand my point? Or am I missing something?
>
> I'd like to find a way, on operator instance 1, to group all the alerts received on those 25 resource IDs and do one query for those 25 resource IDs, and the same thing for operator instances 2, 3 and 4.
>
> Thank you,
> Regards.
>
> On 18/02/2018 14:43, Xingcan Cui wrote:
>> Hi Julien,
>>
>> the cardinality of your keys (e.g., the resource ID) is not restricted by the parallelism. For instance, if you have 100 resource IDs processed by a KeyedStream with parallelism 4, each operator instance will handle about 25 keys.
>>
>> Hope that helps.
>>
>> Best,
>> Xingcan
>>
>>> On 18 Feb 2018, at 8:49 PM, Julien <jmassio...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am pretty new to Flink and I don't know what would be the best way to deal with the following use case:
>>>
>>> - as input, I receive alerts from a Kafka topic
>>> - an alert is linked to a network resource (like router-1, router-2, switch-1, switch-2, ...)
>>> - so an alert carries two main pieces of information: the alert ID and the ID of the resource on which the alert has been raised
>>> - I then need to query an external system in order to enrich the alert with additional information on the resource
>>>
>>> (A "natural" candidate for the key on this stream would be the resource ID.)
>>>
>>> The issue I have concerns the query to the external system:
>>> - I do not want to do one query per resource ID
>>> - I want to do a small number of queries in parallel (for example, 4 queries in parallel every 500 ms), each query asking the external system about several alerts linked to several resource IDs
>>>
>>> Currently, I don't know what the best way to deal with that would be:
>>> - I can key my stream by resource ID, define a processing-time window of 500 ms and do my query when the trigger fires. By doing so, I "group" several alerts into a single query, but they are all linked to the same resource, so I still end up doing one query per resource ID (which is too much in my use case).
>>> - I can also do a windowAll on a non-keyed stream. By doing so, I "group" together alerts from different resource IDs, but from what I've read, in that case the parallelism is always one, so I only do one query whereas I'd like to have some parallelism.
>>>
>>> I am thinking that a way to deal with that would be to:
>>> - define the resource ID as the key of the stream and use a parallelism of 4
>>> - have a way to do a windowAll on this keyed stream, i.e., on a given operator instance, "group" into the same window all the keys (all the resource IDs) managed by this operator instance
>>> - with a parallelism of 4, I would then do 4 queries in parallel (one per operator instance, each query covering several alerts linked to several resource IDs)
>>>
>>> But after looking at the documentation, I cannot see this ability (having a windowAll on a keyed stream).
>>>
>>> Am I missing something? What would be the best way to deal with such a use case?
>>>
>>> I've tried, for example, to change my key to something like "resourceId.hashCode % <max nb of queries in parallel>" and then to use a time window.
>>>
>>> In my example above, the <max nb of queries in parallel> is 4, so all my keys are 0, 1, 2 or 3.
>>>
>>> The issue with this approach is that, due to the way the operator index is computed from the key, it does not distribute my processing well. This is the partitioning logic applied by the "KeyGroupRangeAssignment" class:
>>>
>>>     /**
>>>      * Assigns the given key to a parallel operator index.
>>>      *
>>>      * @param key the key to assign
>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>      * @param parallelism the current parallelism of the operator
>>>      * @return the index of the parallel operator to which the given key should be routed.
>>>      */
>>>     public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
>>>         return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
>>>     }
>>>
>>>     /**
>>>      * Assigns the given key to a key-group index.
>>>      *
>>>      * @param key the key to assign
>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>      * @return the key-group to which the given key is assigned
>>>      */
>>>     public static int assignToKeyGroup(Object key, int maxParallelism) {
>>>         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>     }
>>>
>>> With this logic, keys 0, 1, 2 and 3 are only assigned to operators 2 and 3, so 2 of my 4 operators have nothing to do.
>>>
>>> So, what would be the best way to deal with that?
>>>
>>> Thank you in advance for your support.
>>>
>>> Regards.
>>>
>>> Julien.
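
P.S. Below is a minimal, untested sketch of what option 2 above could look like: the alerts are spread evenly over the parallel subtasks with partitionCustom() (computing the bucket explicitly instead of relying on keyBy's key-group hashing), and a ProcessFunction buffers them so that each subtask issues a single bulk query for all the resource IDs it has seen. The Alert/EnrichedAlert types, the 500 ms flush interval and the queryExternalSystem() call are placeholders for your own code, and flushing here is element-driven, because timers are only available on keyed streams; a real implementation would need its own periodic trigger and probably checkpointed state for the buffer.

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;

    public class BatchedEnrichment {

        // Placeholder types for the alert and its enriched form.
        public static class Alert {
            public String alertId;
            public String resourceId;
        }

        public static class EnrichedAlert {
            public Alert alert;
            public String resourceInfo;
        }

        /** Spreads the alerts over 'queryParallelism' subtasks and batches the external queries. */
        public static DataStream<EnrichedAlert> enrich(DataStream<Alert> alerts, final int queryParallelism) {
            return alerts
                    .partitionCustom(
                            new Partitioner<Integer>() {
                                @Override
                                public int partition(Integer bucket, int numPartitions) {
                                    // bucket i goes straight to subtask i, avoiding the key-group skew
                                    return bucket % numPartitions;
                                }
                            },
                            new KeySelector<Alert, Integer>() {
                                @Override
                                public Integer getKey(Alert alert) {
                                    // explicit bucket in [0, queryParallelism)
                                    return Math.floorMod(alert.resourceId.hashCode(), queryParallelism);
                                }
                            })
                    .process(new BatchingEnricher(500L))
                    .setParallelism(queryParallelism);
        }

        /** Buffers alerts and issues one bulk query per flush (element-driven in this sketch). */
        public static class BatchingEnricher extends ProcessFunction<Alert, EnrichedAlert> {

            private final long flushIntervalMs;
            private transient List<Alert> buffer;
            private transient long lastFlush;

            public BatchingEnricher(long flushIntervalMs) {
                this.flushIntervalMs = flushIntervalMs;
            }

            @Override
            public void open(Configuration parameters) {
                buffer = new ArrayList<>();
                lastFlush = System.currentTimeMillis();
            }

            @Override
            public void processElement(Alert alert, Context ctx, Collector<EnrichedAlert> out) {
                buffer.add(alert);
                long now = System.currentTimeMillis();
                if (now - lastFlush >= flushIntervalMs) {
                    // one query for all the resource IDs buffered on this subtask
                    for (EnrichedAlert enriched : queryExternalSystem(buffer)) {
                        out.collect(enriched);
                    }
                    buffer.clear();
                    lastFlush = now;
                }
            }

            // Placeholder for the single bulk request against the external system.
            private List<EnrichedAlert> queryExternalSystem(List<Alert> batch) {
                return new ArrayList<>();
            }
        }
    }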
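
P.P.S. The skewed assignment of the bucket keys 0..3 described at the end of the quoted mail can also be checked directly against Flink's own routing logic with a small throwaway program like the one below. It assumes Integer keys, the default maxParallelism of 128 key-groups, and that KeyGroupRangeAssignment is on the classpath from flink-runtime; per the observation above, only two of the four subtasks end up with any keys.

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyDistributionCheck {

        public static void main(String[] args) {
            final int parallelism = 4;
            // default number of key-groups for small parallelisms (assumption)
            final int maxParallelism = 128;

            // print which subtask each bucket key (0..3) is routed to
            for (int key = 0; key < parallelism; key++) {
                int operatorIndex =
                        KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
                System.out.println("key " + key + " -> operator " + operatorIndex);
            }
        }
    }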