Hi,

keyBy() does not work hierarchically. Each keyBy() overrides the previous partitioning. You can keyBy(cam, seq#), which guarantees that all records with the same (cam, seq#) are processed by the same parallel instance. However, Flink does not give any guarantees about how the (cam, seq#) partitions are distributed across slots (or even physical nodes).
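Something along these lines might work. It is an untested sketch: it reuses the CameraWithCube / getCam() / getTs() names and the parallelCamTasks setting from the code quoted below, and assumes that getCam() returns a String and getTs() a Long; the source and the processing body are placeholders.

    DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new Source(....))
        // one composite key instead of two consecutive keyBy() calls:
        // all records with the same (cam, ts) pair go to the same parallel instance
        .keyBy(new KeySelector<CameraWithCube, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> getKey(CameraWithCube c) {
                return new Tuple2<>(c.getCam(), c.getTs());
            }
        })
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube c, Context ctx,
                    Collector<CameraWithCube> out) throws Exception {
                out.collect(c); // per-(cam, ts) processing goes here
            }
        })
        .setParallelism(parallelCamTasks); // which slot/node each (cam, ts) key lands on is decided by Flink

(KeySelector is org.apache.flink.api.java.functions.KeySelector, Tuple2 is org.apache.flink.api.java.tuple.Tuple2, Collector is org.apache.flink.util.Collector.)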
Btw. why is it important that all records of the same cam are processed by the same physical node? (On slotSharingGroup / CoLocationGroup, see the sketch after the quoted code below.)

Fabian

2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:

> I see a .slotSharingGroup for SingleOutputStreamOperator
> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#slotSharingGroup-java.lang.String->
> which can put parallel instances of operations in the same TM slot.
> I also see a CoLocationGroup, but do not see a .coLocationGroup for
> SingleOutputStreamOperator to put a task on the same slot. CoLocationGroup
> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.html>
> seems to be defined at the JobVertex level and has nothing to do with
> SingleOutputStreamOperator.
> A TaskManager has many slots, and a slot can have many threads within it.
> I want to be able to put the cam1 partition (keyBy(cam)) in one slot and then
> use a keyBy(seq#) to run on many threads within that cam1 slot.
>
> Vijay
>
> On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Thanks, Fabian.
>> I have been reading your excellent book on Flink Streaming. Can't wait for
>> more chapters.
>> Attached a pic.
>>
>> [image: partition-by-cam-ts.jpg]
>>
>> I have records with seq# 1 and cam1 and cam2. I also have records with
>> varying seq#'s.
>> By partitioning on the cam field first (keyBy(cam)), I can get the cam1
>> partition on the same task manager instance/slot/vCore(?).
>> Can I then have seq# 1 and seq# 2 for the cam1 partition run in different
>> slots/threads on the same Task Manager instance (aka the cam1 partition)
>> using keyBy(seq#) & setParallelism()? Can the *forward* strategy be used
>> to achieve this?
>>
>> TIA
>>
>>
>> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink distributes task instances to slots and does not expose physical
>>> machines.
>>> Records are partitioned to task instances by hash partitioning. It is
>>> also not possible to guarantee that the records of two different operators
>>> are sent to the same slot.
>>> Sharing information by side-passing it (e.g., via a file on a machine or
>>> in a static object) is an anti-pattern and should be avoided.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I need to partition by cameraWithCube.getCam() first, using
>>>> parallelCamTasks (passed in as args).
>>>>
>>>> Then, within each partition, I need to partition again by
>>>> cameraWithCube.getTs(), but need to make sure each of the second
>>>> partitions by getTs() runs on the same physical node.
>>>>
>>>> How do I achieve that?
>>>>
>>>> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>>>>     .addSource(new Source(....))
>>>>     .keyBy((cameraWithCube) -> cameraWithCube.getCam())
>>>>     .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>>         public void processElement(CameraWithCube cameraWithCube,
>>>>                 Context context, Collector<CameraWithCube> collector) throws Exception {
>>>>             //do nothing
>>>>         }
>>>>     })
>>>>     .slotSharingGroup("camSharingGroup") //TODO: how to add camera# of the partition
>>>>     .setParallelism(parallelCamTasks)
>>>>     .keyBy((cameraWithCube) -> cameraWithCube.getTs())
>>>>     .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>>         public void processElement(CameraWithCube cameraWithCube,
>>>>                 Context context, Collector<CameraWithCube> collector) throws Exception {
>>>>             //TODO: process code
>>>>         }
>>>>     })
>>>>     .setParallelism(noOfSlotsInEachPhysicalNode) //TODO: how many parallel tasks within physical node
>>>>     .slotSharingGroup("??"); //TODO: in same physical node
>>>>
>>>> TIA
>>>
>>>
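On the slotSharingGroup / CoLocationGroup question in the quoted mails: slotSharingGroup(String) is set per operator, not per key. Subtasks of operators in the same group may share a slot, and operators in different groups are never placed in the same slot, so it cannot pin the cam1 partition to one particular slot or node. CoLocationGroup is an internal scheduler concept at the JobVertex level and is not exposed in the DataStream API. Below is a rough, untested sketch of what slotSharingGroup does express; the group names and the map bodies are made up for illustration.

    SingleOutputStreamOperator<CameraWithCube> parsed = cameraWithCubeDataStream
        .map(new MapFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public CameraWithCube map(CameraWithCube c) {
                return c; // placeholder for a real parsing/enrichment step
            }
        })
        .slotSharingGroup("ingest"); // subtasks of this map only occupy slots of the "ingest" group

    parsed
        .map(new MapFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public CameraWithCube map(CameraWithCube c) {
                return c; // placeholder for the heavy per-record processing
            }
        })
        .slotSharingGroup("processing"); // never shares a slot with the "ingest" map above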