Thanks - I will wait for Stefan’s comments before I start digging in.

> On Jul 4, 2018, at 4:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Ashish,
> 
> I think we don't want to make it an official public API (at least not at this 
> point), but maybe you can dig into the internal API and leverage it for your 
> use case.
> I'm not 100% sure about all the implications, which is why I pulled Stefan 
> into this thread.
> 
> Best, Fabian
> 
> 2018-07-02 15:37 GMT+02:00 ashish pok <ashish...@yahoo.com>:
> Thanks Fabian! It sounds like KeyGroup will do the trick if that can be made 
> publicly accessible.
> 
> On Monday, July 2, 2018, 5:43:33 AM EDT, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> 
> Hi Ashish, hi Vijay,
> 
> Flink does not distinguish between different parts of a key (parent, child) 
> in the public APIs. However, there is an internal concept of KeyGroups, which 
> is similar to what you call physical partitioning. A KeyGroup is a group of 
> keys that are always processed on the same physical node. The motivation for 
> this feature is operator scaling: because all keys of a group are always 
> processed by the same node, their state is always distributed together. 
> However, AFAIK, KeyGroups are not exposed in the user API. 
> Moreover, KeyGroups are distributed to slots, i.e., each KeyGroup is 
> processed by a single slot, but each slot might process multiple key 
> groups. This distribution is done with hash partitioning and is hence hard to 
> tune.
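> 
> (To make the mechanics concrete, here is a rough sketch of how a key ends up 
> on a subtask. KeyGroupRangeAssignment is internal API in flink-runtime, so 
> this is for understanding only, not something to build against; 
> maxParallelism = 128 is just the usual default for small job parallelism:)
> 
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
> 
> public class KeyGroupProbe {
>     public static void main(String[] args) {
>         int maxParallelism = 128; // default max parallelism when not set explicitly
>         int parallelism = 4;      // operator parallelism
> 
>         // keyGroup = murmurHash(key.hashCode()) % maxParallelism
>         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup("cam1", maxParallelism);
> 
>         // subtask = keyGroup * parallelism / maxParallelism
>         int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
>                 maxParallelism, parallelism, keyGroup);
> 
>         System.out.println("cam1 -> keyGroup " + keyGroup + " -> subtask " + subtask);
>     }
> }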
> 
> There might be a way to tweak this by implementing your own low-level 
> operator, but I'm not sure. Stefan (in CC) might be able to give some hints.
> 
> Best, Fabian
> 
> 2018-06-29 18:35 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
> Thanks for the clarification, Fabian.
> This is what I compromised on for my use case - it doesn't exactly do what I 
> intended to do.
> Partition by a key, then spawn threads inside that partition to do my 
> task, and finally repartition again (for a subsequent connect).
> 
> DataStream<CameraWithCube> keyedByCamCameraStream = env
>         .addSource(new Source(....))
>         .keyBy((cameraWithCube) -> cameraWithCube.getCam());
> 
> // spawn threads here, keyed by the second key (ts)
> AsyncFunction<CameraWithCube, CameraWithCube> cameraWithCubeAsyncFunction =
>         new SampleAsyncFunction(...., nThreads);
> 
> // capacity = max # of in-flight requests; timeout = max time until a request is considered failed
> DataStream<CameraWithCube> cameraWithCubeDataStreamAsync =
>         AsyncDataStream.orderedWait(keyedByCamCameraStream, cameraWithCubeAsyncFunction,
>                 timeout, TimeUnit.MILLISECONDS, nThreads)
>             .setParallelism(parallelCamTasks);
> 
> DataStream<CameraWithCube> cameraWithCubeDataStream =
>         cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) -> cameraWithCube.getTs());
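> 
> (For context, SampleAsyncFunction follows the usual RichAsyncFunction-plus-thread-pool 
> pattern; the following is a simplified sketch of that shape, not the exact class:)
> 
> import java.util.Collections;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> 
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> 
> public class SampleAsyncFunction extends RichAsyncFunction<CameraWithCube, CameraWithCube> {
>     private final int nThreads;
>     private transient ExecutorService executor;
> 
>     public SampleAsyncFunction(int nThreads) {
>         this.nThreads = nThreads;
>     }
> 
>     @Override
>     public void open(Configuration parameters) {
>         executor = Executors.newFixedThreadPool(nThreads);
>     }
> 
>     @Override
>     public void asyncInvoke(CameraWithCube input, ResultFuture<CameraWithCube> resultFuture) {
>         executor.submit(() -> {
>             // work on one ts/seq# here (combine, split, parse, stitch, ...)
>             resultFuture.complete(Collections.singleton(input));
>         });
>     }
> 
>     @Override
>     public void close() {
>         executor.shutdown();
>     }
> }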
> 
> 
> On Thu, Jun 28, 2018 at 9:22 AM ashish pok <ashish...@yahoo.com> wrote:
> Fabian, All,
> 
> Along the same lines, we have a data source with a parent key and a child 
> key. We need to first keyBy parent and then by child. If we want physical 
> partitioning to happen first by the parent key, with grouping localized by 
> the child key, is there a need to use a custom partitioner? Obviously we can 
> keyBy twice, but I was wondering if we can minimize the re-partitioning 
> stress.
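> 
> (To illustrate the custom-partitioner route - a hedged sketch only: 
> ParentChildEvent and getParentKey() are made-up names, and partitionCustom 
> controls physical placement but does not give you keyed state downstream:)
> 
> import org.apache.flink.api.common.functions.Partitioner;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.datastream.DataStream;
> 
> // assuming stream is a DataStream<ParentChildEvent>
> DataStream<ParentChildEvent> byParent = stream.partitionCustom(
>         new Partitioner<String>() {
>             @Override
>             public int partition(String parentKey, int numPartitions) {
>                 // all children of one parent land on the same channel
>                 return Math.floorMod(parentKey.hashCode(), numPartitions);
>             }
>         },
>         new KeySelector<ParentChildEvent, String>() {
>             @Override
>             public String getKey(ParentChildEvent event) {
>                 return event.getParentKey();
>             }
>         });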
> 
> Thanks,
> 
> Ashish
> 
> On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Vijay,
> 
> Flink does not provide fine-grained control for placing keys on certain slots 
> or machines.
> When you specify a key, it is up to Flink (i.e., its internal hash function) 
> where the data is processed. This works well for large key spaces, but can be 
> difficult if you have only a few keys.
> 
> So, even if you keyBy(cam) and handle the parallelization of seq# internally 
> (which I would not recommend), it might still happen that the data of two 
> cameras is processed on the same slot.
> The only way to change that would be to fiddle with the hash of your keys, 
> but this might give you a completely different distribution when scaling out 
> the application at a later point in time.
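> 
> (A rough illustration of what "fiddling with the hash" could look like - this 
> brute-forces a salt per camera so that each salted key maps to a distinct 
> subtask. It leans on the internal KeyGroupRangeAssignment class and the 
> current hash scheme, so treat it as a fragile sketch, not a recommendation:)
> 
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
> 
> public class KeySalter {
>     /** Find "cam + '#' + salt" such that the salted key lands on the desired subtask. */
>     static String saltedKey(String cam, int targetSubtask, int maxParallelism, int parallelism) {
>         for (int salt = 0; salt < 100_000; salt++) {
>             String candidate = cam + "#" + salt;
>             int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(candidate, maxParallelism);
>             int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
>                     maxParallelism, parallelism, keyGroup);
>             if (subtask == targetSubtask) {
>                 return candidate; // key the stream by this instead of the raw camera id
>             }
>         }
>         throw new IllegalStateException("no salt found for " + cam);
>     }
> }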
> 
> Best, Fabian
> 2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
> Hi Fabian,
> Thanks once again for your reply. I need to get the data from each cam/camera 
> into 1 partition/slot and avoid moving the gigantic video data around while I 
> perform other operations on it. For example, I can get seq#1 and seq#2 for cam1 
> in the cam1 partition/slot and then run combine, split, parse, stitch, etc. 
> operations on it in multiple threads within the same cam1 partition.
> 
> I have the CameraKey defined as (cam, seq#) and then keyBy(cam) to get it into 
> 1 partition (e.g., cam1). The idea is to then work within the cam1 partition 
> with various seq#'s (1, 2, etc.) on various threads within the same 
> slot/partition of a TaskManager.
> 
> The data is stored in EFS, keyed by a seq#/cam# folder structure.
> 
> Our actual problem is managing network bandwidth as a resource in each 
> partition. We want to make sure that the processing of one camera (split into 
> multiple seq# tasks) is not running on the same node as the processing of 
> another camera; in that case, the network bandwidth required for storing the 
> output of the process running in the partition would exceed the network 
> bandwidth of the hardware. Camera processing is expected to run on the same 
> hardware as the video decode step, which is an earlier sequential step in 
> the same dataflow pipeline.
> 
> I guess I might have to use a ThreadPool within each slot (cam partition) to 
> work on each seq#?
> 
> TIA
> On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
> 
> keyBy() does not work hierarchically. Each keyBy() overrides the previous 
> partitioning.
> You can keyBy(cam, seq#), which guarantees that all records with the same 
> (cam, seq#) are processed by the same parallel instance.
> However, Flink does not give any guarantees about how the (cam, seq#) 
> partitions are distributed across slots (or even physical nodes).
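> 
> (For example, with the CameraWithCube type from your snippet - assuming cam 
> and ts are its POJO fields and getTs() returns a Long, which is a guess on my 
> part - a composite key can be expressed like this:)
> 
> // all records with the same (cam, ts) go to the same parallel instance
> DataStream<CameraWithCube> byCamAndTs = cameraStream.keyBy("cam", "ts");
> 
> // or explicitly via a KeySelector returning a Tuple2
> cameraStream.keyBy(new KeySelector<CameraWithCube, Tuple2<String, Long>>() {
>     @Override
>     public Tuple2<String, Long> getKey(CameraWithCube c) {
>         return Tuple2.of(c.getCam(), c.getTs());
>     }
> });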
> 
> Btw. why is it important that all records of the same cam are processed by 
> the same physical node?
> 
> Fabian
> 2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
> I see a .slotSharingGroup for SingleOutputStreamOperator 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#slotSharingGroup-java.lang.String->
> which can put parallel instances of operations in the same TM slot.
> I also see a CoLocationGroup, but no .coLocationGroup on 
> SingleOutputStreamOperator to put a task on the same slot. It seems 
> CoLocationGroup 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.html>
> is defined at the JobVertex level and has nothing to do with 
> SingleOutputStreamOperator.
> A TaskManager has many slots, and each slot can run many threads.
> I want to be able to put the cam1 partition (keyBy(cam)) in 1 slot and then use 
> a keyBy(seq#) to run on many threads within that cam1 slot.
> 
> Vijay
> On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
> Thanks, Fabian.
> Been reading your excellent book on Flink Streaming. Can't wait for more 
> chapters.
> Attached a pic.
> 
> I have records with seq# 1 for cam1 and cam2. I also have records with 
> varying seq#'s.
> By partitioning on the cam field first (keyBy(cam)), I can get the cam1 
> partition onto the same task manager instance/slot/vCore(???).
> Can I then have seq# 1 and seq# 2 of the cam1 partition run in different 
> slots/threads on the same Task Manager instance (aka the cam1 partition) using 
> keyBy(seq#) & setParallelism()? Can a forward strategy be used to achieve 
> this?
> 
> TIA
> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
> 
> Flink distributes task instances to slots and does not expose physical 
> machines.
> Records are partitioned to task instances by hash partitioning. It is also 
> not possible to guarantee that the records in two different operators are 
> sent to the same slot.
> Sharing information by side-passing it (e.g., via a file on a machine or in a 
> static object) is an anti-pattern and should be avoided.
> 
> Best, Fabian
> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
> Hi,
> 
> Need to partition by cameraWithCube.getCam() first, using 
> parallelCamTasks (passed in as an argument).
> 
> Then, within each partition, I need to partition again by 
> cameraWithCube.getTs(), while making sure each second-level getTs() partition 
> runs on the same physical node.
> 
> How do I achieve that?
> 
> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>         .addSource(new Source(....))
>         .keyBy((cameraWithCube) -> cameraWithCube.getCam())
>         .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>             public void processElement(CameraWithCube cameraWithCube,
>                     Context context, Collector<CameraWithCube> collector) throws Exception {
>                 //do nothing
>             }
>         })
>         .slotSharingGroup("camSharingGroup")//TODO: how to add camera# of the partition
>         .setParallelism(parallelCamTasks)
>         .keyBy((cameraWithCube) -> cameraWithCube.getTs())
>         .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>             public void processElement(CameraWithCube cameraWithCube,
>                     Context context, Collector<CameraWithCube> collector) throws Exception {
>                 //TODO: process code
>             }
>         })
>         .setParallelism(noOfSlotsInEachPhysicalNode)// TODO: how many parallel tasks within physical node
>         .slotSharingGroup("??");// TODO: in same physical node
> TIA