AFAIK, FLINK-10886 is not implemented yet.
cc @Becket may know more plans about this feature.

Best,
Jark

On Sat, 21 Nov 2020 at 03:46, <fuyao...@oracle.com> wrote:

> Hi Timo,
>
> One more question, the blog also mentioned a jira task to solve this
> issue. https://issues.apache.org/jira/browse/FLINK-10886.  Will this
> feature be available in 1.12? Thanks!
>
> Best,
>
> Fuyao
> On 11/20/20 11:37, fuyao...@oracle.com wrote:
>
> Hi Timo,
>
> Thanks for your reply! I think your suggestions is really helpful! The
> good news is that I had managed to figure out it something by myself few
> days ago.
>
> 1. Thanks for the update about the table parallelism issue!
>
> 2. After trying out the idleness setting. It prevents some idle subtasks
> from blocking the pipeline's overall watermark and it works for me. Based
> on my observation and reading the source code, I have summarized some
> notes. Please correct me if I am wrong.
>
>    1. (1)Watermark is independent within each subtask for an Flink
>    operator.
>    2. (2)The watermark of the multi-parallelism table operator is always
>    dominated by least watermark of the current *ACTIVE* subtasks.
>    3. (3)With withIdleness() configured. A subtask will be mark as idle
>    if it hasn’t receive message for configured period of time. It will NOT
>    execute onPeriodEmit() and emit watermark after reaching the idle state.
>    Between [the start of the application/receive a new message]  and [reaching
>    into the idle state], the onPeriodEmit() will still emit watermark and
>    dominate the overall context watermark if it holds the smallest watermark
>    among the subtasks.
>    4. (4)Once an idle subtask receive a new message, it will switch its
>    status from idle to active and start to influence the overall context
>    watermark.
>
> 3. In order to route the correct information to the subtask in the join
> step, I have added the keyed() logic in the source based on the join key in
> the join step. It seems to work correctly and could route the message to a
> current place.
>
> 4. For the interval join, I think I can't use it directly since I need to
> use full outer join to not lose any information from any upstream
> datastream. I think interval join is a inner join it can't do this task. I
> guess my only option is to do full outer join with query configuration.
>
> 5. One more question about the data replay issue. I read the ververica
> blog (
> https://www.ververica.com/blog/replayable-process-functions-time-ordering-and-timers)
> and I think with replay use case, we will face some similar issues. I think
> the suggested approach mentioned
>
>   (1). Puts each incoming track record in a map keyed by its timestamp
>
>   (2). creates an event timer to process that record once the watermark
> hits that point.
>
> I kind of understand the idea here. Buffer all the data(maybe delete some
> of the old track if processed) in a track ordered by timestamp and trigger
> the event timer sequentially with this buffered track.
>
> Based on my understanding, this buffered design is only suitable for
> *offline* data processing, right? (It is a waste of resource to buffer
> this in real time. )
>
> Also, from the article, I think they are using periodic watermark
> strategy[1]. how can they process the last piece of data records with
> periodic watermark strategy since there is no more incoming data to advance
> the watermark? So the last piece of data will never be processed here? Is
> there a way to gracefully handle this? My use case doesn't allow me to lose
> any information.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>
> Best,
>
> Fuyao
>
>
> On 11/20/20 08:55, Timo Walther wrote:
>
> Hi Fuyao,
>
> sorry for not replying earlier.
>
> You posted a lot of questions. I scanned the thread quickly, let me try to
> answer some of them and feel free to ask further questions afterwards.
>
> "is it possible to configure the parallelism for Table operation at
> operator level"
>
> No this is not possible at the moment. The reason is 1) we don't know how
> to expose such a functionality in a nice way. Maybe we will use SQL hints
> in the future [1]. 2) Sometime the planner sets the paralellism of
> operators explicitly to 1. All other operators will use the globally
> defined parallelism for the pipeline (also to not mess up retraction
> messages internally). You will be able to set the parallelism of the sink
> operation in Flink 1.12.
>
> "BoundedOutOfOrderness Watermark Generator is NOT making the event time to
> advance"
>
> Have you checked if you can use an interval join instead of a full join
> with state retention? Table/SQL pipelines that don't preserve a time
> attribute in the end might also erase the underlying watermarks. Thus,
> event time triggers will not work after your join.
>
> "Why can't I update the watermarks for all 8 parallelisms?"
>
> You could play around with idleness for your source [2]. Or you set the
> source parallelism to 1 (while keeping the rest of the pipeline globally
> set to 8), would that be an option?
>
> "Some type cast behavior of retracted streams I can't explain."
>
> toAppendStream/toRetractStream still need an update to the new type
> system. This is explained in FLIP-136 which will be part of Flink 1.13 [3].
>
> I hope I could help a bit.
>
> Regards,
> Timo
>
>
> [1]
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-113*3A*Supports*Dynamic*Table*Options*for*Flink*SQL__;JSsrKysrKys!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J6qWrWNk$
> [2]
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html*dealing-with-idle-sources__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JMW06Who$
> [3]
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-136*3A**AImprove*interoperability*between*DataStream*and*Table*API__;JSsrKysrKysr!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JfAjyGyQ$
> On 13.11.20 21:39, Fuyao Li wrote:
>
> Hi Matthias,
>
> Just to provide more context on this problem. I only have 1 partition per
> each Kafka Topic at the beginning before the join operation. After reading
> the doc:
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$>
>
> Maybe that is the root cause of my problem here, with less than 8
> partitions (only 1 partition in my case), using the default parallelism of
> 8 will cause this wrong behavior. This is my guess, it takes a while to
> test it out... What's your opinion on this? Thanks!
>
> Best,
>
> Fuyao
>
>
> On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <fuyaoli2...@gmail.com
> <mailto:fuyaoli2...@gmail.com> <fuyaoli2...@gmail.com>> wrote:
>
>     Hi Matthias,
>
>     One more question regarding Flink table parallelism, is it possible
>     to configure the parallelism for Table operation at operator level,
>     it seems we don't have such API available, right? Thanks!
>
>     Best,
>     Fuyao
>
>     On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com
>     <mailto:fuyaoli2...@gmail.com> <fuyaoli2...@gmail.com>> wrote:
>
>         Hi Matthias,
>
>         Thanks for your information. I have managed to figure out the
>         first issue you mentioned. Regarding the second issue. I have
>         got some progress on it.
>
>         I have sent another email with the title 'BoundedOutOfOrderness
>         Watermark Generator is NOT making the event time to advance'
>         using another email of mine, fuyao...@oracle.com
>         <mailto:fuyao...@oracle.com> <fuyao...@oracle.com>. That email
> contains some more
>         context on my issue. Please take a look. I have made some
>         progress after sending that new email.
>
>         Previously, I had managed to make timelag watermark strategy
>         working in my code, but my bound out of orderness strategy or
>         punctuated watermark strategy doesn't work well. It produces 8
>         watermarks each time. Two cycles are shown below.
>
>         I managed to figure out the root cause is that Flink stream
>         execution environment has a default parallelism as 8.*I didn't
>         notice in the doc, could the Community add this explicitly into
>         the official doc to avoid some confusion? Thanks.*
>
>          From my understanding, the watermark advances based on the
>         lowest watermark among the 8, so I can not advance the bound out
>         of orderness watermark since I am only advancing 1 of the 8
>         parallelisms. If I set the entire stream execution environment
>         to be of parallelism 1, it will reflect the watermark in the
>         context correctly. One more thing is that this behavior is not
>         reflected in the Flink Cluster web UI interface. I can see the
>         watermark is advancing, but it is not in reality. *That's
>         causing the inconsistency problem I mentioned in the other email
>         I mentioned above. Will this be considered as a bug in the UI?*
>
>         My current question is, since I have full outer join operation
>         before the KeyedProcessFunction here. How can I let the bound of
>         orderness watermark / punctuated watermark strategy work if the
>         parallelism > 1? It can only update one of the 8 parallelisms
>         for the watermark for this onTimer operator. Is this related to
>         my Table full outer join operation before this step? According
>         to the doc,
>
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$
>         
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$>
>
>         Default parallelism should be the same like the stream
>         environment. Why can't I update the watermarks for all 8
>         parallelisms? What should I do to enable this function with
>         Parallelism larger than 1? Thanks.
>
>         First round: (Note the first column of each log row is the
>         timelag strategy, it is getting updated correctly for all 8
>         parallelism, but the other two strategies I mentioned above
>         can't do that..)
>
>         14:28:01,199 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047266198,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:01,199 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047266199,
>         periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>         1605047187881 (only one of the 8 parallelism for bound out of
>         orderness is getting my new watermark)
>         14:28:01,199 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047266199,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:01,199 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047266198,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:01,199 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047266198,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:01,199 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047266198,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:01,199 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047266198,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:01,199 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047266198,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>
>         Second round: (I set the autoWatermark interval to be 5 seconds)
>         14:28:06,200 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047271200,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:06,200 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047271200,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:06,200 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047271200,
>         periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>         1605047187881
>         14:28:06,200 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047271200,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:06,200 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047271200,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:06,200 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047271200,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:06,200 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047271200,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>         14:28:06,200 INFO
>
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
>         - Emit Watermark: watermark based on system time: 1605047271200,
>         periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>
>
>         Best regards,
>
>         Fuyao
>
>
>         On Fri, Nov 13, 2020 at 9:03 AM Matthias Pohl
>         <matth...@ververica.com <mailto:matth...@ververica.com>
> <matth...@ververica.com>> wrote:
>
>             Hi Fuyao,
>             for your first question about the different behavior
>             depending on whether you chain the methods or not: Keep in
>             mind that you have to save the return value of the
>             assignTimestampsAndWatermarks method call if you don't chain
>             the methods together as it is also shown in [1].
>             At least the following example from your first message is
>             indicating it:
>             ```
>             retractStream.assignTimestampsAndWatermarks(new
>             BoRetractStreamTimestampAssigner()); (This is a deprecated
>             method)
>             // instead of: retractStream =
>             retractStream.assignTimestampsAndWatermarks(new
>             BoRetractStreamTimestampAssigner());
>             retractStream
>                  .keyBy(<key selector>)
>                  .process(new TableOutputProcessFunction())
>                  .name("ProcessTableOutput")
>                  .uid("ProcessTableOutput")
>                  .addSink(businessObjectSink)
>                  .name("businessObjectSink")
>                  .uid("businessObjectSink")
>                  .setParallelism(1);
>             ```
>
>             For your second question about setting the EventTime I'm
>             going to pull in Timo from the SDK team as I don't see an
>             issue with your code right away.
>
>             Best,
>             Matthias
>
>             [1]
>
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$
>             
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$>
>
>             On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li
>             <fuyaoli2...@gmail.com <mailto:fuyaoli2...@gmail.com>
> <fuyaoli2...@gmail.com>> wrote:
>
>                 Hi Flink Users and Community,
>
>                 For the first part of the question, the 12 hour time
>                 difference is caused by a time extraction bug myself. I
>                 can get the time translated correctly now. The type cast
>                 problem does have some workarounds to solve it..
>
>                 My major blocker right now is the onTimer part is not
>                 properly triggered. I guess it is caused by failing to
>                 configure the correct watermarks & timestamp assigners.
>                 Please give me some insights.
>
>                 1. If I don't chain the assignTimestampsAndWatermarks()
>                 method in together with keyedBy().. and process()..
>                 method. The context.timestamp() in my processElement()
>                 function will be null. Is this some expected behavior?
>                 The Flink examples didn't chain it together. (see
>                 example here:
>
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$
>                 
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$>)
>
>                 2. If I use registerEventTimeTimer() in
>                 processElement(). The onTimer method will not be
>                 triggered. However, I can trigger the onTimer method if
>                 I simply change it to registerProcessingTimeTimer(). I
>                 am using the settings below in the stream env.
>
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>                 env.getConfig().setAutoWatermarkInterval(1000L);
>
>                 My code for method the process chain:
>                 retractStream
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean,
>                 Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>
> .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>                                              Row rowData =
>                 booleanRowTuple2.f1;
>                                              LocalDateTime headerTime =
>                 (LocalDateTime)rowData.getField(3);
>                                              LocalDateTime linesTime =
>                 (LocalDateTime)rowData.getField(7);
>
>                                              LocalDateTime
>                 latestDBUpdateTime = null;
>                                              if (headerTime != null &&
>                 linesTime != null) {
>                                                  latestDBUpdateTime =
>                 headerTime.isAfter(linesTime) ? headerTime : linesTime;
>                                              }
>                                              else {
>                                                  latestDBUpdateTime =
>                 (headerTime != null) ? headerTime : linesTime;
>                                              }
>                                              if (latestDBUpdateTime !=
>                 null) {
>                                                  return
>
> latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
>                                              }
>                                              // In the worst case, we
>                 use system time instead, which should never be reached.
>                                              return
>                 System.currentTimeMillis();
>                                          }))
>                 //              .assignTimestampsAndWatermarks(new
>                 MyWaterStrategy())  // second way to create watermark,
>                 doesn't work
>                                  .keyBy(value -> {
>                                      // There could be null fields for
>                 header invoice_id field
>                                      String invoice_id_key =
>                 (String)value.f1.getField(0);
>                                      if (invoice_id_key == null) {
>                                          invoice_id_key =
>                 (String)value.f1.getField(4);
>                                      }
>                                      return invoice_id_key;
>                                  })
>                                  .process(new
> TableOutputProcessFunction())
>                                  .name("ProcessTableOutput")
>                                  .uid("ProcessTableOutput")
>                                  .addSink(businessObjectSink)
>                                  .name("businessObjectSink")
>                                  .uid("businessObjectSink")
>                                  .setParallelism(1);
>
>                 Best regards,
>                 Fuyao
>
>                 On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li
>                 <fuyaoli2...@gmail.com <mailto:fuyaoli2...@gmail.com>
> <fuyaoli2...@gmail.com>>
>                 wrote:
>
>                     Hi Flink Community,
>
>                     I am doing some research work on Flink Datastream
>                     and Table API and I meet two major problems. I am
>                     using Flink 1.11.2, scala version 2.11, java 8. My
>                     use case looks like this. I plan to write a data
>                     processing pipeline with two stages. My goal is to
>                     construct a business object containing information
>                     from several Kafka streams with a primary key and
>                     emit the complete business object if such primary
>                     key doesn't  appear in the pipeline for 10 seconds.
>
>                     In the first stage, I first consume three Kafka
>                     streams and transform it to Flink Datastream using a
>                     deserialization schema containing some type and date
>                     format transformation, and then I register these
>                     data streams as Table and do a full outer join one
>                     by one using Table API. I also add query
>                     configuration for this to avoid excessive state. The
>                     primary key is also the join key.
>
>                     In the second stage, I transform the joined table to
>                     a retracted stream and put it into
>                     KeyedProcessFunction to generate the business object
>                     if the business object's primary key is inactive for
>                     10 second.
>
>                     Is this way of handling the data the suggested
>                     approach? (I understand I can directly consume kafka
>                     data in Table API. I haven't tried that yet, maybe
>                     that's better?) Any suggestion is welcomed. During
>                     implementing this, I meet two major problems and
>                     several smaller questions under each problem.
>
>
>                     1. Some type cast behavior of retracted streams I
>                     can't explain.
>
>                     (1) In the initial stage, I registered some field as
>                     *java.sql.Date* or *java.sql.timestamp* following
>                     the examples at
>                     (
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html*data-type-extraction__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JsB1tdos$
>                     
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html*data-type-extraction__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JsB1tdos$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html*data-type-extraction__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JsB1tdos$>)
>
>                     . After join and transform to retracted stream, it
>                     becomes *java.time.LocalDate* and
>                     *java.time.LocalDateTime* instead.
>
>                     For example, when first ingesting the Kafka streams,
>                     I registerd a attribute in java.sql.Timestamp type.
>
>                       @JsonAlias("ATTRIBUTE1")
>                       private @DataTypeHint(value = "TIMESTAMP(6)",
>                     bridgedTo = java.sql.Timestamp.class) Timestamp
>                     ATTRIBUTE1;
>
>                     When I tried to cast the type information back after
>                     the retracted stream, the code gives me error
>                     information below.
>
>                       java.lang.ClassCastException:
>                     java.time.LocalDateTime cannot be cast to
>                     java.sql.Timestamp
>
>                     Maybe I should use toAppendStream instead since
>                     append stream could register type information, but
>                     toRetractedStream can't do that?
>                     (
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html*convert-a-table-into-a-datastream-or-dataset__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JQ99YqY0$
>                     
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html*convert-a-table-into-a-datastream-or-dataset__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JQ99YqY0$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html*convert-a-table-into-a-datastream-or-dataset__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JQ99YqY0$>)
>
>
>                     My work around is to cast it to LocalDateTime first
>                     and extract the epoch time, this doesn't seem to be
>                     a final solution.
>
>                     (2) During timestamp conversion, the Flink to
>                     retracted stream seems to lost the AM/PM information
>                     in the stream and causing a 12 hour difference if it
>                     is PM.
>
>                     I use joda time to do some timestamp conversion in
>                     the first deserialization stage, my pattern looks
>                     like this. "a" means AM/PM information
>
>                       DateTimeFormatter format3 =
>                     DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS
>                     a").withZone(DateTimeZone.getDefault());
>
>                     After the retracted stream, the AM/PM information is
>                     not preserved.
>
>
>                     2. My onTimer method in KeyedProcessFunction can not
>                     be triggered when I scheduled a event timer timer.
>
>                     I am using event time in my code. I am new to
>                     configure watermarks and I might miss something to
>                     configure it correctly. I also tried to register a
>                     processing time, it could enter and produce some
>                     results.
>
>                     I am trying to follow the example here:
>
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html*example__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JNMi_YMc$
>                     
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html*example__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JNMi_YMc$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html*example__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JNMi_YMc$>
>
>                     My onTimer method looks like this and the scheduled
>                     event doesn't happen..
>
>                     In processElement():
>
>
> context.timerService().registerEventTimeTimer(current.getLastModifiedTime()
>                     + 10000);
>
>                     My onTimer function
>
>                        @Override
>                          public void onTimer(long timestamp,
>                     OnTimerContext ctx, Collector<BusinessObject>
>                     collector) throws Exception {
>                              TestBusinessObjectState result =
>                     testBusinessObjectState.value();
>                     log.info 
> <https://urldefense.com/v3/__http://log.info/__;!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JSt0BaYQ$
> >
> <https://urldefense.com/v3/__http://log.info/__;!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JSt0BaYQ$>("Inside
> onTimer Method,
>                     current key: {}, timestamp: {}, last modified time:
>                     {}", ctx.getCurrentKey(), timestamp,
>                     result.getLastModifiedTime());
>
>                              // check if this is an outdated timer or
>                     the latest timer
>                              if (timestamp >=
>                     result.getLastModifiedTime() + 10000) {
>                                  // emit the state on timeout
>                     log.info 
> <https://urldefense.com/v3/__http://log.info/__;!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JSt0BaYQ$
> >
> <https://urldefense.com/v3/__http://log.info/__;!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JSt0BaYQ$>("Collecting
> a business
>                     object, {}", result.getBusinessObject().toString());
>
> collector.collect(result.getBusinessObject());
>
>                                  cleanUp(ctx);
>                              }
>                          }
>
>                          private void cleanUp(Context ctx) throws
>                     Exception {
>                              Long timer =
>                     testBusinessObjectState.value().getLastModifiedTime();
>
> ctx.timerService().deleteEventTimeTimer(timer);
>                              testBusinessObjectState.clear();
>                          }
>
>
>                     (1) When I assign the timestamp and watermarks
>                     outside the process() method chain. The
>                     "context.timestamp()" will be null. If I put it
>                     inside the chain, it won't be null. Is this the
>                     expected behavior? In the null case, the strange
>                     thing is that, surprisingly, I can collect the
>                     business object immediately without a designed 10
>                     second waiting time... This shouldn't happen,
>                     right...? The processing timer also seems to work.
>                     The code can enter the on timer method.
>
>                       retractStream.assignTimestampsAndWatermarks(new
>                     BoRetractStreamTimestampAssigner()); (This is a
>                     deprecated method)
>
>                       retractStream
>                          .keyBy(<key selector>)
>                          .process(new TableOutputProcessFunction())
>                          .name("ProcessTableOutput")
>                          .uid("ProcessTableOutput")
>                          .addSink(businessObjectSink)
>                          .name("businessObjectSink")
>                          .uid("businessObjectSink")
>                          .setParallelism(1);
>
>                     (2) For watermarks configuration. I use an field in
>                     the retracted stream as the event time. This time is
>                     usually 15-20 seconds before current time.
>
>                     In my environment, I have done some settings for
>                     streaming env based on information here(
>
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$
>                     
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$>).
>
>                     My event doesn't always come, so I think I need to
>                     set auto watermark interval to let the event timer
>                     on timer works correctly. I have added the code below.
>
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>                     env.getConfig().setAutoWatermarkInterval(1000L);
>
>                     1> Which kind of watermark strategy should I use?
>                     General BoundOutofOrderness or Watermark generator?
>
>                     I tried to write a Watermark generator and I just
>                     don't how to apply it to the stream correctly. The
>                     documentation doesn't explain very clearly. My code
>                     looks like below and it doesn't work.
>
>                     assign part:
>
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean,
>                     Row>>) context -> new
>                     TableBoundOutofOrdernessGenerator()))
>
>                     watermark generater:
>
>                     I just assign the event time attribute following the
>                     example in the doc.
>                     (
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$
>                     
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$>)
>
>
>                     2> I also tried to use the static method in Water
>                     Strategy. The syntax is correct, but I meet the same
>                     problem in 2.(1).
>
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean,
>                     Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>
> .withTimestampAssigner((booleanRowTuple2, timestamp)
>                     -> {
>                                                  <Select a event time
>                     attribute in the booleanRowTuple2>
>                                              }))
>
>
>                     (3) For the retracted datastream, do I need to
>                     explicitly attach it to the stream environment? I
>                     think it is done by default, right? Just want to
>                     confirm it. I do have the env.execute() at the end
>                     of the code.
>
>                     I understand this is a lot of questions, thanks a
>                     lot for your patience to look through my email! If
>                     there is anything unclear, please reach out to me.
>                     Thanks!
>
>
>                     Best regards,
>
>                     Fuyao Li
>
>
>

Reply via email to