Hi Julien,

I ran into a similar situation, where I needed a keyed stream but wanted (effectively) one key per task.
It's possible to generate keys that will get distributed as you need, though it does require making assumptions about how Flink generates hashes/key groups. And once you start talking about state, it gets a bit harder, as you need to know the max parallelism, which is used to calculate "key groups".

Below is a cheesy function I wrote to make an Integer that (if used as the key) will partition the record to the target operator. I use it in a custom Map function to add a key field.

— Ken

/**
 * Return an integer value that will get partitioned to the target <operatorIndex>, given the
 * workflow's <maxParallelism> (for key groups) and the operator <parallelism>.
 *
 * @param maxParallelism the maximum supported parallelism, aka the number of key groups
 * @param parallelism the current parallelism of the operator
 * @param operatorIndex the target operator subtask index
 * @return Integer suitable for use in a record as the key.
 */
public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
    if (maxParallelism == ExecutionJobVertex.VALUE_NOT_SET) {
        maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
    }

    for (int i = 0; i < maxParallelism * 2; i++) {
        Integer key = Integer.valueOf(i);
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        int index = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
        if (index == operatorIndex) {
            return key;
        }
    }

    throw new RuntimeException(String.format(
        "Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d)",
        operatorIndex, maxParallelism, parallelism));
}

> On Feb 19, 2018, at 12:34 AM, Julien <jmassio...@gmail.com> wrote:
>
> Hello,
>
> I've already tried to key my stream with "resourceId.hashCode % parallelism"
> (with a parallelism of 4 in my example), so all my keys will be either 0, 1,
> 2 or 3. I can then benefit from a time window on this keyed stream and do
> only 4 queries to my external system.
> But it is not well distributed with the default partitioner on a keyed stream.
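To see the brute-force search idea from the function above in isolation, here is a self-contained sketch with no Flink dependency. Caveat: `scramble` below is only a stand-in mixer I made up, NOT Flink's `MathUtils.murmurHash`, so the keys this sketch finds will not match what a real Flink job computes; in a job you should go through `KeyGroupRangeAssignment` as above. The key-group-to-subtask formula does mirror Flink's `computeOperatorIndexForKeyGroup`.

```java
import java.util.Arrays;

public class KeySearchSketch {

    // Stand-in bit mixer. NOTE: this is NOT Flink's MathUtils.murmurHash;
    // it only illustrates that *some* hash sits between key and key group.
    static int scramble(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & Integer.MAX_VALUE; // keep it non-negative
    }

    // Same shape as KeyGroupRangeAssignment.assignToKeyGroup: hash, then modulo.
    static int assignToKeyGroup(int keyHash, int maxParallelism) {
        return scramble(keyHash) % maxParallelism;
    }

    // Same formula as KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Brute-force search over candidate keys, mirroring makeKeyForOperatorIndex above.
    static int findKeyForOperator(int maxParallelism, int parallelism, int operatorIndex) {
        for (int i = 0; i < maxParallelism * 2; i++) {
            int keyGroup = assignToKeyGroup(Integer.hashCode(i), maxParallelism);
            if (operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup) == operatorIndex) {
                return i;
            }
        }
        throw new RuntimeException("No key found for operator index " + operatorIndex);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default for small parallelism
        int parallelism = 4;
        int[] keys = new int[parallelism];
        for (int op = 0; op < parallelism; op++) {
            keys[op] = findKeyForOperator(maxParallelism, parallelism, op);
        }
        System.out.println(Arrays.toString(keys)); // one synthetic key per target subtask
    }
}
```

The class and method names here are mine, chosen for the sketch; only the search loop and the index formula follow the real code.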
> (keys 0, 1, 2 and 3 only go to operator idx 2 and 3).
>
> I think I should explore the custom partitioner, as you suggested Xingcan.
> Maybe my last question on this will be: can you give me more details on this
> point, "and simulate a window operation by yourself in a ProcessFunction"?
>
> When I look at the documentation about the custom partitioner, I can see that
> the result of partitionCustom is a DataStream, not a KeyedStream.
> So the only window I will have is windowAll (which will bring me back to a
> parallelism of 1, no?).
>
> And if I do something like "myStream.partitionCustom(<my new partitioner>, <my
> key>).keyBy(<myKey>).window(...)", will it preserve my custom partitioner?
> When looking at the "KeyedStream" class, it seems that it will go back to the
> "KeyGroupStreamPartitioner" and forget my custom partitioner.
>
> Thanks again for your feedback,
>
> Julien.
>
>
> On 19/02/2018 03:45, 周思华 wrote:
>> Hi Julien,
>>
>> If I am not misunderstanding, I think you can key your stream on
>> `Random.nextInt() % parallelism`; this way you can "group" together alerts
>> from different keys and still benefit from multiple parallel instances.
>>
>> (Sent from NetEase Mail Master)
>>
>> On 02/19/2018 09:08, Xingcan Cui <xingc...@gmail.com> wrote:
>> Hi Julien,
>>
>> sorry for my misunderstanding before. For now, a window can only be
>> defined on a KeyedStream or on an ordinary DataStream with parallelism = 1.
>> I'd like to offer three options for your scenario.
>>
>> 1. If your external data is static and can fit into memory, you can
>> use managed keyed state
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state>
>> to cache it without considering the querying problem.
>> 2.
Or you can use a CustomPartitioner
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning>
>> to manually distribute your alert data and simulate a window operation by
>> yourself in a ProcessFunction.
>> 3. You may also choose to use an external system such as an in-memory store,
>> which can work as a cache for your queries.
>>
>> Best,
>> Xingcan
>>
>>> On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com> wrote:
>>>
>>> Hi Xingcan,
>>>
>>> Thanks for your answer.
>>> Yes, I understand that point: if I have 100 resource IDs with a
>>> parallelism of 4, then each operator instance will handle about 25 keys.
>>>
>>> The issue is that I want, on a given operator instance, to group
>>> those 25 keys together in order to do only 1 query to an external system
>>> per operator instance:
>>>
>>> - on a given operator instance, I will do 1 query for my 25 keys
>>> - so with the 4 operator instances, I will do 4 queries in parallel (with
>>>   about 25 keys per query)
>>>
>>> I do not know how I can do that.
>>>
>>> If I define a window on my keyed stream (with for example
>>> stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))),
>>> then my understanding is that the window is "associated" with the key. So in
>>> this case, on a given operator instance, I will have 25 of those windows
>>> (one per key), and I will do 25 queries (instead of 1).
>>>
>>> Do you understand my point? Or am I missing something?
>>>
>>> I'd like to find a way on operator instance 1 to group all the alerts
>>> received for those 25 resource ids and do 1 query for those 25 resource ids.
>>> Same thing for operator instances 2, 3 and 4.
>>>
>>> Thank you,
>>> Regards.
>>>
>>>
>>> On 18/02/2018 14:43, Xingcan Cui wrote:
>>>> Hi Julien,
>>>>
>>>> the cardinality of your keys (e.g., resource ID) will not be restricted to
the parallelism. For instance, if you have 100 resource IDs processed by a
>>>> KeyedStream with parallelism 4, each operator instance will handle about
>>>> 25 keys.
>>>>
>>>> Hope that helps.
>>>>
>>>> Best,
>>>> Xingcan
>>>>
>>>>> On 18 Feb 2018, at 8:49 PM, Julien <jmassio...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am pretty new to Flink and I don't know what would be the best way to
>>>>> deal with the following use case:
>>>>>
>>>>> - as an input, I receive some alerts from a Kafka topic
>>>>> - an alert is linked to a network resource (like router-1, router-2,
>>>>>   switch-1, switch-2, ...)
>>>>> - so an alert has two main pieces of information (the alert id and the
>>>>>   resource id of the resource on which this alert has been raised)
>>>>> - then I need to do a query to an external system in order to enrich the
>>>>>   alert with additional information on the resource
>>>>>
>>>>> (A "natural" candidate for the key on this stream would be the resource id.)
>>>>>
>>>>> The issue I have concerns the query to the external system:
>>>>> - I do not want to do 1 query per resource id
>>>>> - I want to do a small number of queries in parallel (for example 4 queries
>>>>>   in parallel every 500ms), each query requesting the external system for
>>>>>   several alerts linked to several resource ids
>>>>>
>>>>> Currently, I don't know what would be the best way to deal with that:
>>>>> - I can key my stream on the resource id, define a processing time window
>>>>>   of 500ms, and do my query when the trigger fires
>>>>>   - by doing so, I will "group" several alerts in a single query, but they
>>>>>     will all be linked to the same resource,
>>>>>   - so I will do 1 query per resource id (which will be too much in my
>>>>>     use case)
>>>>> - I can also do a windowAll on a non-keyed stream
>>>>>   - by doing so, I will "group" together alerts from different resource ids,
>>>>>     but from what I've read, in such a case the parallelism will always be one,
>>>>>   - so in this case, I will only do 1 query, whereas I'd like to have
>>>>>     some parallelism
>>>>>
>>>>> I am thinking that a way to deal with that would be:
>>>>>
>>>>> - define the resource id as the key of the stream and use a parallelism of 4
>>>>> - then have a way to do a windowAll on this keyed stream
>>>>>   - that is, on a given operator instance, "group" in the same window all
>>>>>     the keys (i.e. all the resource ids) managed by this operator instance
>>>>>   - with a parallelism of 4, I would do 4 queries in parallel (1 per
>>>>>     operator instance, and each query would be for several alerts linked
>>>>>     to several resource ids)
>>>>>
>>>>> But after looking at the documentation, I cannot see this ability (having
>>>>> a windowAll on a keyed stream).
>>>>>
>>>>> Am I missing something?
>>>>> What would be the best way to deal with such a use case?
>>>>>
>>>>> I've tried for example to revise my key and to do something like
>>>>> "resourceId.hashCode % <max nb of queries in parallel>" and then to use a
>>>>> time window.
>>>>>
>>>>> In my example above, the <max nb of queries in parallel> will be 4, and
>>>>> all my keys will be 0, 1, 2 or 3.
>>>>>
>>>>> The issue with this approach is that, due to the way the operator index
>>>>> is computed from the key, it does not distribute my processing well.
>>>>> When this partitioning logic from the "KeyGroupRangeAssignment" class is
>>>>> applied:
>>>>>
>>>>> /**
>>>>>  * Assigns the given key to a parallel operator index.
>>>>>  *
>>>>>  * @param key the key to assign
>>>>>  * @param maxParallelism the maximum supported parallelism, aka the
>>>>>  *     number of key-groups.
>>>>>  * @param parallelism the current parallelism of the operator
>>>>>  * @return the index of the parallel operator to which the given key
>>>>>  *     should be routed.
>>>>>  */
>>>>> public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
>>>>>     return computeOperatorIndexForKeyGroup(maxParallelism, parallelism,
>>>>>         assignToKeyGroup(key, maxParallelism));
>>>>> }
>>>>>
>>>>> /**
>>>>>  * Assigns the given key to a key-group index.
>>>>>  *
>>>>>  * @param key the key to assign
>>>>>  * @param maxParallelism the maximum supported parallelism, aka the
>>>>>  *     number of key-groups.
>>>>>  * @return the key-group to which the given key is assigned
>>>>>  */
>>>>> public static int assignToKeyGroup(Object key, int maxParallelism) {
>>>>>     return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>>> }
>>>>>
>>>>> keys 0, 1, 2 and 3 are only assigned to operators 2 and 3 (so 2 of my 4
>>>>> operators will not have anything to do).
>>>>>
>>>>> So, what would be the best way to deal with that?
>>>>>
>>>>> Thank you in advance for your support.

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
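P.S. — on why a handful of small keys can cluster on a couple of subtasks: `computeOperatorIndexForKeyGroup` is just `keyGroup * parallelism / maxParallelism`, so each subtask owns one contiguous block of key groups, and any keys whose hashed key groups land in the same block go to the same subtask. A self-contained sketch of just that mapping (the class name is mine; the formula mirrors Flink's, and 128 is assumed as the default max parallelism for small jobs):

```java
public class KeyGroupRanges {

    // Mirrors KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup:
    // integer division hands each subtask a contiguous block of key groups.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default when parallelism <= 128
        int parallelism = 4;

        int rangeStart = 0;
        for (int keyGroup = 1; keyGroup <= maxParallelism; keyGroup++) {
            // Emit a range whenever the owning subtask changes (or at the end).
            if (keyGroup == maxParallelism
                    || operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)
                       != operatorIndexForKeyGroup(maxParallelism, parallelism, rangeStart)) {
                System.out.printf("subtask %d owns key groups [%d..%d]%n",
                        operatorIndexForKeyGroup(maxParallelism, parallelism, rangeStart),
                        rangeStart, keyGroup - 1);
                rangeStart = keyGroup;
            }
        }
        // Prints [0..31], [32..63], [64..95], [96..127] for subtasks 0..3.
    }
}
```

So the distribution across subtasks depends entirely on which key groups your keys hash into, not on the key values themselves, which is why `hashCode % parallelism` keys 0..3 gave no useful spread.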