Hi Jiadong,

From the context you described, I think `ProcessingTimeWindow` may not be a good solution. If I understand correctly, you'd like to use the same SQL for both streaming and batch jobs on your platform. How about creating partitioned sink tables for the streaming jobs instead of using a window? The streaming jobs can then write data to different partitions.
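
To make the partitioning idea concrete, here is a minimal plain-Java sketch (not Flink API) of assigning records to day-level partitions. The class name `DayPartitionSketch`, the method names, and the choice of UTC are assumptions for illustration only; in a real job the partition column would presumably be declared on the sink table itself.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: groups timestamps into day partitions.
class DayPartitionSketch {

    // Maps an epoch-millisecond timestamp to a day value such as "2023-04-26" (UTC is an assumption).
    public static String partitionOf(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate().toString();
    }

    // Stand-in for a sink that writes each record into the partition of its day.
    public static Map<String, List<Long>> groupByDay(List<Long> timestamps) {
        Map<String, List<Long>> partitions = new TreeMap<>();
        for (Long ts : timestamps) {
            partitions.computeIfAbsent(partitionOf(ts), k -> new ArrayList<>()).add(ts);
        }
        return partitions;
    }

    public static void main(String[] args) {
        long oneDay = 24L * 60 * 60 * 1000;
        // Two records fall on the first day, two on the second.
        System.out.println(groupByDay(Arrays.asList(0L, 1L, oneDay, oneDay + 5)));
    }
}
```

A batch job could then read one partition at a time, which is the role the window was playing in the demo.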

Best,
Shammon FY

On Wed, Apr 26, 2023 at 7:22 PM Jiadong Lu <archzi...@gmail.com> wrote:

> Hi Shammon,
>
> Thank you for your quick response.
>
> To give you some context, I am working on a project that involves
> joining two streams and performing left/inner join operations based
> on certain keys. As for using batch mode, my intention is to have a
> unified approach that works for both stream and batch processing.
>
> If I decide not to use Flink's join/coGroup API, I would need to
> implement the join manually by saving one stream's data and
> reading it from the other stream. This could make the solution
> more complex, given the nature of this particular scenario.
>
> Thank you for your time, and I look forward to hearing from you soon.
>
> Best,
> Jiadong Lu
>
> On 2023/4/26 18:13, Shammon FY wrote:
> > Hi Jiadong,
> >
> > Using a processing-time window in batch jobs seems a little strange to
> > me. I would prefer to partition the data by day and have the batch job
> > read from the different partitions instead of using a window.
> >
> > Best,
> > Shammon FY
> >
> > On Wed, Apr 26, 2023 at 12:03 PM Jiadong Lu <archzi...@gmail.com> wrote:
> >
> > Hi Shammon,
> >
> > Thank you for your reply.
> >
> > Yes, the window configured with `Time.days(1)` has no special meaning;
> > it is just used to group all data into the same global window.
> > I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also
> > needs a `Trigger` like
> > `org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger`
> > to trigger processing of all the data in the window.
> >
> > So I think `ProcessingTimeWindow` with `Time.days(10)` may be a good
> > solution for this scenario. What do you think?
> >
> > As for what you mentioned
> > > use join directly
> > I have no idea how to use a join without a window. Would you mind
> > writing a demo of it?
> >
> > Thank you in advance for your help.
> >
> > Best,
> > Jiadong Lu
> >
> > On 2023/4/26 09:53, Shammon FY wrote:
> > > Hi Jiadong,
> > >
> > > I think it depends on the specific role the window plays here for
> > > you. If the window has no specific business meaning and is only used
> > > for performance optimization, maybe you can consider using a join
> > > directly.
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu <archzi...@gmail.com> wrote:
> > >
> > > Hello, everyone,
> > >
> > > I am confused about the window of the join/coGroup operator in batch
> > > mode. Here is my demo code, which works fine for me at present. Is
> > > using a processing-time window in batch mode appropriate, and does
> > > this approach have any problems? I want to use it to solve my problem
> > > (joining two streams in batch mode).
> > >
> > > ```java
> > > import java.util.stream.Collectors;
> > > import java.util.stream.Stream;
> > >
> > > import org.apache.flink.api.common.RuntimeExecutionMode;
> > > import org.apache.flink.api.common.functions.CoGroupFunction;
> > > import org.apache.flink.api.java.functions.KeySelector;
> > > import org.apache.flink.api.java.tuple.Tuple2;
> > > import org.apache.flink.streaming.api.datastream.DataStream;
> > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> > > import org.apache.flink.streaming.api.windowing.time.Time;
> > > import org.apache.flink.util.Collector;
> > >
> > > public static void main(String[] args) throws Exception {
> > >     StreamExecutionEnvironment env =
> > >             StreamExecutionEnvironment.getExecutionEnvironment();
> > >
> > >     DataStream<Integer> s1 =
> > >             env.fromCollection(Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
> > >     DataStream<Integer> s2 =
> > >             env.fromCollection(Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));
> > >
> > >     s1.coGroup(s2)
> > >             // Both sides are keyed by the integer value itself.
> > >             .where(new KeySelector<Integer, Integer>() {
> > >                 @Override
> > >                 public Integer getKey(Integer value) throws Exception {
> > >                     return value;
> > >                 }
> > >             })
> > >             .equalTo(new KeySelector<Integer, Integer>() {
> > >                 @Override
> > >                 public Integer getKey(Integer value) throws Exception {
> > >                     return value;
> > >                 }
> > >             })
> > >             // The one-day window only serves to collect all records into one group.
> > >             .window(TumblingProcessingTimeWindows.of(Time.days(1)))
> > >             .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
> > >                 @Override
> > >                 public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
> > >                         Collector<Tuple2<Integer, Integer>> out) throws Exception {
> > >                     if (!second.iterator().hasNext()) {
> > >                         // Left join: no match on the right side, so pair with null.
> > >                         for (Integer integer : first) {
> > >                             out.collect(new Tuple2<>(integer, null));
> > >                         }
> > >                     } else {
> > >                         for (Integer integer : first) {
> > >                             for (Integer integer1 : second) {
> > >                                 out.collect(new Tuple2<>(integer, integer1));
> > >                             }
> > >                         }
> > >                     }
> > >                 }
> > >             }).printToErr();
> > >     env.setParallelism(1);
> > >     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> > >     env.execute();
> > > }
> > > ```
> > >
> > > Thanks in advance.
> > >
> > > --
> > > Jiadong Lu
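
For reference, the manual join described earlier in the thread (saving one input and reading it from the other) can be sketched in plain Java without any window. This is not Flink API: `LeftJoinSketch` and `leftJoin` are hypothetical names, both inputs are assumed to be bounded, and the join key is assumed to be the integer value itself, as in the demo.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of an unwindowed left join over two bounded inputs.
class LeftJoinSketch {

    // Returns (left, right) pairs; a left value without a match is paired with null.
    public static List<Map.Entry<Integer, Integer>> leftJoin(List<Integer> left, List<Integer> right) {
        // Build phase: buffer the right input, grouped by join key.
        Map<Integer, List<Integer>> buffered = new HashMap<>();
        for (Integer r : right) {
            buffered.computeIfAbsent(r, k -> new ArrayList<>()).add(r);
        }

        // Probe phase: look up each left value in the buffered right input.
        List<Map.Entry<Integer, Integer>> out = new ArrayList<>();
        for (Integer l : left) {
            List<Integer> matches = buffered.get(l);
            if (matches == null) {
                out.add(new SimpleEntry<Integer, Integer>(l, null));
            } else {
                for (Integer r : matches) {
                    out.add(new SimpleEntry<Integer, Integer>(l, r));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same inputs as the demo: 7 has no partner on the right side.
        System.out.println(leftJoin(Arrays.asList(1, 2, 3, 4, 5, 6, 7), Arrays.asList(6, 5, 4, 3, 2, 1)));
    }
}
```

The sketch relies on the right input being fully available before probing, which holds for bounded (batch) data; an unbounded stream would need some other rule for deciding when a left value has no match.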