Hi Matthias,

Just to provide more context on this problem: I only have one partition per Kafka topic at the beginning, before the join operation. After reading this section of the doc:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
maybe that is the root cause of my problem here: with fewer than 8 partitions (only one in my case), using the default parallelism of 8 may cause this wrong behavior, since some source subtasks never see data. This is my guess; it will take a while to test it out. What's your opinion on this? Thanks!
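Concretely, here is a minimal sketch of the two workarounds I plan to test (untested; the topic name, bootstrap servers, and durations are placeholders): either pin the job parallelism to the single partition, or keep the higher parallelism but let empty source subtasks be treated as idle via withIdleness so they don't hold the overall watermark back.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SinglePartitionWatermarkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: match the parallelism to the single Kafka partition so that
        // no source subtask stays empty and holds back the minimum watermark.
        env.setParallelism(1);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "watermark-test");          // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props); // placeholder topic

        // Option 2: keep parallelism > 1 but mark subtasks that receive no data as idle,
        // so they are excluded when the downstream watermark is computed.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withIdleness(Duration.ofSeconds(30)));

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("single-partition watermark test");
    }
}
```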
Best,
Fuyao

On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:

> Hi Matthias,
>
> One more question regarding Flink Table parallelism: is it possible to configure the parallelism for a Table operation at the operator level? It seems we don't have such an API available, right? Thanks!
>
> Best,
> Fuyao
>
> On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> Thanks for your information. I have managed to figure out the first issue you mentioned. Regarding the second issue, I have made some progress on it.
>>
>> I have sent another email with the title 'BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance' from another email address of mine, fuyao...@oracle.com. That email contains some more context on my issue; please take a look. I have made some progress after sending that new email.
>>
>> Previously, I had managed to make the time-lag watermark strategy work in my code, but my bounded-out-of-orderness strategy and punctuated watermark strategy don't work well. They produce 8 watermarks each time; two cycles are shown below.
>>
>> I managed to figure out that the root cause is that the Flink stream execution environment has a default parallelism of 8. *I didn't notice this in the doc; could the community add it explicitly to the official doc to avoid some confusion? Thanks.*
>>
>> From my understanding, the watermark advances based on the lowest watermark among the 8 parallel instances, so I cannot advance the bounded-out-of-orderness watermark since I am only advancing 1 of the 8. If I set the entire stream execution environment to parallelism 1, the watermark in the context is reflected correctly. One more thing: this behavior is not reflected in the Flink cluster web UI. I can see the watermark advancing there, but it is not advancing in reality. *That's causing the inconsistency problem I mentioned in the other email above. Would this be considered a bug in the UI?*
>>
>> My current question is: since I have a full outer join operation before the KeyedProcessFunction, how can I make the bounded-out-of-orderness / punctuated watermark strategy work if the parallelism is > 1? It can only update one of the 8 parallel instances of the watermark for this onTimer operator. Is this related to the Table full outer join operation before this step? According to the doc,
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
>> the default parallelism should be the same as the stream environment's. Why can't I update the watermarks for all 8 parallel instances? What should I do to make this work with parallelism larger than 1? Thanks.
>>
>> First round: (Note that the first column of each log row is the time-lag strategy; it is getting updated correctly for all 8 parallel instances, but the other two strategies I mentioned above can't do that.)
>> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881 (only one of the 8 parallel instances for bounded out of orderness is getting my new watermark)
>> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>>
>> Second round: (I set the autoWatermark interval to be 5 seconds)
>> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
>> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> Best regards,
>>
>> Fuyao
>>
>> On Fri, Nov 13, 2020 at 9:03 AM Matthias Pohl <matth...@ververica.com> wrote:
>>
>>> Hi Fuyao,
>>> for your first question about the different behavior depending on whether you chain the methods or not: keep in mind that you have to save the return value of the assignTimestampsAndWatermarks method call if you don't chain the methods together, as is also shown in [1].
>>> At least the following example from your first message indicates it:
>>> ```
>>> retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); (This is a deprecated method)
>>> // instead of: retractStream = retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner());
>>> retractStream
>>>     .keyBy(<key selector>)
>>>     .process(new TableOutputProcessFunction())
>>>     .name("ProcessTableOutput")
>>>     .uid("ProcessTableOutput")
>>>     .addSink(businessObjectSink)
>>>     .name("businessObjectSink")
>>>     .uid("businessObjectSink")
>>>     .setParallelism(1);
>>> ```
>>>
>>> For your second question about setting the EventTime, I'm going to pull in Timo from the SDK team, as I don't see an issue with your code right away.
>>>
>>> Best,
>>> Matthias
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
>>>
>>> On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>>>
>>>> Hi Flink Users and Community,
>>>>
>>>> For the first part of the question, the 12-hour time difference was caused by a time extraction bug of my own. I can get the time translated correctly now. The type cast problem does have some workarounds to solve it.
>>>>
>>>> My major blocker right now is that the onTimer part is not properly triggered. I guess it is caused by failing to configure the correct watermarks & timestamp assigners. Please give me some insights.
>>>>
>>>> 1. If I don't chain the assignTimestampsAndWatermarks() call together with the keyBy() and process() calls, the context.timestamp() in my processElement() function will be null. Is this expected behavior? The Flink examples don't chain them together (see the example here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
>>>> ).
>>>> 2. If I use registerEventTimeTimer() in processElement(), the onTimer method will not be triggered. However, I can trigger the onTimer method if I simply change it to registerProcessingTimeTimer(). I am using the settings below in the stream env.
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> env.getConfig().setAutoWatermarkInterval(1000L);
>>>>
>>>> My code for the process chain:
>>>>
>>>> retractStream
>>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>>>>         .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>>>             Row rowData = booleanRowTuple2.f1;
>>>>             LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
>>>>             LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
>>>>
>>>>             LocalDateTime latestDBUpdateTime = null;
>>>>             if (headerTime != null && linesTime != null) {
>>>>                 latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
>>>>             } else {
>>>>                 latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
>>>>             }
>>>>             if (latestDBUpdateTime != null) {
>>>>                 return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
>>>>             }
>>>>             // In the worst case, we use system time instead, which should never be reached.
>>>>             return System.currentTimeMillis();
>>>>         }))
>>>>     // .assignTimestampsAndWatermarks(new MyWaterStrategy()) // second way to create the watermark, doesn't work
>>>>     .keyBy(value -> {
>>>>         // There could be null fields for the header invoice_id field
>>>>         String invoice_id_key = (String) value.f1.getField(0);
>>>>         if (invoice_id_key == null) {
>>>>             invoice_id_key = (String) value.f1.getField(4);
>>>>         }
>>>>         return invoice_id_key;
>>>>     })
>>>>     .process(new TableOutputProcessFunction())
>>>>     .name("ProcessTableOutput")
>>>>     .uid("ProcessTableOutput")
>>>>     .addSink(businessObjectSink)
>>>>     .name("businessObjectSink")
>>>>     .uid("businessObjectSink")
>>>>     .setParallelism(1);
>>>>
>>>> Best regards,
>>>> Fuyao
>>>>
>>>> On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li <fuyaoli2...@gmail.com> wrote:
>>>>
>>>>> Hi Flink Community,
>>>>>
>>>>> I am doing some research work with the Flink DataStream and Table APIs and I have hit two major problems. I am using Flink 1.11.2, Scala version 2.11, Java 8. My use case looks like this: I plan to write a data processing pipeline with two stages. My goal is to construct a business object containing information from several Kafka streams, keyed by a primary key, and emit the complete business object if that primary key doesn't appear in the pipeline for 10 seconds.
>>>>>
>>>>> In the first stage, I consume three Kafka streams and transform them into Flink DataStreams using a deserialization schema that does some type and date format transformation; then I register these data streams as Tables and do a full outer join one by one using the Table API. I also add a query configuration to avoid excessive state. The primary key is also the join key.
>>>>>
>>>>> In the second stage, I transform the joined table into a retracted stream and feed it into a KeyedProcessFunction to generate the business object if the business object's primary key is inactive for 10 seconds.
>>>>>
>>>>> Is this way of handling the data the suggested approach? (I understand I can directly consume Kafka data in the Table API. I haven't tried that yet; maybe that's better? A rough sketch is included below.) Any suggestion is welcome. While implementing this, I met two major problems, with several smaller questions under each.
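On the question of consuming the Kafka data directly with the Table API: a rough, untested sketch of what the Flink 1.11 SQL DDL route could look like for one of the topics (the topic name, column names, and connection properties are made up for illustration; the other topics and the full outer join would follow the same pattern):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaTableSourceSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical topic and columns; the real schema would mirror the fields
        // produced by the deserialization schema on the DataStream side.
        tableEnv.executeSql(
            "CREATE TABLE invoice_header (" +
            "  invoice_id STRING," +
            "  last_update_time TIMESTAMP(3)," +
            "  WATERMARK FOR last_update_time AS last_update_time - INTERVAL '20' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'invoice-header'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'properties.group.id' = 'bo-builder'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json'" +
            ")");

        // The registered table can then go straight into the existing full outer join.
        Table header = tableEnv.from("invoice_header");
        header.printSchema();
    }
}
```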
>>>>> 1. Some type cast behavior of retracted streams that I can't explain.
>>>>>
>>>>> (1) In the initial stage, I registered some fields as *java.sql.Date* or *java.sql.Timestamp*, following the examples at
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction
>>>>> After the join and the conversion to a retracted stream, they become *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>>>>>
>>>>> For example, when first ingesting the Kafka streams, I registered an attribute as java.sql.Timestamp:
>>>>>
>>>>> @JsonAlias("ATTRIBUTE1")
>>>>> private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo = java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>>>>>
>>>>> When I tried to cast the type back after the retracted stream, the code gave me the error below:
>>>>>
>>>>> java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
>>>>>
>>>>> Maybe I should use toAppendStream instead, since an append stream can register type information but toRetractStream can't? (
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>>>> )
>>>>>
>>>>> My workaround is to cast it to LocalDateTime first and extract the epoch time, but this doesn't seem like a final solution.
>>>>>
>>>>> (2) During timestamp conversion, the conversion to a retracted stream seems to lose the AM/PM information, causing a 12-hour difference when the time is PM.
>>>>>
>>>>> I use Joda-Time to do some timestamp conversion in the first deserialization stage; my pattern looks like this ("a" is the AM/PM marker):
>>>>>
>>>>> DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a").withZone(DateTimeZone.getDefault());
>>>>>
>>>>> After the retracted stream, the AM/PM information is not preserved.
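One thing that might be worth checking for the 12-hour difference (a guess, not something confirmed in this thread): in Joda-Time, 'H' is the 0-23 hour-of-day field, while 'h' is the 1-12 clock hour that is meant to pair with the 'a' halfday marker. A small standalone comparison, with a made-up sample value:

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class AmPmPatternCheck {

    public static void main(String[] args) {
        String raw = "13-Nov-20 02.30.15.123456 PM"; // made-up sample value

        // 'H' parses hour-of-day (0-23) directly, so the halfday marker may no longer
        // decide the hour and "02 ... PM" can come out as 2 AM.
        DateTimeFormatter hourOfDay = DateTimeFormat
                .forPattern("dd-MMM-yy HH.mm.ss.SSSSSS a")
                .withZone(DateTimeZone.getDefault());

        // 'h' is the 1-12 clock hour that combines with 'a', so "02 ... PM" parses as 14:30.
        DateTimeFormatter clockHour = DateTimeFormat
                .forPattern("dd-MMM-yy hh.mm.ss.SSSSSS a")
                .withZone(DateTimeZone.getDefault());

        System.out.println(DateTime.parse(raw, hourOfDay));
        System.out.println(DateTime.parse(raw, clockHour));
    }
}
```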
>>>>> 2. My onTimer method in the KeyedProcessFunction cannot be triggered when I schedule an event-time timer.
>>>>>
>>>>> I am using event time in my code. I am new to configuring watermarks and I might be missing something to configure them correctly. I also tried registering a processing-time timer; that one does fire and produces some results.
>>>>>
>>>>> I am trying to follow the example here:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>>>>>
>>>>> My onTimer method looks like this, and the scheduled event doesn't happen.
>>>>>
>>>>> In processElement():
>>>>>
>>>>> context.timerService().registerEventTimeTimer(current.getLastModifiedTime() + 10000);
>>>>>
>>>>> My onTimer function:
>>>>>
>>>>> @Override
>>>>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<BusinessObject> collector) throws Exception {
>>>>>     TestBusinessObjectState result = testBusinessObjectState.value();
>>>>>     log.info("Inside onTimer Method, current key: {}, timestamp: {}, last modified time: {}", ctx.getCurrentKey(), timestamp, result.getLastModifiedTime());
>>>>>
>>>>>     // check if this is an outdated timer or the latest timer
>>>>>     if (timestamp >= result.getLastModifiedTime() + 10000) {
>>>>>         // emit the state on timeout
>>>>>         log.info("Collecting a business object, {}", result.getBusinessObject().toString());
>>>>>         collector.collect(result.getBusinessObject());
>>>>>
>>>>>         cleanUp(ctx);
>>>>>     }
>>>>> }
>>>>>
>>>>> private void cleanUp(Context ctx) throws Exception {
>>>>>     Long timer = testBusinessObjectState.value().getLastModifiedTime();
>>>>>     ctx.timerService().deleteEventTimeTimer(timer);
>>>>>     testBusinessObjectState.clear();
>>>>> }
>>>>>
>>>>> (1) When I assign the timestamps and watermarks outside the process() method chain, "context.timestamp()" will be null. If I put it inside the chain, it won't be null. Is this the expected behavior? In the null case, the strange thing is that, surprisingly, I can collect the business object immediately, without the designed 10-second waiting time... This shouldn't happen, right? The processing-time timer also seems to work; the code does enter the onTimer method.
>>>>>
>>>>> retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); (This is a deprecated method)
>>>>>
>>>>> retractStream
>>>>>     .keyBy(<key selector>)
>>>>>     .process(new TableOutputProcessFunction())
>>>>>     .name("ProcessTableOutput")
>>>>>     .uid("ProcessTableOutput")
>>>>>     .addSink(businessObjectSink)
>>>>>     .name("businessObjectSink")
>>>>>     .uid("businessObjectSink")
>>>>>     .setParallelism(1);
>>>>>
>>>>> (2) For the watermark configuration: I use a field in the retracted stream as the event time. This time is usually 15-20 seconds before the current time.
>>>>>
>>>>> In my environment, I have done some settings for the streaming env based on the information here:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>>>>> My events don't always come, so I think I need to set the auto-watermark interval to make the event-time onTimer work correctly. I have added the code below.
>>>>>
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> env.getConfig().setAutoWatermarkInterval(1000L);
>>>>>
>>>>> 1> Which kind of watermark strategy should I use? The general bounded-out-of-orderness one or a custom watermark generator?
>>>>>
>>>>> I tried to write a watermark generator, and I just don't know how to apply it to the stream correctly; the documentation doesn't explain it very clearly. My code looks like the snippet below and it doesn't work.
>>>>>
>>>>> The assign part:
>>>>>
>>>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>) context -> new TableBoundOutofOrdernessGenerator()))
>>>>>
>>>>> The watermark generator:
>>>>>
>>>>> I just assign the event-time attribute following the example in the doc (
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>>>>> ).
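For reference, here is a sketch of how I read that doc example, filled in for this stream. The class name matches the one referenced above, but the body, the 20-second bound, and the extractEventTimeMillis helper in the wiring comment are assumptions filled in along the doc's lines, not the exact code from my job:

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

/**
 * Periodic watermark generator in the style of the bounded-out-of-orderness
 * example from the Flink 1.11 docs, for the Tuple2<Boolean, Row> retract stream.
 */
public class TableBoundOutofOrdernessGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {

    private static final long MAX_OUT_OF_ORDERNESS = 20_000L; // 20 s, placeholder

    // Start low enough that the first emitted watermark does not underflow.
    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS + 1;

    @Override
    public void onEvent(Tuple2<Boolean, Row> event, long eventTimestamp, WatermarkOutput output) {
        // eventTimestamp is whatever the TimestampAssigner on the WatermarkStrategy extracted.
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Called every autoWatermarkInterval, even when no new events have arrived.
        output.emitWatermark(new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS - 1));
    }
}

// Wiring it up would then look roughly like this (extractEventTimeMillis stands in for
// the same field extraction as in the withTimestampAssigner lambda earlier in the thread):
//
// retractStream.assignTimestampsAndWatermarks(
//     WatermarkStrategy
//         .forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>)
//             context -> new TableBoundOutofOrdernessGenerator())
//         .withTimestampAssigner((booleanRowTuple2, ts) -> extractEventTimeMillis(booleanRowTuple2)));
```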
>>>>> 2> I also tried to use the static method on WatermarkStrategy. The syntax is correct, but I met the same problem as in 2.(1).
>>>>>
>>>>> .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>>>>>     .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>>>>         <select an event-time attribute from booleanRowTuple2>
>>>>>     }))
>>>>>
>>>>> (3) For the retracted datastream, do I need to explicitly attach it to the stream environment? I think it is done by default, right? Just want to confirm. I do have env.execute() at the end of the code.
>>>>>
>>>>> I understand this is a lot of questions. Thanks a lot for your patience in looking through my email! If there is anything unclear, please reach out to me. Thanks!
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Fuyao Li
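As a closing note on questions 2.(1) and (3): a minimal, self-contained toy example (unrelated to the business-object pipeline above, with made-up values) illustrating that assignTimestampsAndWatermarks returns a new stream rather than modifying its input, and that a single env.execute() runs everything attached to the environment, including streams obtained from a table environment created on it.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AssignWatermarksReturnValueDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);

        // Toy input: each element doubles as its own event-time timestamp in millis.
        DataStream<Long> events = env.fromElements(1_000L, 2_000L, 3_000L);

        // assignTimestampsAndWatermarks does not modify `events` in place; the returned
        // stream must be used (or the calls chained). Otherwise downstream operators keep
        // whatever timestamps the input had, and context.timestamp() can be null.
        DataStream<Long> withTimestamps = events.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(15))
                        .withTimestampAssigner((value, ts) -> value));

        withTimestamps.print();

        // One env.execute() runs the whole pipeline; nothing extra is needed to "attach"
        // a retract stream that was created from a table environment bound to this env.
        env.execute("assignTimestampsAndWatermarks return value demo");
    }
}
```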