Hi Shammon,
Thank you for your quick response.
To give you some context, I am working on a project that involves
joining two streams and performing some left/inner join operations based
on certain keys. As for using batch mode, my intention is to have a
unified approach that works for both stream and batch processing.
If I decide not to use Flink's join/coGroup API, I would need to
implement a join operation manually by saving one stream's data and
reading it from the other stream. This could potentially make the
solution more complex, given the nature of this particular scenario.
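To make the extra complexity concrete, here is a minimal plain-Java sketch of such a manual left join. It is not Flink code: the class `ManualLeftJoinSketch` and its methods `onLeft`, `onRight`, and `endOfInput` are hypothetical names, the join key is assumed to be the value itself, and both buffers are assumed to fit in memory. Each side saves its elements and reads the other side's buffer on arrival; unmatched left elements can only be emitted once the input has ended.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Flink.
class ManualLeftJoinSketch {
    // Elements seen so far on each side, grouped by key (the key is the value itself).
    private final Map<Integer, List<Integer>> leftBuffer = new LinkedHashMap<>();
    private final Map<Integer, List<Integer>> rightBuffer = new HashMap<>();
    // Keys of left elements that have found at least one partner.
    private final Set<Integer> matchedLeftKeys = new HashSet<>();
    private final List<SimpleEntry<Integer, Integer>> output = new ArrayList<>();

    /** Saves a left element and joins it with every right element buffered so far. */
    void onLeft(Integer value) {
        leftBuffer.computeIfAbsent(value, k -> new ArrayList<>()).add(value);
        for (Integer right : rightBuffer.getOrDefault(value, Collections.emptyList())) {
            output.add(new SimpleEntry<>(value, right));
            matchedLeftKeys.add(value);
        }
    }

    /** Saves a right element and joins it with every left element buffered so far. */
    void onRight(Integer value) {
        rightBuffer.computeIfAbsent(value, k -> new ArrayList<>()).add(value);
        for (Integer left : leftBuffer.getOrDefault(value, Collections.emptyList())) {
            output.add(new SimpleEntry<>(left, value));
            matchedLeftKeys.add(value);
        }
    }

    /** Emits (left, null) for left elements that never matched; call once, after all input. */
    List<SimpleEntry<Integer, Integer>> endOfInput() {
        for (Map.Entry<Integer, List<Integer>> group : leftBuffer.entrySet()) {
            if (!matchedLeftKeys.contains(group.getKey())) {
                for (Integer left : group.getValue()) {
                    output.add(new SimpleEntry<>(left, null));
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        ManualLeftJoinSketch join = new ManualLeftJoinSketch();
        join.onLeft(1);
        join.onLeft(2);
        join.onRight(2);
        join.onRight(3);
        join.onLeft(3);
        join.onLeft(7);
        System.out.println(join.endOfInput()); // prints [2=2, 3=3, 1=null, 7=null]
    }
}
```

Even in this small form the join needs two buffers, a record of matched keys, and an explicit end-of-input step, which is the bookkeeping I would like to avoid.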
Thank you for your time and I look forward to hearing from you soon.
Best,
Jiadong Lu
On 2023/4/26 18:13, Shammon FY wrote:
Hi Jiadong
Using a processing-time window in batch jobs seems a little strange to
me. I would prefer to partition the data by day and have the batch job
read from the different partitions instead of using a window.
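As a rough illustration of day-level partitioning, the plain-Java sketch below groups records by calendar day. It is only a sketch under assumptions: records are represented by epoch-millisecond timestamps, days are cut in UTC, and `DayPartitionSketch` and `partitionByDay` are hypothetical names rather than Flink APIs.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Flink.
class DayPartitionSketch {
    /** Groups epoch-millisecond timestamps by their UTC calendar day, ordered by day. */
    static Map<LocalDate, List<Long>> partitionByDay(List<Long> epochMillis) {
        Map<LocalDate, List<Long>> partitions = new TreeMap<>();
        for (Long ts : epochMillis) {
            LocalDate day = Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC).toLocalDate();
            partitions.computeIfAbsent(day, d -> new ArrayList<>()).add(ts);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // The last millisecond of 1970-01-01 and the first of 1970-01-02 land in different partitions.
        System.out.println(partitionByDay(Arrays.asList(0L, 86_399_999L, 86_400_000L)));
    }
}
```

Each partition can then be read and joined by a separate batch run, so no window is needed to bound the data.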
Best,
Shammon FY
On Wed, Apr 26, 2023 at 12:03 PM Jiadong Lu <archzi...@gmail.com> wrote:
Hi, Shammon,
Thank you for your reply.
Yes, the window configured with `Time.days(1)` has no special meaning,
it is just used to group all data into the same global window.
I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also
needs a `Trigger` such as
`org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger`
to trigger processing of all the data in the window.
So I think `ProcessingTimeWindow` with `Time.days(10)` may be a good
solution for this scenario. What do you think?
As for what you mentioned
> use join directly
I do not know how to use a join without a window. Would you mind
writing a demo of it?
Thank you in advance for your help.
Best,
Jiadong Lu
On 2023/4/26 09:53, Shammon FY wrote:
> Hi Jiadong,
>
> I think it depends on the specific role the window plays for you. If
> this window has no specific business meaning and is only used for
> performance optimization, maybe you can consider using join directly.
>
> Best,
> Shammon FY
>
> On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu <archzi...@gmail.com> wrote:
>
> Hello everyone,
>
> I am confused about the window of the join/coGroup operator in batch mode.
> Here is my demo code, which works fine for me at present. Is it
> appropriate to use a processing-time window in batch mode, and does this
> approach have any problems? I want to use this solution to solve my
> problem (joining two streams in batch mode).
>
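> ```java
> import java.util.stream.Collectors;
> import java.util.stream.Stream;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.functions.CoGroupFunction;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.util.Collector;
>
> // The class name is illustrative; the original message showed only main().
> public class CoGroupBatchDemo {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Two bounded inputs; the key 7 exists only in s1.
>         DataStream<Integer> s1 =
>                 env.fromCollection(Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
>         DataStream<Integer> s2 =
>                 env.fromCollection(Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));
>
>         s1.coGroup(s2)
>                 // Both sides are keyed by the element value itself.
>                 .where(new KeySelector<Integer, Integer>() {
>                     @Override
>                     public Integer getKey(Integer value) throws Exception {
>                         return value;
>                     }
>                 })
>                 .equalTo(new KeySelector<Integer, Integer>() {
>                     @Override
>                     public Integer getKey(Integer value) throws Exception {
>                         return value;
>                     }
>                 })
>                 .window(TumblingProcessingTimeWindows.of(Time.days(1)))
>                 .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
>                     @Override
>                     public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
>                             Collector<Tuple2<Integer, Integer>> out) throws Exception {
>                         if (!second.iterator().hasNext()) {
>                             // Left join: no match on the right side, so pair with null.
>                             for (Integer integer : first) {
>                                 out.collect(new Tuple2<>(integer, null));
>                             }
>                         } else {
>                             // Emit every (left, right) pair that shares the key.
>                             for (Integer integer : first) {
>                                 for (Integer integer1 : second) {
>                                     out.collect(new Tuple2<>(integer, integer1));
>                                 }
>                             }
>                         }
>                     }
>                 })
>                 .printToErr();
>         env.setParallelism(1);
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         env.execute();
>     }
> }
> ```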
>
> Thanks in advance.
>
> --
> Jiadong Lu
>
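For reference, the left-join semantics of the `coGroup` function in the demo above can be reproduced without Flink. The following is a minimal plain-Java sketch, not a Flink program: `CoGroupLeftJoinSketch` and `leftJoin` are hypothetical names, and a `TreeMap` is used only to make the output order deterministic (the order in which Flink emits results across keys is not assumed here).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Flink.
class CoGroupLeftJoinSketch {
    /** Groups both inputs by key (the value itself) and applies the demo's coGroup logic per key. */
    static List<SimpleEntry<Integer, Integer>> leftJoin(List<Integer> first, List<Integer> second) {
        Map<Integer, List<Integer>> left = new TreeMap<>();
        Map<Integer, List<Integer>> right = new TreeMap<>();
        for (Integer v : first) {
            left.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }
        for (Integer v : second) {
            right.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }

        List<SimpleEntry<Integer, Integer>> out = new ArrayList<>();
        // Keys that occur only on the right side produce nothing, as in the demo,
        // because there the loop over an empty `first` emits no records.
        for (Map.Entry<Integer, List<Integer>> group : left.entrySet()) {
            List<Integer> matches = right.getOrDefault(group.getKey(), Collections.emptyList());
            for (Integer l : group.getValue()) {
                if (matches.isEmpty()) {
                    out.add(new SimpleEntry<>(l, null)); // no partner: pair with null
                } else {
                    for (Integer r : matches) {
                        out.add(new SimpleEntry<>(l, r));
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> s1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> s2 = Arrays.asList(6, 5, 4, 3, 2, 1);
        System.out.println(leftJoin(s1, s2)); // prints [1=1, 2=2, 3=3, 4=4, 5=5, 6=6, 7=null]
    }
}
```

With the demo's inputs, keys 1 through 6 each match once and key 7 is paired with `null`, which is the set of pairs the windowed `coGroup` is expected to emit once all data falls into a single window.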