AFAIK, FLINK-10886 is not implemented yet. cc @Becket, who may know more about the plans for this feature.
Best,
Jark

On Sat, 21 Nov 2020 at 03:46, <fuyao...@oracle.com> wrote:

> Hi Timo,
>
> One more question: the blog also mentions a Jira issue meant to solve
> this problem, https://issues.apache.org/jira/browse/FLINK-10886. Will
> this feature be available in 1.12? Thanks!
>
> Best,
>
> Fuyao
>
> On 11/20/20 11:37, fuyao...@oracle.com wrote:
>
> Hi Timo,
>
> Thanks for your reply! Your suggestions are really helpful. The good
> news is that I managed to figure some of this out by myself a few days
> ago.
>
> 1. Thanks for the update about the table parallelism issue!
>
> 2. I tried out the idleness setting. It prevents idle subtasks from
> blocking the pipeline's overall watermark, and it works for me. Based
> on my observations and on reading the source code, I have summarized
> some notes. Please correct me if I am wrong.
>
>    (1) The watermark is independent within each subtask of a Flink
>        operator.
>    (2) The watermark of a table operator with parallelism greater than 1
>        is always determined by the lowest watermark among the currently
>        *ACTIVE* subtasks.
>    (3) With withIdleness() configured, a subtask is marked as idle if it
>        has not received a message for the configured period of time.
>        Once idle, it will NOT execute onPeriodicEmit() or emit
>        watermarks. Between [the start of the application / receiving a
>        new message] and [reaching the idle state], onPeriodicEmit()
>        still emits watermarks and determines the overall context
>        watermark if the subtask holds the smallest watermark among the
>        subtasks.
>    (4) Once an idle subtask receives a new message, it switches its
>        status from idle to active and starts to influence the overall
>        context watermark again.
>
> 3. To route each record to the correct subtask in the join step, I have
> added keyBy() logic in the source based on the join key. It seems to
> work correctly and routes the messages to the correct place.
>
> 4.
> For the interval join, I think I can't use it directly, since I need a
> full outer join so that I don't lose any information from any upstream
> datastream. I think an interval join is an inner join, so it can't do
> this task. I guess my only option is a full outer join with query
> configuration.
>
> 5. One more question about the data replay issue. I read the Ververica
> blog (
> https://www.ververica.com/blog/replayable-process-functions-time-ordering-and-timers)
> and I think that with the replay use case we will face similar issues.
> The suggested approach mentioned there:
>
> (1) Puts each incoming track record in a map keyed by its timestamp.
>
> (2) Creates an event timer to process that record once the watermark
> hits that point.
>
> I roughly understand the idea: buffer all the data (maybe deleting old
> tracks once they are processed) in a track ordered by timestamp, and
> trigger the event timers sequentially over this buffered track.
>
> Based on my understanding, this buffered design is only suitable for
> *offline* data processing, right? (Buffering like this in real time is a
> waste of resources.)
>
> Also, from the article, I think they are using a periodic watermark
> strategy [1]. How can they process the last records with a periodic
> watermark strategy, since there is no more incoming data to advance the
> watermark? Will the last records never be processed? Is there a way to
> handle this gracefully? My use case doesn't allow me to lose any
> information.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>
> Best,
>
> Fuyao
>
> On 11/20/20 08:55, Timo Walther wrote:
>
> Hi Fuyao,
>
> sorry for not replying earlier.
>
> You posted a lot of questions. I scanned the thread quickly; let me try
> to answer some of them, and feel free to ask further questions
> afterwards.
>
> "is it possible to configure the parallelism for Table operation at
> operator level"
>
> No, this is not possible at the moment. The reasons are: 1) we don't
> know how to expose such functionality in a nice way. Maybe we will use
> SQL hints in the future [1]. 2) Sometimes the planner sets the
> parallelism of operators explicitly to 1. All other operators use the
> globally defined parallelism for the pipeline (also to not mess up
> retraction messages internally). You will be able to set the
> parallelism of the sink operation in Flink 1.12.
>
> "BoundedOutOfOrderness Watermark Generator is NOT making the event time to
> advance"
>
> Have you checked if you can use an interval join instead of a full join
> with state retention? Table/SQL pipelines that don't preserve a time
> attribute in the end might also erase the underlying watermarks. Thus,
> event time triggers will not work after your join.
>
> "Why can't I update the watermarks for all 8 parallelisms?"
>
> You could play around with idleness for your source [2]. Or you could
> set the source parallelism to 1 (while keeping the rest of the pipeline
> globally set to 8). Would that be an option?
>
> "Some type cast behavior of retracted streams I can't explain."
>
> toAppendStream/toRetractStream still need an update to the new type
> system. This is explained in FLIP-136, which will be part of Flink 1.13 [3].
>
> I hope I could help a bit.
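As a rough illustration of the idleness point above, here is a minimal, Flink-free sketch in plain Java of how a downstream watermark could be derived from parallel subtasks: it takes the minimum over the subtasks that are not idle. The names WatermarkSketch and combinedWatermark are made up for this example and are not Flink API, and the case where every subtask is idle is deliberately left open. The sample values are the periodicEmitWatermarkTime values from the log output quoted further down in this thread.

```java
import java.util.OptionalLong;

// Toy model only: the names are hypothetical and not part of Flink.
final class WatermarkSketch {

    // Returns the lowest watermark among the active (non-idle) subtasks,
    // or empty if every subtask is idle (that case is not modelled here).
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle subtasks do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // 8 subtasks; only the second one has seen real data.
        long[] watermarks = {0L, 1605047172881L, 0L, 0L, 0L, 0L, 0L, 0L};

        // No subtask marked idle: the seven empty subtasks pin the result at 0.
        System.out.println(combinedWatermark(watermarks, new boolean[8]));

        // The seven empty subtasks marked idle: the result follows the active one.
        boolean[] othersIdle = {true, false, true, true, true, true, true, true};
        System.out.println(combinedWatermark(watermarks, othersIdle));
    }
}
```

In this toy model the first call yields 0 and the second yields 1605047172881, which matches the observation that a single partition feeding 8 parallel subtasks keeps the combined watermark from advancing until the empty subtasks are treated as idle or the source parallelism is reduced to 1.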
>
> Regards,
> Timo
>
> [1]
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-113*3A*Supports*Dynamic*Table*Options*for*Flink*SQL__;JSsrKysrKys!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J6qWrWNk$
> [2]
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html*dealing-with-idle-sources__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JMW06Who$
> [3]
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-136*3A**AImprove*interoperability*between*DataStream*and*Table*API__;JSsrKysrKysr!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JfAjyGyQ$
>
> On 13.11.20 21:39, Fuyao Li wrote:
>
> Hi Matthias,
>
> Just to provide more context on this problem: I only have 1 partition
> per Kafka topic at the beginning, before the join operation. After
> reading the doc:
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$
>
> Maybe that is the root cause of my problem: with fewer than 8
> partitions (only 1 partition in my case), using the default parallelism
> of 8 causes this wrong behavior. This is my guess; it takes a while to
> test it out...
> What's your opinion on this? Thanks!
>
> Best,
>
> Fuyao
>
> On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>
> Hi Matthias,
>
> One more question regarding Flink table parallelism: is it possible to
> configure the parallelism for a Table operation at the operator level?
> It seems we don't have such an API available, right? Thanks!
>
> Best,
> Fuyao
>
> On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>
> Hi Matthias,
>
> Thanks for your information. I have managed to figure out the first
> issue you mentioned. Regarding the second issue, I have made some
> progress.
>
> I have sent another email with the title 'BoundedOutOfOrderness
> Watermark Generator is NOT making the event time to advance' using
> another email address of mine, fuyao...@oracle.com. That email contains
> some more context on my issue. Please take a look. I have made some
> progress since sending it.
>
> Previously, I had managed to get the timelag watermark strategy working
> in my code, but my bounded-out-of-orderness strategy and punctuated
> watermark strategy don't work well. They produce 8 watermarks each
> time. Two cycles are shown below.
>
> I managed to figure out that the root cause is that the Flink stream
> execution environment has a default parallelism of 8. *I didn't notice
> this in the doc; could the community add this explicitly to the
> official doc to avoid confusion? Thanks.*
>
> From my understanding, the watermark advances based on the lowest
> watermark among the 8, so I cannot advance the bounded-out-of-orderness
> watermark since I am only advancing 1 of the 8 parallel subtasks. If I
> set the entire stream execution environment to parallelism 1, it
> reflects the watermark in the context correctly.
> One more thing is that this behavior is not reflected in the Flink
> cluster web UI. There I can see the watermark advancing, but in reality
> it is not. *That's causing the inconsistency problem I described in the
> other email mentioned above. Would this be considered a bug in the UI?*
>
> My current question is: since I have a full outer join operation before
> the KeyedProcessFunction here, how can I make the
> bounded-out-of-orderness / punctuated watermark strategy work if the
> parallelism is > 1? It can only update one of the 8 parallel subtasks
> for the watermark of this onTimer operator. Is this related to my Table
> full outer join operation before this step? According to the doc,
>
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$
>
> the default parallelism should be the same as that of the stream
> environment. Why can't I update the watermarks for all 8 parallelisms?
> What should I do to enable this with parallelism larger than 1? Thanks.
>
> First round: (Note that the first column of each log row is the timelag
> strategy, which is updated correctly for all 8 parallel subtasks, but
> the other two strategies I mentioned above can't do that.)
>
> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
>   (only one of the 8 parallelism for bound out of orderness is getting my new watermark)
> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>
> Second round: (I set the autoWatermark interval to be 5 seconds)
>
> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
> - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>
> Best regards,
>
> Fuyao
>
> On Fri, Nov 13, 2020 at 9:03 AM Matthias Pohl <matth...@ververica.com> wrote:
>
> Hi Fuyao,
> for your first question about the different behavior depending
> on whether you chain the methods or not: keep in mind that you have to
> save the return value of the assignTimestampsAndWatermarks method call
> if you don't chain the methods together, as is also shown in [1]. At
> least the following example from your first message indicates this:
> ```
> retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); // (This is a deprecated method)
> // instead of:
> // retractStream = retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
> retractStream
>     .keyBy(<key selector>)
>     .process(new TableOutputProcessFunction())
>     .name("ProcessTableOutput")
>     .uid("ProcessTableOutput")
>     .addSink(businessObjectSink)
>     .name("businessObjectSink")
>     .uid("businessObjectSink")
>     .setParallelism(1);
> ```
>
> For your second question about setting the EventTime, I'm going to pull
> in Timo from the SDK team, as I don't see an issue with your code right
> away.
>
> Best,
> Matthias
>
> [1]
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$
>
> On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>
> Hi Flink Users and Community,
>
> For the first part of the question, the 12 hour time difference was
> caused by a time extraction bug of my own.
> I can get the time translated correctly now. The type cast problem does
> have some workarounds.
>
> My major blocker right now is that onTimer is not triggered properly. I
> guess it is caused by failing to configure the watermarks and timestamp
> assigners correctly. Please give me some insights.
>
> 1. If I don't chain the assignTimestampsAndWatermarks() method together
> with the keyBy() and process() methods, context.timestamp() in my
> processElement() function is null. Is this expected behavior? The Flink
> examples don't chain them together. (See the example here:
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*using-watermark-strategies__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JYxd6Sb4$)
>
> 2. If I use registerEventTimeTimer() in processElement(), the onTimer
> method is not triggered. However, I can trigger the onTimer method if I
> simply change it to registerProcessingTimeTimer(). I am using the
> settings below in the stream env.
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000L);
>
> My code for the process chain:
>
> retractStream
>     .assignTimestampsAndWatermarks(
>         WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>                 Row rowData = booleanRowTuple2.f1;
>                 LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
>                 LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
>
>                 LocalDateTime latestDBUpdateTime = null;
>                 if (headerTime != null && linesTime != null) {
>                     latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
>                 } else {
>                     latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
>                 }
>                 if (latestDBUpdateTime != null) {
>                     return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
>                 }
>                 // In the worst case, we use system time instead, which should never be reached.
>                 return System.currentTimeMillis();
>             }))
>     // .assignTimestampsAndWatermarks(new MyWaterStrategy()) // second way to create watermark, doesn't work
>     .keyBy(value -> {
>         // There could be null fields for header invoice_id field
>         String invoice_id_key = (String) value.f1.getField(0);
>         if (invoice_id_key == null) {
>             invoice_id_key = (String) value.f1.getField(4);
>         }
>         return invoice_id_key;
>     })
>     .process(new TableOutputProcessFunction())
>     .name("ProcessTableOutput")
>     .uid("ProcessTableOutput")
>     .addSink(businessObjectSink)
>     .name("businessObjectSink")
>     .uid("businessObjectSink")
>     .setParallelism(1);
>
> Best regards,
> Fuyao
>
> On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>
> Hi Flink Community,
>
> I am doing some research work on the Flink DataStream and Table APIs
> and I have run into two major problems.
> I am using Flink 1.11.2, Scala version 2.11, Java 8. My use case looks
> like this: I plan to write a data processing pipeline with two stages.
> My goal is to construct a business object containing information from
> several Kafka streams with a primary key, and to emit the complete
> business object if that primary key doesn't appear in the pipeline for
> 10 seconds.
>
> In the first stage, I consume three Kafka streams and transform them
> into Flink DataStreams using a deserialization schema containing some
> type and date format transformations. Then I register these data
> streams as Tables and do a full outer join one by one using the Table
> API. I also add a query configuration to avoid excessive state. The
> primary key is also the join key.
>
> In the second stage, I transform the joined table into a retract stream
> and feed it into a KeyedProcessFunction, which generates the business
> object if the business object's primary key is inactive for 10 seconds.
>
> Is this way of handling the data the suggested approach? (I understand
> I can consume Kafka data directly in the Table API. I haven't tried
> that yet; maybe that's better?) Any suggestions are welcome. While
> implementing this, I ran into two major problems, with several smaller
> questions under each.
>
> 1. Some type cast behavior of retracted streams I can't explain.
>
> (1) In the initial stage, I registered some fields as *java.sql.Date*
> or *java.sql.Timestamp*, following the examples at
> (https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html*data-type-extraction__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JsB1tdos$).
> After the join and the conversion to a retract stream, they become
> *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>
> For example, when first ingesting the Kafka streams, I registered an
> attribute with the java.sql.Timestamp type.
>
> @JsonAlias("ATTRIBUTE1")
> private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo = java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>
> When I tried to cast the value back after the retract stream, the code
> gave me the error below.
>
> java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
>
> Maybe I should use toAppendStream instead, since an append stream can
> register type information but toRetractStream can't?
> (
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html*convert-a-table-into-a-datastream-or-dataset__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JQ99YqY0$)
>
> My workaround is to cast it to LocalDateTime first and extract the
> epoch time, but this doesn't seem to be a final solution.
>
> (2) During timestamp conversion, the conversion to a retract stream
> seems to lose the AM/PM information in the stream, causing a 12 hour
> difference if it is PM.
>
> I use Joda-Time to do some timestamp conversion in the first
> deserialization stage. My pattern looks like this ("a" means AM/PM
> information):
>
> DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
>
> After the retract stream, the AM/PM information is not preserved.
>
> 2. My onTimer method in KeyedProcessFunction is not triggered when I
> schedule an event-time timer.
>
> I am using event time in my code. I am new to configuring watermarks
> and I might be missing something. I also tried registering a
> processing-time timer; with that, the code enters onTimer and produces
> some results.
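On the AM/PM pattern in 1.(2): in both Joda-Time and java.time, "HH" is the hour of day (0-23) while "hh" is the 1-12 clock hour that pairs with the "a" (AM/PM) field. The thread does not say whether this was the actual cause of the 12 hour difference, so the following is only a sketch of that one possibility. It uses java.time rather than Joda-Time, and the class name, method names and sample input are assumptions made for the example. It also shows the LocalDateTime to epoch-milliseconds conversion described as the workaround in 1.(1).

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Locale;

// Illustration only: class, method names and the sample input are hypothetical.
final class TimeConversionSketch {

    // "hh" is the 1-12 clock hour, which is combined with the "a" (AM/PM) field.
    private static final DateTimeFormatter CLOCK_FORMAT = new DateTimeFormatterBuilder()
            .parseCaseInsensitive() // accept "NOV" as well as "Nov"
            .appendPattern("dd-MMM-yy hh.mm.ss.SSSSSS a")
            .toFormatter(Locale.ENGLISH);

    // Parses a 12-hour clock timestamp into a LocalDateTime (24-hour based).
    static LocalDateTime parseClockTime(String text) {
        return LocalDateTime.parse(text, CLOCK_FORMAT);
    }

    // Converts a LocalDateTime to epoch milliseconds in the given zone.
    static long toEpochMillis(LocalDateTime time, ZoneId zone) {
        return time.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime parsed = parseClockTime("04-NOV-20 02.30.15.000000 PM");
        System.out.println(parsed); // 2:30 PM becomes 14:30
        System.out.println(toEpochMillis(parsed, ZoneId.of("America/Los_Angeles")));
    }
}
```

The epoch conversion is the same chain of calls (atZone, toInstant, toEpochMilli) that appears in the timestamp assigner quoted elsewhere in this thread.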
>
> I am trying to follow the example here:
>
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html*example__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JNMi_YMc$
>
> My onTimer method looks like this, and the scheduled event doesn't
> happen.
>
> In processElement():
>
> context.timerService().registerEventTimeTimer(current.getLastModifiedTime() + 10000);
>
> My onTimer function:
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<BusinessObject> collector) throws Exception {
>     TestBusinessObjectState result = testBusinessObjectState.value();
>     log.info("Inside onTimer Method, current key: {}, timestamp: {}, last modified time: {}",
>             ctx.getCurrentKey(), timestamp, result.getLastModifiedTime());
>
>     // check if this is an outdated timer or the latest timer
>     if (timestamp >= result.getLastModifiedTime() + 10000) {
>         // emit the state on timeout
>         log.info("Collecting a business object, {}", result.getBusinessObject().toString());
>         collector.collect(result.getBusinessObject());
>
>         cleanUp(ctx);
>     }
> }
>
> private void cleanUp(Context ctx) throws Exception {
>     Long timer = testBusinessObjectState.value().getLastModifiedTime();
>     ctx.timerService().deleteEventTimeTimer(timer);
>     testBusinessObjectState.clear();
> }
>
> (1) When I assign the timestamps and watermarks outside the process()
> method chain, "context.timestamp()" is null. If I put it inside the
> chain, it isn't null. Is this the expected behavior? In the null case,
> the strange thing is that I can collect the business object immediately,
> without the intended 10 second waiting time. This shouldn't happen,
> right? The processing-time timer also seems to work; the code can enter
> the onTimer method.
>
> retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); (This is a deprecated method)
>
> retractStream
>     .keyBy(<key selector>)
>     .process(new TableOutputProcessFunction())
>     .name("ProcessTableOutput")
>     .uid("ProcessTableOutput")
>     .addSink(businessObjectSink)
>     .name("businessObjectSink")
>     .uid("businessObjectSink")
>     .setParallelism(1);
>
> (2) For the watermark configuration, I use a field in the retract
> stream as the event time. This time is usually 15-20 seconds before the
> current time.
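A small arithmetic sketch may help with the event-time timer question. It assumes Flink's built-in bounded-out-of-orderness generator, whose watermark is the highest timestamp seen so far minus the bound minus 1 ms, and that an event-time timer fires once the watermark reaches the timer's timestamp. The helper names below are hypothetical, plain Java, and not Flink API. The timestamp is the currentMaxTimestamp value from the logs quoted earlier in this thread, and the 15 second bound and 10 second timeout match the values used in the thread.

```java
// Toy arithmetic only: these helpers are hypothetical and not Flink API.
final class EventTimerSketch {

    // Watermark of a bounded-out-of-orderness generator:
    // highest timestamp seen so far, minus the bound, minus 1 ms.
    static long boundedOutOfOrdernessWatermark(long maxTimestamp, long boundMillis) {
        return maxTimestamp - boundMillis - 1;
    }

    // An event-time timer fires once the watermark has reached its timestamp.
    static boolean timerFires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        long lastModified = 1605047187881L;  // event time of the last record for a key
        long timer = lastModified + 10_000L; // 10 second inactivity timeout
        long bound = 15_000L;                // out-of-orderness bound

        // Right after the record: the watermark is still behind the timer.
        System.out.println(timerFires(timer, boundedOutOfOrdernessWatermark(lastModified, bound))); // false

        // A later record that is at least timeout + bound + 1 ms newer pushes
        // the watermark up to the timer.
        long laterEvent = lastModified + 10_000L + bound + 1;
        System.out.println(timerFires(timer, boundedOutOfOrdernessWatermark(laterEvent, bound))); // true
    }
}
```

Under these assumptions, periodic emission alone does not advance the watermark, because the highest seen timestamp does not change without new records. That would be consistent with the processing-time timer firing while the event-time timer does not.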
>
> In my environment, I have configured the streaming env based on the
> information here (
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$).
>
> My events don't always arrive, so I think I need to set the auto
> watermark interval to make the event-time timers work correctly. I have
> added the code below.
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000L);
>
> 1> Which kind of watermark strategy should I use? The general
> BoundedOutOfOrderness strategy or a custom watermark generator?
>
> I tried to write a watermark generator, but I just don't know how to
> apply it to the stream correctly. The documentation doesn't explain
> this very clearly. My code looks like the following, and it doesn't
> work.
>
> Assign part:
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>) context -> new TableBoundOutofOrdernessGenerator()))
>
> Watermark generator:
>
> I just assign the event time attribute following the example in the doc.
> (
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html*writing-a-periodic-watermarkgenerator__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JqRHnniA$)
>
> 2> I also tried to use the static method in WatermarkStrategy. The
> syntax is correct, but I run into the same problem as in 2.(1).
>
> .assignTimestampsAndWatermarks(
>     WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>         .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>             <Select a event time attribute in the booleanRowTuple2>
>         }))
>
> (3) For the retract DataStream, do I need to explicitly attach it to
> the stream environment? I think this is done by default, right? I just
> want to confirm. I do have env.execute() at the end of the code.
>
> I understand this is a lot of questions; thanks a lot for your patience
> in reading through my email! If anything is unclear, please reach out
> to me. Thanks!
>
> Best regards,
>
> Fuyao Li