Hi Shammon,

Thank you for your quick response.

To give you some context, I am working on a project that joins two streams and performs left and inner joins on certain keys. As for batch mode, my intention is to have a single approach that works for both streaming and batch processing.

If I don't use Flink's join/coGroup API, I would have to implement the join manually: buffer one stream's data and look it up while processing the other stream. Given the nature of this scenario, that would likely make the solution considerably more complex.
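
For comparison, the manual approach (buffer one input, then look records up while reading the other) can be sketched in plain Java. This is only an in-memory illustration of the buffering idea, not Flink API: the class `BufferedLeftJoin` and its `leftJoin` method are hypothetical, and in a real Flink job the buffer would typically have to live in keyed state rather than in a local `HashMap`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: an in-memory hash join, not part of any Flink API.
public class BufferedLeftJoin {

    // Left-joins two inputs: the right side is buffered by join key,
    // then every left record probes that buffer.
    public static <K, L, R> List<Map.Entry<L, R>> leftJoin(
            List<L> left, Function<L, K> leftKey,
            List<R> right, Function<R, K> rightKey) {
        // Build phase: buffer the whole right input, grouped by key.
        Map<K, List<R>> buffer = new HashMap<>();
        for (R r : right) {
            buffer.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: emit every match, or (left, null) when nothing matches.
        List<Map.Entry<L, R>> result = new ArrayList<>();
        for (L l : left) {
            List<R> matches = buffer.get(leftKey.apply(l));
            if (matches == null) {
                result.add(new AbstractMap.SimpleEntry<L, R>(l, null));
            } else {
                for (R r : matches) {
                    result.add(new AbstractMap.SimpleEntry<L, R>(l, r));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> s1 = List.of(1, 2, 3, 4, 5, 6, 7);
        List<Integer> s2 = List.of(6, 5, 4, 3, 2, 1);
        // prints [1=1, 2=2, 3=3, 4=4, 5=5, 6=6, 7=null]
        System.out.println(leftJoin(s1, x -> x, s2, x -> x));
    }
}
```

The catch in a streaming setting is that a left record may arrive before its match, so a real implementation also has to decide when a (left, null) result is final; that is exactly the bookkeeping the join/coGroup API hides.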

Thank you for your time and I look forward to hearing from you soon.

Best,
Jiadong Lu

On 2023/4/26 18:13, Shammon FY wrote:
Hi Jiadong

Using a processing-time window in batch jobs seems a little strange to me. I would prefer to partition the data by day and have the batch job read the relevant partitions instead of using a window.
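
A minimal sketch of the day-level partitioning idea, assuming each record carries an epoch-millisecond timestamp, days are cut in UTC, and partitions use a Hive-style `dt=yyyy-MM-dd` name. All three are assumptions, as are the names `DayPartitioner`, `partitionFor`, and `groupByDay`.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that assigns records to day-level partitions.
public class DayPartitioner {

    // Assumption: timestamps are epoch millis and days are cut in UTC.
    public static String partitionFor(long epochMillis) {
        LocalDate day = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
        // LocalDate.toString() yields ISO-8601, i.e. yyyy-MM-dd.
        return "dt=" + day;
    }

    // Groups timestamps by partition name (TreeMap keeps days in order).
    public static Map<String, List<Long>> groupByDay(List<Long> timestamps) {
        Map<String, List<Long>> partitions = new TreeMap<>();
        for (long ts : timestamps) {
            partitions.computeIfAbsent(partitionFor(ts), k -> new ArrayList<>()).add(ts);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // 2023-04-26T00:00Z, 2023-04-26T23:59:59.999Z and 2023-04-27T00:00Z
        System.out.println(groupByDay(List.of(1682467200000L, 1682553599999L, 1682553600000L)));
    }
}
```

A batch job for a given day would then read only the matching partition (e.g. `dt=2023-04-26`), so the input is already bounded and no window is needed to group it.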

Best,
Shammon FY

On Wed, Apr 26, 2023 at 12:03 PM Jiadong Lu <archzi...@gmail.com> wrote:

    Hi Shammon,
    Thank you for your reply.

    Yes, the window configured with `Time.days(1)` has no special meaning;
    it is only used to group all the data into a single window.
    I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also
    needs a `Trigger` such as
    `org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger`
    to fire the window so that all the data gets processed.

    So I think a processing-time window (`TumblingProcessingTimeWindows`)
    with `Time.days(10)` may be a good solution for this scenario. What do
    you think?

    As for what you mentioned:
      > use join directly
    I don't know how to use a join without a window. Would you mind
    writing a demo of it?

    Thank you in advance for your help.

    Best,
    Jiadong Lu

    On 2023/4/26 09:53, Shammon FY wrote:
     > Hi Jiadong,
     >
     > I think it depends on the specific role the window plays for you.
     > If this window has no specific business meaning and is only used
     > for performance optimization, maybe you can consider using a join
     > directly.
     >
     > Best,
     > Shammon FY
     >
     > On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu <archzi...@gmail.com> wrote:
     >
     >     Hello everyone,
     >
     >     I am confused about the window of the join/coGroup operator in
     >     batch mode. Here is my demo code, and it works fine for me at
     >     present. Is it appropriate to use a processing-time window in
     >     batch mode, and does this approach have any problems? I want to
     >     use this solution to solve my problem (joining two streams in
     >     batch mode).
     >
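     >     ```java
     >     import java.util.stream.Collectors;
     >     import java.util.stream.Stream;
     >
     >     import org.apache.flink.api.common.RuntimeExecutionMode;
     >     import org.apache.flink.api.common.functions.CoGroupFunction;
     >     import org.apache.flink.api.java.functions.KeySelector;
     >     import org.apache.flink.api.java.tuple.Tuple2;
     >     import org.apache.flink.streaming.api.datastream.DataStream;
     >     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
     >     import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
     >     import org.apache.flink.streaming.api.windowing.time.Time;
     >     import org.apache.flink.util.Collector;
     >
     >     // The class name is illustrative.
     >     public class CoGroupBatchDemo {
     >
     >         public static void main(String[] args) throws Exception {
     >             StreamExecutionEnvironment env =
     >                     StreamExecutionEnvironment.getExecutionEnvironment();
     >             // Configure the environment first so that the operators
     >             // created below pick up these settings.
     >             env.setParallelism(1);
     >             env.setRuntimeMode(RuntimeExecutionMode.BATCH);
     >
     >             DataStream<Integer> s1 = env.fromCollection(
     >                     Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
     >             DataStream<Integer> s2 = env.fromCollection(
     >                     Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));
     >
     >             s1.coGroup(s2)
     >                     .where(new KeySelector<Integer, Integer>() {
     >                         @Override
     >                         public Integer getKey(Integer value) {
     >                             return value;
     >                         }
     >                     })
     >                     .equalTo(new KeySelector<Integer, Integer>() {
     >                         @Override
     >                         public Integer getKey(Integer value) {
     >                             return value;
     >                         }
     >                     })
     >                     // The window only serves to collect all records per key.
     >                     .window(TumblingProcessingTimeWindows.of(Time.days(1)))
     >                     .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
     >                         @Override
     >                         public void coGroup(Iterable<Integer> first,
     >                                             Iterable<Integer> second,
     >                                             Collector<Tuple2<Integer, Integer>> out) {
     >                             if (!second.iterator().hasNext()) {
     >                                 // No match on the right side: emit (left, null).
     >                                 // Caution: Flink's TupleSerializer rejects null
     >                                 // fields, so this presumably works only because the
     >                                 // result is chained to the sink, never serialized.
     >                                 for (Integer left : first) {
     >                                     out.collect(new Tuple2<>(left, null));
     >                                 }
     >                             } else {
     >                                 // Matches found: emit every (left, right) pair.
     >                                 for (Integer left : first) {
     >                                     for (Integer right : second) {
     >                                         out.collect(new Tuple2<>(left, right));
     >                                     }
     >                                 }
     >                             }
     >                         }
     >                     })
     >                     .printToErr();
     >
     >             env.execute();
     >         }
     >     }
     >     ```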
     >
     >     Thanks in advance.
     >
     >     --
     >     Jiadong Lu
     >
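
To make the expected output of the demo explicit, the following plain-Java sketch mimics what the `CoGroupFunction` receives once the window has collected all records: it is called once per key that appears in either input, with that key's group from each side. This is a hypothetical in-memory simulation (`CoGroupLeftJoinSimulation` is not part of Flink); keys are sorted only to make the printed result deterministic, and the real job's output order may differ.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical simulation of the coGroup contract, not Flink code.
public class CoGroupLeftJoinSimulation {

    // Groups values by key; the key is the value itself, as in the demo's KeySelectors.
    private static Map<Integer, List<Integer>> groupByKey(List<Integer> values) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (Integer v : values) {
            groups.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }
        return groups;
    }

    public static List<Map.Entry<Integer, Integer>> coGroupLeftJoin(List<Integer> s1, List<Integer> s2) {
        Map<Integer, List<Integer>> firstGroups = groupByKey(s1);
        Map<Integer, List<Integer>> secondGroups = groupByKey(s2);

        // The function is invoked once for every key present in EITHER input.
        TreeSet<Integer> keys = new TreeSet<>(firstGroups.keySet());
        keys.addAll(secondGroups.keySet());

        List<Map.Entry<Integer, Integer>> out = new ArrayList<>();
        for (Integer key : keys) {
            List<Integer> first = firstGroups.getOrDefault(key, List.of());
            List<Integer> second = secondGroups.getOrDefault(key, List.of());
            // Same body as the CoGroupFunction in the demo.
            if (!second.iterator().hasNext()) {
                for (Integer left : first) {
                    out.add(new AbstractMap.SimpleEntry<Integer, Integer>(left, null));
                }
            } else {
                for (Integer left : first) {
                    for (Integer right : second) {
                        out.add(new AbstractMap.SimpleEntry<Integer, Integer>(left, right));
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [1=1, 2=2, 3=3, 4=4, 5=5, 6=6, 7=null]
        System.out.println(coGroupLeftJoin(List.of(1, 2, 3, 4, 5, 6, 7), List.of(6, 5, 4, 3, 2, 1)));
    }
}
```

Because the function is also invoked for keys that exist only in the second input (with an empty `first`) and emits nothing in that case, the overall result is a left join rather than a full outer join.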
