I see a .slotSharingGroup for SingleOutputStreamOperator <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#slotSharingGroup-java.lang.String-> which can place subtasks of different operations in the same TM slot. I also see a CoLocationGroup, but I do not see a .coLocationGroup for SingleOutputStreamOperator to put a task on the same slot. It seems CoLocationGroup <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.html> is defined at the JobVertex level and has nothing to do with SingleOutputStreamOperator. A TaskManager has many slots, and each slot can run many threads. I want to be able to put the cam1 partition (keyBy(cam)) in one slot and then use a keyBy(seq#) to run on many threads within that cam1 slot.
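To make the slot arithmetic behind .slotSharingGroup concrete, here is a small plain-Java model (not Flink code, and not an API Flink exposes). It assumes the usual slot sharing rules: one subtask of every operator in a group may share a slot, two subtasks of the same operator never share a slot, and different groups never share slots. So each group needs as many slots as its most parallel operator, and the groups' needs add up. The `Operator` class, the group name `tsSharingGroup` and the parallelism values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A plain-Java model of slot accounting under slot sharing; it does not use the Flink API.
public class SlotSharingModel {

    // Hypothetical description of one operator: name, slot sharing group, parallelism.
    static final class Operator {
        final String name;
        final String sharingGroup;
        final int parallelism;

        Operator(String name, String sharingGroup, int parallelism) {
            this.name = name;
            this.sharingGroup = sharingGroup;
            this.parallelism = parallelism;
        }
    }

    // Each group needs as many slots as its most parallel operator, because a slot
    // holds at most one subtask per operator; groups do not share slots, so the
    // per-group maxima are summed.
    static int requiredSlots(Operator... ops) {
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (Operator op : ops) {
            maxPerGroup.merge(op.sharingGroup, op.parallelism, Math::max);
        }
        int total = 0;
        for (int max : maxPerGroup.values()) {
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        // Roughly the pipeline from this thread: source, keyBy(cam) stage, keyBy(ts) stage.
        System.out.println("separate groups: " + requiredSlots(
                new Operator("source", "default", 1),
                new Operator("camProcess", "camSharingGroup", 2),
                new Operator("tsProcess", "tsSharingGroup", 4)));
        System.out.println("all in default:  " + requiredSlots(
                new Operator("source", "default", 1),
                new Operator("camProcess", "default", 2),
                new Operator("tsProcess", "default", 4)));
    }
}
```

Under these assumptions the model reports 7 slots with three separate groups and 4 when everything stays in the default group. In both cases the 4 parallel instances of the keyBy(ts) stage occupy 4 different slots rather than 4 threads inside one slot.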
Vijay

On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
> Thanks, Fabian.
> Been reading your excellent book on Flink Streaming. Can't wait for more
> chapters.
> Attached a pic.
>
> [image: partition-by-cam-ts.jpg]
>
> I have records with seq# 1 and cam1 and cam2. I also have records with
> varying seq#'s.
> By partitioning on the cam field first (keyBy(cam)), I can get the cam1 partition
> on the same task manager instance/slot/vCore(???)
> Can I then have seq# 1 and seq# 2 for the cam1 partition run in different
> slots/threads on the same Task Manager instance (aka the cam1 partition) using
> keyBy(seq#) & setParallelism()? Can the *forward* strategy be used to
> achieve this?
>
> TIA
>
>
> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink distributes task instances to slots and does not expose physical
>> machines.
>> Records are partitioned to task instances by hash partitioning. It is
>> also not possible to guarantee that the records in two different operators
>> are sent to the same slot.
>> Sharing information by side-passing it (e.g., via a file on a machine or
>> in a static object) is an anti-pattern and should be avoided.
>>
>> Best, Fabian
>>
>> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
>>
>>> Hi,
>>>
>>> Need to partition by cameraWithCube.getCam() first using
>>> parallelCamTasks (passed in as args).
>>>
>>> Then within each partition, need to partition again by
>>> cameraWithCube.getTs(), but need to make sure each of the 2nd-level
>>> partitions by getTs() runs on the same physical node.
>>>
>>> How do I achieve that?
>>>
>>> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>>>     .addSource(new Source(....))
>>>     .keyBy((cameraWithCube) -> cameraWithCube.getCam())
>>>     .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>         @Override
>>>         public void processElement(CameraWithCube cameraWithCube,
>>>                 Context context, Collector<CameraWithCube> collector) throws Exception {
>>>             // forward the record unchanged; without collect() the next stage receives nothing
>>>             collector.collect(cameraWithCube);
>>>         }
>>>     })
>>>     .slotSharingGroup("camSharingGroup") //TODO: how to add camera# of the partition
>>>     .setParallelism(parallelCamTasks)
>>>     .keyBy((cameraWithCube) -> cameraWithCube.getTs())
>>>     .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>         @Override
>>>         public void processElement(CameraWithCube cameraWithCube,
>>>                 Context context, Collector<CameraWithCube> collector) throws Exception {
>>>             //TODO: process code
>>>         }
>>>     })
>>>     .setParallelism(noOfSlotsInEachPhysicalNode) //TODO: how many parallel tasks within physical node
>>>     .slotSharingGroup("??"); //TODO: in same physical node
>>>
>>> TIA
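Fabian's point that records of two different operators cannot be guaranteed to land in the same slot can be illustrated with a small plain-Java model of the two keyBy stages in the code above. It is a sketch, not Flink code. The `Rec` class, the sample records and the modulo hash are assumptions for illustration. Flink's real keyBy hashes keys into key groups (murmur hash, bounded by the max parallelism), so the concrete instance numbers differ, but the modeled property is the same: the target instance depends only on the key and the parallelism, never on which instance handled the record in the previous stage.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// A plain-Java model of two consecutive keyBy() stages; it does not use the Flink API.
public class TwoStageKeyByModel {

    // Simplified stand-in for CameraWithCube with only the two key fields from the thread.
    static final class Rec {
        final String cam;
        final long ts;

        Rec(String cam, long ts) {
            this.cam = cam;
            this.ts = ts;
        }
    }

    // Simplified hash partitioner (assumption): key hash modulo parallelism.
    static int instanceFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Which instances of the keyBy(ts) stage receive records of the given camera?
    static Set<Integer> tsInstancesForCam(String cam, List<Rec> records, int tsParallelism) {
        Set<Integer> instances = new TreeSet<>();
        for (Rec r : records) {
            if (r.cam.equals(cam)) {
                instances.add(instanceFor(r.ts, tsParallelism));
            }
        }
        return instances;
    }

    static final List<Rec> SAMPLE = List.of(
            new Rec("cam1", 1), new Rec("cam1", 2), new Rec("cam1", 3), new Rec("cam2", 1));

    public static void main(String[] args) {
        int parallelCamTasks = 2;             // parallelism of the keyBy(cam) stage
        int noOfSlotsInEachPhysicalNode = 4;  // parallelism of the keyBy(ts) stage
        for (Rec r : SAMPLE) {
            System.out.printf("%s seq#%d -> cam stage instance %d, ts stage instance %d%n",
                    r.cam, r.ts,
                    instanceFor(r.cam, parallelCamTasks),
                    instanceFor(r.ts, noOfSlotsInEachPhysicalNode));
        }
        System.out.println("ts instances used by cam1: "
                + tsInstancesForCam("cam1", SAMPLE, noOfSlotsInEachPhysicalNode));
    }
}
```

In this model cam1's records spread over three of the four keyBy(ts) instances, and cam1 seq# 1 and cam2 seq# 1 meet on the same instance. Since Flink may schedule those instances in any slot on any TaskManager, the second keyBy redistributes records across the cluster rather than staying inside the slot that handled cam1.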