Hi Fuyao,

yes I agree. The code evolved quicker than the docs. I will create an issue for this.

Regards,
Timo



On 25.11.20 19:27, Fuyao Li wrote:
Hi Timo,
Thanks for the information. I saw in the test code that Flink SQL can actually do a full outer join with interval join semantics. However, this is not stated explicitly in the Flink SQL documentation, which made me think it might not be available for me to use. Maybe the docs could be updated to state explicitly which kinds of join are supported as interval joins. (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins)

Thanks!
Fuyao



On Tue, Nov 24, 2020 at 9:06 AM Timo Walther <twal...@apache.org> wrote:

    Hi Fuyao,

    great that you could make progress.

    2. Btw nice summary of the idleness concept. We should have that in the
    docs actually.

    4. By looking at tests like `IntervalJoinITCase` [1] it seems that we
    also support FULL OUTER JOINs as interval joins. Maybe you can make use
    of them.

    5. "buffered design is only suitable for *offline* data processing,
    right"
    I guess this depends on the size of data and the SLAs for checkpointing
    time.

    In general, when using the RocksDB state backend you can add a lot of
    data into state. One backfill option could be to warm up a pipeline and
    only perform a savepoint at the end. Then one could start the online
    pipeline (maybe with a different flag set) from this savepoint again.

    Or you could even configure the state handling of your backfill
    operator via a control stream.

    We are also working on a better offline data processing story for
    DataStream API programs in Flink 1.12. This might be interesting for
    you as well [2].

    "how can they process the last piece of data"

    Maybe they are emitting a max watermark at some point. Bounded sources
    will also do that. The watermarks are not checkpointed and start from
    scratch when restarting the Flink job.
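
To make the "max watermark" remark concrete, here is a minimal sketch in plain Java, not Flink code. It models the buffering idea from the blog post: records wait in a map ordered by timestamp and are released once the watermark reaches them. The class `TimestampOrderedBuffer` and its methods are hypothetical names chosen for this illustration; the only point is that a final watermark of Long.MAX_VALUE releases everything that is still buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model only (not Flink code): buffer records by timestamp, release them as the watermark passes. */
class TimestampOrderedBuffer {
    // Records waiting for the watermark, ordered by event timestamp.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Records released so far, in the order they were released.
    private final List<String> emitted = new ArrayList<>();

    /** Buffers a record under its event timestamp. */
    void add(long timestamp, String record) {
        pending.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(record);
    }

    /** Releases every buffered record whose timestamp is <= the watermark, in timestamp order. */
    void onWatermark(long watermark) {
        Map<Long, List<String>> ready = pending.headMap(watermark, true);
        for (List<String> records : ready.values()) {
            emitted.addAll(records);
        }
        ready.clear(); // headMap is a view, so this also removes the entries from 'pending'
    }

    List<String> emitted() {
        return emitted;
    }

    /** Number of distinct timestamps that are still buffered. */
    int pendingTimestamps() {
        return pending.size();
    }

    public static void main(String[] args) {
        TimestampOrderedBuffer buffer = new TimestampOrderedBuffer();
        buffer.add(30, "c");
        buffer.add(10, "a");
        buffer.add(20, "b");

        buffer.onWatermark(15);             // only the record at timestamp 10 is released
        System.out.println(buffer.emitted());

        buffer.onWatermark(Long.MAX_VALUE); // end of input: everything left is released
        System.out.println(buffer.emitted());
    }
}
```

Without the last call, the records at timestamps 20 and 30 would stay buffered forever, which is the situation described in the question about the last piece of data.
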

    Regards,
    Timo

    [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
    [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API




    On 20.11.20 20:37, fuyao...@oracle.com wrote:
     > Hi Timo,
     >
     > Thanks for your reply! I think your suggestions are really helpful! The
     > good news is that I managed to figure some of it out by myself a few
     > days ago.
     >
     > 1. Thanks for the update about the table parallelism issue!
     >
     > 2. I tried out the idleness setting. It prevents idle subtasks from
     > blocking the pipeline's overall watermark, and it works for me.
     > Based on my observations and on reading the source code, I have
     > summarized some notes. Please correct me if I am wrong.
     >
     >  (1) Watermarks are tracked independently within each subtask of a
     >      Flink operator.
     >  (2) The watermark of a table operator with parallelism > 1 is always
     >      dominated by the lowest watermark among the currently *ACTIVE*
     >      subtasks.
     >  (3) With withIdleness() configured, a subtask is marked as idle if it
     >      hasn't received a message for the configured period of time. It
     >      will NOT execute onPeriodicEmit() and emit watermarks after
     >      reaching the idle state. Between [the start of the application /
     >      receiving a new message] and [reaching the idle state],
     >      onPeriodicEmit() will still emit watermarks and dominate the
     >      overall context watermark if it holds the smallest watermark
     >      among the subtasks.
     >  (4) Once an idle subtask receives a new message, it switches its
     >      status from idle to active and starts to influence the overall
     >      context watermark again.
     >
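
The four notes above can be modelled in a few lines of plain Java. This is only a toy sketch under stated assumptions, not Flink's implementation: the class `IdleAwareWatermarks`, the `Subtask` type and all method names are made up for illustration, and the timeout check is driven by hand instead of by a timer. The model also keeps the combined watermark from moving backwards, on the understanding that watermarks only ever advance.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model only; this is not Flink's implementation. */
class IdleAwareWatermarks {

    /** Hypothetical stand-in for the watermark state of one parallel subtask. */
    static final class Subtask {
        long watermark = Long.MIN_VALUE; // no watermark emitted yet
        long lastRecordAtMs;             // processing time of the last record (or of startup)
        boolean idle = false;

        Subtask(long startedAtMs) {
            this.lastRecordAtMs = startedAtMs;
        }

        /** A record arrived: the subtask is active again and may raise its own watermark. */
        void onRecord(long nowMs, long candidateWatermark) {
            idle = false;
            lastRecordAtMs = nowMs;
            watermark = Math.max(watermark, candidateWatermark);
        }

        /** Periodic check: mark the subtask idle once the timeout elapsed without records. */
        void onPeriodicCheck(long nowMs, long idleTimeoutMs) {
            if (nowMs - lastRecordAtMs >= idleTimeoutMs) {
                idle = true;
            }
        }
    }

    private long combined = Long.MIN_VALUE;

    /** Takes the minimum over the ACTIVE subtasks; the result never moves backwards. */
    long update(List<Subtask> subtasks) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Subtask s : subtasks) {
            if (!s.idle) {
                anyActive = true;
                min = Math.min(min, s.watermark);
            }
        }
        if (anyActive) {
            combined = Math.max(combined, min);
        }
        return combined;
    }

    public static void main(String[] args) {
        Subtask busy = new Subtask(0);
        Subtask quiet = new Subtask(0);
        List<Subtask> all = Arrays.asList(busy, quiet);
        IdleAwareWatermarks combiner = new IdleAwareWatermarks();

        busy.onRecord(1_000, 10_000);
        // The quiet subtask is still active and has no watermark, so it holds everything back.
        System.out.println(combiner.update(all) == Long.MIN_VALUE);

        quiet.onPeriodicCheck(5_000, 5_000);
        // The quiet subtask is idle now, so only the busy subtask counts.
        System.out.println(combiner.update(all));
    }
}
```

When the quiet subtask later receives a record with a low timestamp, it becomes active again and holds the combined watermark at its current value until it catches up, which matches note (4).
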
     > 3. In order to route the correct information to the subtask in the
     > join step, I have added keyBy() logic at the source, based on the join
     > key used in the join step. It seems to work correctly and routes each
     > message to the correct place.
     >
     > 4. For the interval join, I think I can't use it directly since I need
     > to use a full outer join to not lose any information from any upstream
     > datastream. I think an interval join is an inner join, so it can't do
     > this task. I guess my only option is to do a full outer join with
     > query configuration.
     >
     > 5. One more question about the data replay issue. I read the Ververica
     > blog
     > (https://www.ververica.com/blog/replayable-process-functions-time-ordering-and-timers)
     > and I think with the replay use case we will face some similar issues.
     > I think the suggested approach mentioned there is:
     >
     >    (1) Put each incoming track record in a map keyed by its timestamp.
     >
     >    (2) Create an event timer to process that record once the watermark
     >        hits that point.
     >
     > I kind of understand the idea here: buffer all the data (maybe
     > deleting old track records once they are processed), ordered by
     > timestamp, and trigger the event timers sequentially over this
     > buffered data.
     >
     > Based on my understanding, this buffered design is only suitable for
     > *offline* data processing, right? (It is a waste of resources to
     > buffer this in real time.)
     >
     > Also, from the article, I think they are using a periodic watermark
     > strategy [1]. How can they process the last piece of data with a
     > periodic watermark strategy, since there is no more incoming data to
     > advance the watermark? Will the last piece of data never be processed
     > here? Is there a way to handle this gracefully? My use case doesn't
     > allow me to lose any information.
     >
     > [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
     >
     > Best,
     >
     > Fuyao
     >
     >
     > On 11/20/20 08:55, Timo Walther wrote:
     >> Hi Fuyao,
     >>
     >> sorry for not replying earlier.
     >>
     >> You posted a lot of questions. I scanned the thread quickly, let me
     >> try to answer some of them and feel free to ask further questions
     >> afterwards.
     >>
     >> "is it possible to configure the parallelism for Table operation at
     >> operator level"
     >>
     >> No, this is not possible at the moment. The reasons are: 1) we don't
     >> know how to expose such functionality in a nice way. Maybe we will use
     >> SQL hints in the future [1]. 2) Sometimes the planner sets the
     >> parallelism of operators explicitly to 1. All other operators will use
     >> the globally defined parallelism for the pipeline (also to not mess up
     >> retraction messages internally). You will be able to set the
     >> parallelism of the sink operation in Flink 1.12.
     >>
     >> "BoundedOutOfOrderness Watermark Generator is NOT making the event
     >> time to advance"
     >>
     >> Have you checked if you can use an interval join instead of a full
     >> join with state retention? Table/SQL pipelines that don't preserve a
     >> time attribute in the end might also erase the underlying watermarks.
     >> Thus, event time triggers will not work after your join.
     >>
     >> "Why can't I update the watermarks for all 8 parallelisms?"
     >>
     >> You could play around with idleness for your source [2]. Or you set
     >> the source parallelism to 1 (while keeping the rest of the pipeline
     >> globally set to 8), would that be an option?
     >>
     >> "Some type cast behavior of retracted streams I can't explain."
     >>
     >> toAppendStream/toRetractStream still need an update to the new type
     >> system. This is explained in FLIP-136, which will be part of Flink
     >> 1.13 [3].
     >>
     >> I hope I could help a bit.
     >>
     >> Regards,
     >> Timo
     >>
     >>
     >> [1]
     >> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-113*3A*Supports*Dynamic*Table*Options*for*Flink*SQL__;JSsrKysrKys!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J6qWrWNk$
     >> [2]
     >> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html*dealing-with-idle-sources__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JMW06Who$
     >> [3]
     >> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-136*3A**AImprove*interoperability*between*DataStream*and*Table*API__;JSsrKysrKysr!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JfAjyGyQ$

     >>
     >> On 13.11.20 21:39, Fuyao Li wrote:
     >>> Hi Matthias,
     >>>
     >>> Just to provide more context on this problem: I only have 1 partition
     >>> per Kafka topic at the beginning, before the join operation.
     >>> After reading the doc:
     >>> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$
     >>>
     >>> Maybe that is the root cause of my problem here: with fewer than 8
     >>> partitions (only 1 partition in my case), using the default
     >>> parallelism of 8 will cause this wrong behavior. This is my guess; it
     >>> takes a while to test it out... What's your opinion on this? Thanks!
     >>>
     >>> Best,
     >>>
     >>> Fuyao
     >>>
     >>>
     >>> On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:
     >>>
     >>>     Hi Matthias,
     >>>
     >>>     One more question regarding Flink table parallelism: is it possible
     >>>     to configure the parallelism for Table operation at operator level?
     >>>     It seems we don't have such an API available, right? Thanks!
     >>>
     >>>     Best,
     >>>     Fuyao
     >>>
     >>>     On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:
     >>>
     >>>         Hi Matthias,
     >>>
     >>>         Thanks for your information. I have managed to figure out the
     >>>         first issue you mentioned. Regarding the second issue, I have
     >>>         made some progress on it.
     >>>
     >>>         I have sent another email with the title 'BoundedOutOfOrderness
     >>>         Watermark Generator is NOT making the event time to advance'
     >>>         using another email address of mine, fuyao...@oracle.com. That
     >>>         email contains some more context on my issue. Please take a
     >>>         look. I have made some progress after sending that new email.
     >>>
     >>>         Previously, I had managed to get the timelag watermark strategy
     >>>         working in my code, but my bounded-out-of-orderness strategy
     >>>         and my punctuated watermark strategy don't work well. They
     >>>         produce 8 watermarks each time. Two cycles are shown below.
     >>>
     >>>         I figured out that the root cause is that the Flink stream
     >>>         execution environment has a default parallelism of 8. *I didn't
     >>>         notice this in the doc; could the community add it explicitly
     >>>         to the official doc to avoid some confusion? Thanks.*
     >>>
     >>>         From my understanding, the watermark advances based on the
     >>>         lowest watermark among the 8, so I cannot advance the
     >>>         bounded-out-of-orderness watermark since I am only advancing 1
     >>>         of the 8 parallelisms. If I set the entire stream execution
     >>>         environment to parallelism 1, it reflects the watermark in the
     >>>         context correctly. One more thing is that this behavior is not
     >>>         reflected in the Flink cluster web UI. There I can see the
     >>>         watermark advancing, but in reality it is not. *That's causing
     >>>         the inconsistency problem I described in the other email
     >>>         mentioned above. Will this be considered a bug in the UI?*
     >>>
     >>>         My current question is: since I have a full outer join
     >>>         operation before the KeyedProcessFunction here, how can I make
     >>>         the bounded-out-of-orderness / punctuated watermark strategy
     >>>         work if the parallelism > 1? It can only update one of the 8
     >>>         parallelisms for the watermark of this onTimer operator. Is
     >>>         this related to my Table full outer join operation before this
     >>>         step? According to the doc,
     >>>         https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$
     >>>
     >>>         the default parallelism should be the same as that of the
     >>>         stream environment. Why can't I update the watermarks for all 8
     >>>         parallelisms? What should I do to make this work with a
     >>>         parallelism larger than 1? Thanks.
     >>>
     >>>         First round: (Note that the first column of each log row is the
     >>>         timelag strategy. It is getting updated correctly for all 8
     >>>         parallelisms, but the other two strategies I mentioned above
     >>>         can't do that.)
     >>>
     >>>         14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881 (only one of the 8 parallelisms for bounded out-of-orderness is getting my new watermark)
     >>>         14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>
     >>>         Second round: (I set the autoWatermark interval to 5 seconds)
     >>>
     >>>         14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
     >>>         14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>         14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
     >>>
     >>>
     >>>         Best regards,
     >>>
     >>>         Fuyao
     >>>
     >>>
     >>>         On Fri, Nov 13, 2020 at 9:03 AM Matthias Pohl <matth...@ververica.com> wrote:
     >>>
     >>>             Hi Fuyao,
     >>>             for your first question about the different behavior
     >>>             depending on whether you chain the methods or not: Keep in
     >>>             mind that you have to save the return value of the
     >>>             assignTimestampsAndWatermarks method call if you don't
     >>>             chain the methods together, as is also shown in [1].
     >>>             At least the following example from your first message
     >>>             indicates it:
     >>>             ```
     >>>             // The return value is discarded here (this is also a deprecated method):
     >>>             retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
     >>>             // instead of:
     >>>             // retractStream = retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
     >>>             retractStream
     >>>                  .keyBy(<key selector>)
     >>>                  .process(new TableOutputProcessFunction())
     >>>                  .name("ProcessTableOutput")
     >>>                  .uid("ProcessTableOutput")
     >>>                  .addSink(businessObjectSink)
     >>>                  .name("businessObjectSink")
     >>>                  .uid("businessObjectSink")
     >>>                  .setParallelism(1);
     >>>             ```
     >>>
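
The point about saving the return value can be shown with a tiny plain-Java model. This is a sketch only, not the Flink API: `ToyStream` and `assignTimestamps()` are hypothetical names, and the only assumption carried over from the explanation above is that the transformation returns a new stream object instead of changing the one it is called on.

```java
/** Toy model only (not Flink code): a transformation returns a new stream object. */
final class ToyStream {
    private final boolean hasTimestamps;

    ToyStream(boolean hasTimestamps) {
        this.hasTimestamps = hasTimestamps;
    }

    /** Returns a NEW stream that carries timestamps; the receiver is left unchanged. */
    ToyStream assignTimestamps() {
        return new ToyStream(true);
    }

    boolean hasTimestamps() {
        return hasTimestamps;
    }

    public static void main(String[] args) {
        ToyStream stream = new ToyStream(false);

        stream.assignTimestamps();            // result discarded: 'stream' is unchanged
        System.out.println(stream.hasTimestamps());

        stream = stream.assignTimestamps();   // result kept: later calls see the timestamps
        System.out.println(stream.hasTimestamps());
    }
}
```

Chaining the calls works for the same reason: each call in the chain is made on the object returned by the previous one.
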
     >>>             For your second question about setting the EventTime, I'm
     >>>             going to pull in Timo from the SDK team as I don't see an
     >>>             issue with your code right away.
     >>>
     >>>             Best,
     >>>             Matthias
     >>>
     >>>             [1]
     >>>             https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$
     >>>
     >>>             On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
     >>>
     >>>                 Hi Flink Users and Community,
     >>>
     >>>                 For the first part of the question, the 12-hour time
     >>>                 difference was caused by a time extraction bug of my
     >>>                 own. I can get the time translated correctly now. The
     >>>                 type cast problem does have some workarounds.
     >>>
     >>>                 My major blocker right now is that the onTimer part is
     >>>                 not properly triggered. I guess it is caused by failing
     >>>                 to configure the correct watermarks & timestamp
     >>>                 assigners. Please give me some insights.
     >>>
     >>>                 1. If I don't chain the assignTimestampsAndWatermarks()
     >>>                 method together with the keyBy() and process()
     >>>                 methods, the context.timestamp() in my processElement()
     >>>                 function will be null. Is this expected behavior?
     >>>                 The Flink examples didn't chain them together. (See the
     >>>                 example here:
     >>>                 https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$)
     >>>                 2. If I use registerEventTimeTimer() in
     >>>                 processElement(), the onTimer method will not be
     >>>                 triggered. However, I can trigger the onTimer method if
     >>>                 I simply change it to registerProcessingTimeTimer(). I
     >>>                 am using the settings below in the stream env.
     >>>
     >>>                 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
     >>>                 env.getConfig().setAutoWatermarkInterval(1000L);
     >>>
     >>>                 My code for the process chain:
     >>>                 retractStream
     >>>                     .assignTimestampsAndWatermarks(
     >>>                         WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
     >>>                             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
     >>>                                 Row rowData = booleanRowTuple2.f1;
     >>>                                 LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
     >>>                                 LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
     >>>
     >>>                                 LocalDateTime latestDBUpdateTime = null;
     >>>                                 if (headerTime != null && linesTime != null) {
     >>>                                     latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
     >>>                                 } else {
     >>>                                     latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
     >>>                                 }
     >>>                                 if (latestDBUpdateTime != null) {
     >>>                                     return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
     >>>                                 }
     >>>                                 // In the worst case, we use system time instead, which should never be reached.
     >>>                                 return System.currentTimeMillis();
     >>>                             }))
     >>>                     // .assignTimestampsAndWatermarks(new MyWaterStrategy())  // second way to create watermark, doesn't work
     >>>                     .keyBy(value -> {
     >>>                         // There could be null fields for header invoice_id field
     >>>                         String invoice_id_key = (String) value.f1.getField(0);
     >>>                         if (invoice_id_key == null) {
     >>>                             invoice_id_key = (String) value.f1.getField(4);
     >>>                         }
     >>>                         return invoice_id_key;
     >>>                     })
     >>>                     .process(new TableOutputProcessFunction())
     >>>                     .name("ProcessTableOutput")
     >>>                     .uid("ProcessTableOutput")
     >>>                     .addSink(businessObjectSink)
     >>>                     .name("businessObjectSink")
     >>>                     .uid("businessObjectSink")
     >>>                     .setParallelism(1);
     >>>
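
The timestamp logic inside the assigner above can be pulled out into a small helper that is testable without Flink. A minimal sketch in plain Java follows; the class `LatestUpdateTime` and the method `latestEpochMillis` are hypothetical names, and returning an empty `OptionalLong` when both inputs are null (instead of falling back to system time) is a choice made only for this illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.OptionalLong;

/** Hypothetical helper, not part of Flink: picks the later of two nullable update times. */
class LatestUpdateTime {

    /**
     * Returns the later of the two local times as epoch milliseconds in the given zone,
     * or an empty result if both inputs are null.
     */
    static OptionalLong latestEpochMillis(LocalDateTime headerTime, LocalDateTime linesTime, ZoneId zone) {
        LocalDateTime latest;
        if (headerTime != null && linesTime != null) {
            latest = headerTime.isAfter(linesTime) ? headerTime : linesTime;
        } else {
            latest = (headerTime != null) ? headerTime : linesTime;
        }
        if (latest == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(latest.atZone(zone).toInstant().toEpochMilli());
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("America/Los_Angeles");
        LocalDateTime header = LocalDateTime.of(2020, 11, 2, 16, 0);
        LocalDateTime lines = LocalDateTime.of(2020, 11, 2, 16, 5);

        System.out.println(latestEpochMillis(header, lines, zone)); // based on the later time (lines)
        System.out.println(latestEpochMillis(header, null, zone));  // only the header time is present
        System.out.println(latestEpochMillis(null, null, zone));    // nothing to extract
    }
}
```

A timestamp assigner could call such a helper and decide separately what to do in the empty case, which keeps the fallback explicit.
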
     >>>                 Best regards,
     >>>                 Fuyao
     >>>
     >>>                 On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
     >>>
     >>>                     Hi Flink Community,
     >>>
     >>>                     I am doing some research work on the Flink
     >>>                     DataStream and Table APIs and have run into two
     >>>                     major problems. I am using Flink 1.11.2, Scala
     >>>                     version 2.11, and Java 8. My use case looks like
     >>>                     this: I plan to write a data processing pipeline
     >>>                     with two stages. My goal is to construct a
     >>>                     business object containing information from
     >>>                     several Kafka streams with a primary key, and to
     >>>                     emit the complete business object if that primary
     >>>                     key doesn't appear in the pipeline for 10 seconds.
     >>>
     >>>                     In the first stage, I consume three Kafka streams
     >>>                     and transform them into Flink DataStreams using a
     >>>                     deserialization schema that performs some type and
     >>>                     date format transformations. I then register these
     >>>                     data streams as Tables and do a full outer join
     >>>                     one by one using the Table API. I also add a query
     >>>                     configuration to avoid excessive state. The
     >>>                     primary key is also the join key.
     >>>
     >>>                     In the second stage, I transform the joined table
     >>>                     into a retract stream and pass it to a
     >>>                     KeyedProcessFunction, which generates the business
     >>>                     object if the business object's primary key is
     >>>                     inactive for 10 seconds.
     >>>
     >>>                     Is this way of handling the data the suggested
     >>>                     approach? (I understand I can directly consume
     >>>                     Kafka data in the Table API. I haven't tried that
     >>>                     yet; maybe that's better?) Any suggestions are
     >>>                     welcome. While implementing this, I ran into two
     >>>                     major problems, with several smaller questions
     >>>                     under each problem.
     >>>
     >>>
     >>>                     1. Some type cast behavior of retract streams I
     >>>                     can't explain.
     >>>
     >>>                     (1) In the initial stage, I registered some fields
     >>>                     as *java.sql.Date* or *java.sql.Timestamp*,
     >>>                     following the examples at
     >>>
    
(https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html*data-type-extraction__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JsB1tdos$
    
     >>>                     )
     >>>                     . After the join and the conversion to a retract
     >>>                     stream, they become *java.time.LocalDate* and
     >>>                     *java.time.LocalDateTime* instead.
     >>>
     >>>                     For example, when first ingesting the Kafka
     >>>                     streams, I registered an attribute with the
     >>>                     java.sql.Timestamp type.
     >>>
     >>>                       @JsonAlias("ATTRIBUTE1")
     >>>                       private @DataTypeHint(value = "TIMESTAMP(6)",
     >>>                     bridgedTo = java.sql.Timestamp.class) Timestamp
     >>>                     ATTRIBUTE1;
     >>>
     >>>                     When I tried to cast the value back to that type
     >>>                     after the conversion to a retract stream, the code
     >>>                     gave me the error below.
     >>>
     >>>                       java.lang.ClassCastException:
     >>>                     java.time.LocalDateTime cannot be cast to
     >>>                     java.sql.Timestamp
     >>>
     >>>                     Maybe I should use toAppendStream instead, since
     >>>                     an append stream can register type information
     >>>                     but toRetractStream can't?
     >>>
    
(https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html*convert-a-table-into-a-datastream-or-dataset__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JQ99YqY0$
    
     >>>                     )
     >>>
     >>>                     My workaround is to cast it to LocalDateTime first
     >>>                     and extract the epoch time, but this doesn't seem
     >>>                     to be a final solution.
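
A minimal sketch of the workaround described above, assuming the field really arrives as java.time.LocalDateTime after the conversion. `java.sql.Timestamp.valueOf(LocalDateTime)` is a standard JDK method that converts it back without manual epoch arithmetic. The class and method names (`TimestampBridge`, `toSqlTimestamp`, `toEpochMillis`) are hypothetical, and which time zone to use for the epoch value is an assumption the caller has to make.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampBridge {

    // Convert a field taken from the Row back to java.sql.Timestamp.
    // Takes Object because Row.getField returns Object.
    public static Timestamp toSqlTimestamp(Object field) {
        if (field == null) {
            return null;
        }
        if (field instanceof Timestamp) {
            return (Timestamp) field;
        }
        return Timestamp.valueOf((LocalDateTime) field);
    }

    // Epoch milliseconds, interpreting the local date-time in the given zone.
    public static long toEpochMillis(LocalDateTime value, ZoneId zone) {
        return value.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.of(2020, 11, 2, 16, 53, 0, 123_456_000);
        System.out.println(toSqlTimestamp(ldt));
        System.out.println(toEpochMillis(ldt, ZoneId.of("UTC")));
    }
}
```

The conversion keeps the full sub-second precision, so converting the result back with `toLocalDateTime()` yields the original value.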
     >>>
     >>>                     (2) During timestamp conversion, the conversion to
     >>>                     a retract stream seems to lose the AM/PM
     >>>                     information in the stream, causing a 12-hour
     >>>                     difference if the time is PM.
     >>>
     >>>                     I use Joda-Time to do some timestamp conversion in
     >>>                     the first deserialization stage. My pattern looks
     >>>                     like this; "a" means AM/PM information.
     >>>
     >>>                       DateTimeFormatter format3 =
     >>>                           DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a")
     >>>                               .withZone(DateTimeZone.getDefault());
     >>>
     >>>                     After the retract stream, the AM/PM information
     >>>                     is not preserved.
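
One possible cause, offered as an assumption rather than a confirmed diagnosis: in the pattern above, `HH` is the 24-hour hour-of-day field, whereas the AM/PM marker `a` is meant to be combined with the 12-hour field `hh`. The sketch below shows the `hh` plus `a` combination. It uses java.time instead of Joda-Time so that it runs without extra dependencies (the letters `H`, `h` and `a` have the same meaning in both libraries). The sample input string and the class name `AmPmPattern` are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class AmPmPattern {

    // "hh" is the clock hour (1-12); it is resolved together with the
    // AM/PM marker "a" into an hour of day (0-23).
    static final DateTimeFormatter TWELVE_HOUR =
            DateTimeFormatter.ofPattern("dd-MMM-yy hh.mm.ss.SSSSSS a", Locale.ENGLISH);

    public static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, TWELVE_HOUR);
    }

    public static void main(String[] args) {
        // Prints the resolved hour of day for a PM and an AM input.
        System.out.println(parse("02-Nov-20 04.53.00.123456 PM").getHour());
        System.out.println(parse("02-Nov-20 04.53.00.123456 AM").getHour());
    }
}
```

If the source data uses a 12-hour clock, checking the hour right after parsing in the deserialization stage would show whether the shift happens there or later in the Table conversion.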
     >>>
     >>>
     >>>                     2. My onTimer method in KeyedProcessFunction is
     >>>                     not triggered when I schedule an event-time timer.
     >>>
     >>>                     I am using event time in my code. I am new to
     >>>                     configuring watermarks and I might be missing
     >>>                     something needed to configure them correctly. I
     >>>                     also tried registering a processing-time timer;
     >>>                     with that, the code could enter onTimer and
     >>>                     produce some results.
     >>>
     >>>                     I am trying to follow the example here:
     >>>
    
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html*example__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JNMi_YMc$
    
     >>>
     >>>                     My onTimer method looks like this, and the
     >>>                     scheduled event doesn't happen.
     >>>
     >>>                     In processElement():
     >>>
     >>>                       context.timerService().registerEventTimeTimer(
     >>>                           current.getLastModifiedTime() + 10000);
     >>>
     >>>                     My onTimer function
     >>>
     >>>                       @Override
     >>>                       public void onTimer(long timestamp, OnTimerContext ctx,
     >>>                               Collector<BusinessObject> collector) throws Exception {
     >>>                           TestBusinessObjectState result =
     >>>                               testBusinessObjectState.value();
     >>>                           log.info("Inside onTimer Method, current key: {}, "
     >>>                               + "timestamp: {}, last modified time: {}",
     >>>                               ctx.getCurrentKey(), timestamp,
     >>>                               result.getLastModifiedTime());
     >>>
     >>>                           // check if this is an outdated timer or the latest timer
     >>>                           if (timestamp >= result.getLastModifiedTime() + 10000) {
     >>>                               // emit the state on timeout
     >>>                               log.info("Collecting a business object, {}",
     >>>                                   result.getBusinessObject().toString());
     >>>                               collector.collect(result.getBusinessObject());
     >>>
     >>>                               cleanUp(ctx);
     >>>                           }
     >>>                       }
     >>>
     >>>                       private void cleanUp(Context ctx) throws Exception {
     >>>                           Long timer =
     >>>                               testBusinessObjectState.value().getLastModifiedTime();
     >>>                           ctx.timerService().deleteEventTimeTimer(timer);
     >>>                           testBusinessObjectState.clear();
     >>>                       }
     >>>
     >>>
     >>>                     (1) When I assign the timestamps and watermarks
     >>>                     outside the process() method chain,
     >>>                     "context.timestamp()" is null. If I put the
     >>>                     assignment inside the chain, it isn't null. Is
     >>>                     this the expected behavior? In the null case, the
     >>>                     strange thing is that I can collect the business
     >>>                     object immediately, without the designed
     >>>                     10-second waiting time. This shouldn't happen,
     >>>                     right? The processing-time timer also seems to
     >>>                     work: the code can enter the onTimer method.
     >>>
     >>>                       retractStream.assignTimestampsAndWatermarks(
     >>>                           new BoRetractStreamTimestampAssigner());
     >>>                       (This is a deprecated method)
     >>>
     >>>                       retractStream
     >>>                          .keyBy(<key selector>)
     >>>                          .process(new TableOutputProcessFunction())
     >>>                          .name("ProcessTableOutput")
     >>>                          .uid("ProcessTableOutput")
     >>>                          .addSink(businessObjectSink)
     >>>                          .name("businessObjectSink")
     >>>                          .uid("businessObjectSink")
     >>>                          .setParallelism(1);
     >>>
     >>>                     (2) For the watermark configuration, I use a
     >>>                     field in the retract stream as the event time.
     >>>                     This time is usually 15-20 seconds before the
     >>>                     current time.
     >>>
     >>>                     In my environment, I have configured some
     >>>                     settings for the streaming env based on the
     >>>                     information here (
     >>>
    
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$
    
     >>>                     ).
     >>>                     My events don't always arrive, so I think I need
     >>>                     to set the auto watermark interval so that the
     >>>                     event-time timer's onTimer works correctly. I
     >>>                     have added the code below.
     >>>
     >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
     >>> env.getConfig().setAutoWatermarkInterval(1000L);
     >>>
     >>>                     1> Which kind of watermark strategy should I use:
     >>>                     the general bounded-out-of-orderness strategy or
     >>>                     a custom watermark generator?
     >>>
     >>>                     I tried to write a watermark generator, but I
     >>>                     just don't know how to apply it to the stream
     >>>                     correctly. The documentation doesn't explain this
     >>>                     very clearly. My code looks like the following,
     >>>                     and it doesn't work.
     >>>
     >>>                     assign part:
     >>>
     >>>                       .assignTimestampsAndWatermarks(
     >>>                           WatermarkStrategy.forGenerator(
     >>>                               (WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>)
     >>>                                   context -> new TableBoundOutofOrdernessGenerator()))
     >>>
     >>>                     watermark generator:
     >>>
     >>>                     I just assign the event time attribute, following
     >>>                     the example in the doc.
     >>>
    
(https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$
    
     >>>                     )
     >>>
     >>>                     2> I also tried to use the static method in
     >>>                     WatermarkStrategy. The syntax is correct, but I
     >>>                     hit the same problem as in 2.(1).
     >>>
     >>>                       .assignTimestampsAndWatermarks(
     >>>                           WatermarkStrategy
     >>>                               .<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(
     >>>                                   Duration.ofSeconds(15))
     >>>                               .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
     >>>                                   <Select an event time attribute in the booleanRowTuple2>
     >>>                               }))
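
To see how the 15-second bound interacts with the 10-second timer, here is a minimal plain-Java sketch of the arithmetic, with no Flink dependency. It assumes the behaviour of Flink's built-in bounded-out-of-orderness generator, which emits `maxTimestamp - outOfOrderness - 1` as the watermark, and that an event-time timer fires once the watermark reaches the timer's timestamp. The class `WatermarkMath` and its method names are hypothetical.

```java
public class WatermarkMath {

    static final long OUT_OF_ORDERNESS_MS = 15_000L; // Duration.ofSeconds(15)
    static final long TIMEOUT_MS = 10_000L;          // timer = lastModified + 10000

    // Watermark emitted for the largest event timestamp seen so far.
    public static long watermarkFor(long maxEventTimestamp) {
        return maxEventTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    // An event-time timer fires once the watermark reaches its timestamp.
    public static boolean timerFires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    // Smallest later event timestamp that lifts the watermark up to the timer.
    public static long minEventTimestampToFire(long timerTimestamp) {
        return timerTimestamp + OUT_OF_ORDERNESS_MS + 1;
    }

    public static void main(String[] args) {
        long lastModified = 100_000L;
        long timer = lastModified + TIMEOUT_MS;
        // Does the event that registered the timer also fire it?
        System.out.println(timerFires(timer, watermarkFor(lastModified)));
        // Event timestamp needed before the timer can fire.
        System.out.println(minEventTimestampToFire(timer));
    }
}
```

Under these assumptions, a timer registered for an event at time t can only fire after some later event with a timestamp of at least t + 25001 ms has advanced the watermark. If no further events arrive, the watermark does not move and the timer stays pending, regardless of the auto watermark interval.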
     >>>
     >>>
     >>>                     (3) For the retract DataStream, do I need to
     >>>                     explicitly attach it to the stream environment? I
     >>>                     think that is done by default, right? I just want
     >>>                     to confirm. I do have env.execute() at the end of
     >>>                     the code.
     >>>
     >>>                     I understand this is a lot of questions; thanks a
     >>>                     lot for your patience in reading through my
     >>>                     email! If anything is unclear, please reach out
     >>>                     to me. Thanks!
     >>>
     >>>
     >>>                     Best regards,
     >>>
     >>>                     Fuyao Li
     >>>
     >>

