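Just some advice: do not use sleep to simulate a heavy task. Use real data, or generated data, so that the task actually does work. A sleep like this is bad practice from a software quality point of view: it blocks the thread without using any CPU, so it does not reproduce the load of real processing. Furthermore, such sleeps are easily forgotten and left in the code.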
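Here is a minimal sketch of the alternative. It assumes each camera's work can be modelled as a computation over a generated array of values. The names SyntheticLoad, generateCube and process are hypothetical and are not part of Flink or of your code. The class generates a reproducible synthetic "cube" and does real CPU work on it instead of sleeping. Increase n to make the task heavier.

```java
import java.util.Random;

/**
 * Hypothetical sketch: replace Thread.sleep(1000) with real work on generated data.
 * SyntheticLoad, generateCube and process are illustrative names only.
 */
public class SyntheticLoad {

    // Generate an n*n*n "cube" of values for a camera. Seeding the Random with
    // the camera number makes the generated data reproducible between runs.
    static double[] generateCube(int cameraNbr, int n) {
        Random random = new Random(cameraNbr);
        double[] cube = new double[n * n * n];
        for (int i = 0; i < cube.length; i++) {
            cube[i] = random.nextDouble();
        }
        return cube;
    }

    // Real CPU work whose cost grows with the data size: the mean of all values.
    static double process(double[] cube) {
        double sum = 0.0;
        for (double v : cube) {
            sum += v;
        }
        return cube.length == 0 ? 0.0 : sum / cube.length;
    }

    public static void main(String[] args) {
        int n = 100; // 1,000,000 values; tune n to scale the load
        double[] cube = generateCube(42, n);
        long start = System.nanoTime();
        double mean = process(cube);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("mean=%.4f, took %d ms%n", mean, elapsedMs);
    }
}
```

In the pipeline quoted below, a call such as process(generateCube(...)) could take the place of Thread.sleep(1000) inside the window function. Running the same logic on your real CameraWithCube data is better still.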
> On 16. May 2018, at 22:32, Vijay Balakrishnan <[email protected]> wrote:
>
> Hi,
> Newbie question - What I am trying to do is the following:
> CameraWithCubeSource source sends data-containing tuples of (cameraNbr,TS).
> 1. Need to partition data by cameraNbr.
> 2. Then sleep for 1 sec to simulate a heavy process in the task.
> 3. Then need to partition data by TS and finally get the DataStream to
> connect with another DataStream.
>
> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>         .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
>         .setParallelism(parallelTasks)
>         .setMaxParallelism(parallelTasks)
>         .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
>                 cameraWithCube.cameraKey.getCam() : new Object());
> //sleep for 1 sec ???? how
> ((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
>         .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
>             @Override
>             public void apply(String cameraKeyCam, TimeWindow timeWindow,
>                               Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
>                               Collector<CameraWithCube> collector) throws Exception {
>                 Thread.sleep(1000);
>                 cameraWithCubesAssignedToWindow.forEach(cameraWithCube -> collector.collect(cameraWithCube));
>             }
>         })//returning void here from apply ??
>         //partition by TS and return DataStream
>         .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
>                 cameraWithCube.cameraKey.getTS() : new Object());
> ;
>
> TIA,
> Vijay
