Re: Re: [DISCUSS] Removing deprecated methods from DataStream API
+1 for removing the methods that have been deprecated for a while & have alternative methods. One specific thing is that if we remove DataStream#split, do we consider enabling side-output in more operators in the future? Currently it is only available in ProcessFunctions, but not in other commonly used UDFs like Source or AsyncFunction [1]. One temporary solution that occurs to me is to add a ProcessFunction after the operators that want to use side-output (a rough sketch of this workaround follows at the end of this thread). But I think the solution is not very intuitive to come up with, and if it really works we might add it to the side-output documentation. [1] https://issues.apache.org/jira/browse/FLINK-7954 Best, Yun --Original Mail -- Sender:Kostas Kloudas Send Date:Tue Aug 18 03:52:44 2020 Recipients:Dawid Wysakowicz CC:dev , user Subject:Re: [DISCUSS] Removing deprecated methods from DataStream API +1 for removing them. From a quick look, most of them (not all) have been deprecated a long time ago. Cheers, Kostas On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz wrote: > > @David Yes, my idea was to remove any use of the fold method and all related > classes including WindowedStream#fold > > @Klou Good idea to also remove the deprecated enableCheckpointing() & > StreamExecutionEnvironment#readFile and alike. I did another pass over some > of the classes and thought we could also drop: > > ExecutionConfig#set/getCodeAnalysisMode > ExecutionConfig#disable/enableSysoutLogging > ExecutionConfig#set/isFailTaskOnCheckpointError > ExecutionConfig#isLatencyTrackingEnabled > > As for the `forceCheckpointing` I am not fully convinced about doing it. As far > as I know iterations still do not participate in checkpointing correctly. > Therefore it still might make sense to force it. In other words there is no > real alternative to that method. Unless we only remove the methods from > StreamExecutionEnvironment and redirect to the setter in CheckpointConfig. > WDYT? > > An updated list of methods I suggest to remove: > > ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9) > ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10) > ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9) > ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7) > ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?) > StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...) > (deprecated in 1.2) > RuntimeContext#getAllAccumulators (deprecated in 0.10) > DataStream#fold and all related classes and methods such as FoldFunction, > FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4) > StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated > in 1.5) > DataStream#split (deprecated in 1.8) > Methods in (Connected)DataStream that specify keys as either indices or field > names such as DataStream#keyBy, DataStream#partitionCustom, > ConnectedStream#keyBy (deprecated in 1.11) > > Bear in mind that the majority of the options listed above in ExecutionConfig > take no effect. They were left there purely to satisfy binary > compatibility. Personally I don't see any benefit in leaving a method and > silently dropping the underlying feature. The only configuration that is > respected is setting the number of execution retries. > > I also wanted to make it explicit that most of the changes above would result > in a binary incompatibility and require additional exclusions in the japicmp. 
> Those are: > > ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9) > ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10) > ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9) > ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7) > ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?) > DataStream#fold and all related classes and methods such as FoldFunction, > FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4) > DataStream#split (deprecated in 1.8) > Methods in (Connected)DataStream that specify keys as either indices or field > names such as DataStream#keyBy, DataStream#partitionCustom, > ConnectedStream#keyBy, (deprecated in 1.11) > StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...) > (deprecated in 1.2) > > Looking forward to more opinions on the issue. > > Best, > > Dawid > > > On 17/08/2020 12:49, Kostas Kloudas wrote: > > Thanks a lot for starting this Dawid, > > Big +1 for the proposed clean-up, and I would also add the deprecated > methods of the StreamExecutionEnvironment like: > > enableCheckpointing(long interval, CheckpointingMode mode, boolean force) > enableCheckpointing() > isForceCheckpointing() > > readFile(FileInputFormat inputFormat,String > filePath,FileProcessingMode watchType,long interval, FilePathFilter > fi
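The ProcessFunction workaround mentioned at the top of this thread could look roughly like the sketch below. It is only an illustration: the element values, the async call and the OutputTag name are made up, and the AsyncFunction here simply passes records through so that the trailing ProcessFunction can route some of them to a side output.

```java
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class AsyncSideOutputSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("a", "ERROR:b", "c");

        // The AsyncFunction itself has no access to side outputs; here it just
        // passes the records through (a real one would call an external service).
        DataStream<String> asyncResults = AsyncDataStream.unorderedWait(
                input,
                new AsyncFunction<String, String>() {
                    @Override
                    public void asyncInvoke(String value, ResultFuture<String> resultFuture) {
                        resultFuture.complete(Collections.singletonList(value));
                    }
                },
                1, TimeUnit.SECONDS);

        // A pass-through ProcessFunction is appended purely to route some records
        // into a side output, since ProcessFunction does expose ctx.output(...).
        final OutputTag<String> errorTag = new OutputTag<String>("async-errors") {};
        SingleOutputStreamOperator<String> mainOutput = asyncResults
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("ERROR")) {
                            ctx.output(errorTag, value);   // side output
                        } else {
                            out.collect(value);            // main output
                        }
                    }
                });

        mainOutput.getSideOutput(errorTag).print("SIDE");
        mainOutput.print("MAIN");
        env.execute("side-output-after-async-sketch");
    }
}
```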
Re: Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input
Hi, Many thanks for bringing up this discussion! One more question: do the BATCH and STREAMING modes also decide the shuffle types and operators? I'm asking because even in blocking mode, a job could benefit from keeping some edges pipelined if the resources are known to be sufficient. Do we also consider exposing more fine-grained control over the shuffle types? Best, Yun --Original Mail -- Sender:Kostas Kloudas Send Date:Tue Aug 18 02:24:21 2020 Recipients:David Anderson CC:dev , user Subject:Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input Hi Kurt and David, Thanks a lot for the insightful feedback! @Kurt: For the topic of checkpointing with Batch Scheduling, I totally agree with you that it requires a lot more work and careful thinking on the semantics. This FLIP was written under the assumption that if the user wants to have checkpoints on bounded input, he/she will have to go with STREAMING as the scheduling mode. Checkpointing for BATCH can be handled as a separate topic in the future. In the case of MIXED workloads and for this FLIP, the scheduling mode should be set to STREAMING. That is why the AUTOMATIC option sets scheduling to BATCH only if all the sources are bounded. I am not sure what the plans are there at the scheduling level, as one could imagine in the future that in mixed workloads, we schedule first all the bounded subgraphs in BATCH mode and we allow only one UNBOUNDED subgraph per application, which is going to be scheduled after all Bounded ones have finished. Essentially the bounded subgraphs will be used to bootstrap the unbounded one. But, I am not aware of any plans towards that direction. @David: The processing time timer handling is a topic that has also been discussed in the community in the past, and I do not remember any final conclusion unfortunately. In the current context and for bounded input, we chose to favor reproducibility of the result, as this is expected in batch processing where the whole input is available in advance. This is why this proposal suggests not allowing processing time timers. But I understand your argument that the user may want to be able to run the same pipeline on batch and streaming; this is why we added the two options under future work, namely (from the FLIP): ``` Future Work: In the future we may consider adding as options the capability of: * firing all the registered processing time timers at the end of a job (at close()) or, * ignoring all the registered processing time timers at the end of a job. ``` Conceptually, we are essentially saying that batch execution is assumed to be instantaneous and refers to a single "point" in time and any processing-time timers for the future may fire at the end of execution or be ignored (but not throw an exception). I could also see ignoring the timers in batch as the default, if this makes more sense. By the way, do you have any use cases in mind that will help us better shape our processing time timer handling? Kostas On Mon, Aug 17, 2020 at 2:52 PM David Anderson wrote: > > Kostas, > > I'm pleased to see some concrete details in this FLIP. > > I wonder if the current proposal goes far enough in the direction of > recognizing the need some users may have for "batch" and "bounded streaming" > to be treated differently. If I've understood it correctly, the section on > scheduling allows me to choose STREAMING scheduling even if I have bounded > sources. 
I like that approach, because it recognizes that even though I have > bounded inputs, I don't necessarily want batch processing semantics. I think > it makes sense to extend this idea to processing time support as well. > > My thinking is that sometimes in development and testing it's reasonable to > run exactly the same job as in production, except with different sources and > sinks. While it might be a reasonable default, I'm not convinced that > switching a processing time streaming job to read from a bounded source > should always cause it to fail. > > David > > On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas wrote: >> >> Hi all, >> >> As described in FLIP-131 [1], we are aiming at deprecating the DataSet >> API in favour of the DataStream API and the Table API. After this work >> is done, the user will be able to write a program using the DataStream >> API and this will execute efficiently on both bounded and unbounded >> data. But before we reach this point, it is worth discussing and >> agreeing on the semantics of some operations as we transition from the >> streaming world to the batch one. >> >> This thread and the associated FLIP [2] aim at discussing these issues >> as these topics are pretty important to users and can lead to >> unpleasant surprises if we do not pay attention. >> >> Let's have a healthy discussion here and I will be updating the FLIP >> accordingly. >> >> Cheers, >> Kostas >> >> [1] >> https://cwik
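The scheduling/execution-mode choice discussed in this thread eventually surfaced as a runtime-mode switch in later Flink releases; the sketch below assumes that later API (StreamExecutionEnvironment#setRuntimeMode with RuntimeExecutionMode), which this thread itself predates, and illustrates the behaviour Kostas describes: AUTOMATIC only picks BATCH when every source is bounded, while forcing STREAMING keeps "bounded streaming" semantics such as checkpointing and processing-time timers.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AUTOMATIC selects BATCH only if all sources are bounded; any unbounded
        // source keeps the job in STREAMING mode.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Forcing STREAMING on bounded sources gives "bounded streaming": checkpoints
        // and processing-time timers behave as in a normal streaming job.
        // env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        env.fromElements(1, 2, 3)        // a bounded source
           .map(i -> i * 2)
           .print();

        env.execute("execution-mode-sketch");
    }
}
```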
Re: SDK vs Connectors
Hi Prasanna, 1) Semantically both a) and b) would be OK. If the custom sink could be chained with the map operator (I assume the map operator is the "Processing" in the graph), there should also be not much difference physically; if they could not chain, then writing a custom sink would cause another pass of network transfer, but the custom sink would run in a different thread, so much more computation resource could be exploited. 2) To achieve at-least-once, you need to implement the "CheckpointedFunction" interface and ensure all the data is flushed to the outside systems when snapshotting state. Since the previous data will not be replayed after failover if the checkpoint succeeds, these pieces of data need to be written out before the checkpoint succeeds. 3) From my side I don't think there are significant disadvantages to writing custom sink functions. Best, Yun -- Sender:Prasanna kumar Date:2020/08/22 02:00:51 Recipient:user; Theme:SDK vs Connectors Hi Team, Following is the pipeline Kafka => Processing => SNS Topics. Flink does not provide an SNS connector out of the box. a) I implemented the above by using the AWS SDK and published the messages in the Map operator itself. The pipeline is working well. I see messages flowing to SNS topics. b) Another approach is that I could write a custom sink function and still publish to SNS using the SDK in this stage. Questions 1) What would be the primary difference between approach a) and b)? Is there any significant advantage of one over the other? 2) Would an at-least-once guarantee be confirmed if we follow the above approach? 3) Would there be any significant disadvantages (rather, what we need to be careful about) of writing our own custom sink functions? Thanks, Prasanna.
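A minimal sketch of the at-least-once sink Yun describes in point 2, assuming a made-up publishToSns call in place of the real AWS SDK client: the key point is that snapshotState() flushes the buffer, so a checkpoint cannot complete while records are still sitting only in the sink's memory.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// At-least-once sink sketch: records are buffered and pushed out both when the buffer
// is full and, crucially, in snapshotState(), so everything emitted before a checkpoint
// reaches the external system before that checkpoint can complete.
public class BufferingSnsSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= 100) {
            flush();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Flush before the checkpoint is acknowledged; records emitted after the last
        // completed checkpoint are replayed on failover, so nothing buffered here is lost.
        flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Nothing to restore for plain at-least-once.
    }

    private void flush() {
        for (String record : buffer) {
            publishToSns(record); // placeholder standing in for the AWS SNS SDK publish call
        }
        buffer.clear();
    }

    private void publishToSns(String record) {
        // hypothetical: invoke the SNS client here
    }
}
```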
Re: Re: [ANNOUNCE] New PMC member: Dian Fu
Congratulations Dian ! Best Yun -- Sender:Marta Paes Moreira Date:2020/08/27 17:42:34 Recipient:Yuan Mei Cc:Xingbo Huang; jincheng sun; dev; Dian Fu; user; user-zh Theme:Re: [ANNOUNCE] New PMC member: Dian Fu Congrats, Dian! On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei wrote: Congrats! On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang wrote: Congratulations Dian! Best, Xingbo jincheng sun 于2020年8月27日周四 下午5:24写道: Hi all, On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of the Apache Flink Project Management Committee (PMC). Dian Fu has been very active on PyFlink component, working on various important features, such as the Python UDF and Pandas integration, and keeps checking and voting for our releases, and also has successfully produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the release of Flink 1.12. Please join me in congratulating Dian Fu for becoming a Flink PMC Member! Best, Jincheng(on behalf of the Flink PMC)
Re: Implementation of setBufferTimeout(timeoutMillis)
Hi Pankaj, I think it should be in org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher. Best, Yun -- Sender:Pankaj Chand Date:2020/08/31 02:40:15 Recipient:user Theme:Implementation of setBufferTimeout(timeoutMillis) Hello, The documentation gives the following two sample lines for setting the buffer timeout for the streaming environment or transformation. env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); I have been trying to find where (file and method) in the Flink source code are the buffers being flushed by iteratively referring to the value of timeoutMillis (or the default value), but have been unsuccessful. Please help. Thanks, Pankaj
Re: Exception on s3 committer
Hi Ivan, I think there might be some points to check: 1. Is the job restored from the latest successful checkpoint after restart ? 2. Have you ever changed the timeout settings for uncompleted multipart upload ? 3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 exist or not ? Best, Yun --Original Mail -- Sender:Ivan Yang Send Date:Sat Aug 29 12:43:28 2020 Recipients:user Subject:Exception on s3 committer Hi all, We got this exception after a job restart. Does anyone know what may lead to this situation? and how to get pass this Checkpoint issue? Prior to this, the job failed due to “Checkpoint expired before completing.” We are s3 heavy, writing out 10K files to s3 every 10 minutes using StreamingFileSink/BulkFormat to various s3 prefixes. Thanks in advance. -Ivan 2020-08-28 15:17:58 java.io.IOException: Recovering commit failed for object cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object does not exist and MultiPart Upload 3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw- is not valid. at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102) at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:122) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379) at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.(StreamingFileSinkHelper.java:74) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.FileNotFoundException: Completing multipart commit on cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804: com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 9A99AFAD80A8F202; S3 Extended Request ID: fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=), S3 Extended Request ID: fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=:NoSuchUpload at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) at org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUplo
Re: Re: Exception on s3 committer
Hi Ivan, A Flink sink might commit a single file multiple times. This happens if there is a failover after committing one file: the job will be restarted from the latest checkpoint, and the file's state will go back to pending and be committed again. In the normal case, when re-committing a file fails, Flink will first check if the file exists; if it exists, it will treat it as committed before, ignore the error and continue. However, based on the current description, it seems that there is another job which deletes the committed files. Then, if a file is deleted after the first commit but before re-committing it, the file will be detected as not existing when re-committing, and this causes the error (a conceptual sketch of this recovery path follows at the end of this thread). --Original Mail -- Sender:Ivan Yang Send Date:Tue Sep 1 08:06:55 2020 Recipients:Yun Gao CC:user Subject:Re: Exception on s3 committer Hi Yun, Thank you so much for your suggestion. (1) The job couldn’t restore from the last checkpoint. The exception is in my original email. (2) No, I didn’t change any multipart upload settings. (3) The file is gone. I have another batch process that reads the Flink output s3 bucket and pushes objects to another bucket. Upon successful read and write, the batch job will delete the file. What’s puzzling me is that if Flink hasn’t successfully committed the multipart file, it should not be visible to the batch job. It looks like the situation is: while Flink tried to commit the multipart file, it crashed and restarted. The file was committed on s3 successfully, but the acknowledgement was not recorded on the Flink side. In between, the batch job consumed the file. I don’t know if that’s possible. Thanks Ivan On Aug 30, 2020, at 11:10 PM, Yun Gao wrote: Hi Ivan, I think there might be some points to check: 1. Is the job restored from the latest successful checkpoint after restart ? 2. Have you ever changed the timeout settings for uncompleted multipart upload ? 3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 exist or not ? Best, Yun --Original Mail -- Sender:Ivan Yang Send Date:Sat Aug 29 12:43:28 2020 Recipients:user Subject:Exception on s3 committer Hi all, We got this exception after a job restart. Does anyone know what may lead to this situation? and how to get past this checkpoint issue? Prior to this, the job failed due to “Checkpoint expired before completing.” We are s3 heavy, writing out 10K files to s3 every 10 minutes using StreamingFileSink/BulkFormat to various s3 prefixes. Thanks in advance. -Ivan 2020-08-28 15:17:58 java.io.IOException: Recovering commit failed for object cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object does not exist and MultiPart Upload 3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw- is not valid. 
at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102) at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:122) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379) at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.(StreamingFileSinkHelper.java:74) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
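A conceptual sketch of the recovery behaviour Yun describes at the top of this thread. This is not the actual S3Committer code; the helper methods are placeholders, and the sketch only illustrates why deleting the committed object from outside makes the re-commit on restore fail.

```java
import java.io.IOException;

// Conceptual sketch only -- NOT the actual S3Committer code. On restore the pending
// part file is committed again, and "already committed" is tolerated by checking
// whether the target object exists. If another job deletes that object between the
// first commit and the re-commit, the check fails and the restore errors out,
// matching the stack trace above.
public abstract class RecommitSketch {

    // Placeholders for the real S3 interactions (assumptions, not Flink API).
    protected abstract void completeMultipartUpload(String objectKey) throws IOException;
    protected abstract boolean objectExists(String objectKey) throws IOException;

    public void commitAfterRecovery(String objectKey) throws IOException {
        try {
            completeMultipartUpload(objectKey); // may fail with NoSuchUpload after a restart
        } catch (IOException e) {
            if (objectExists(objectKey)) {
                return; // committed by an earlier attempt before the failover: ignore
            }
            throw new IOException("Recovering commit failed for object " + objectKey, e);
        }
    }
}
```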
Re: Re: [ANNOUNCE] Apache Flink 1.11.2 released
Great! Many thanks @ZhuZhu for driving this, and thanks to everyone who contributed to the release! Best, Yun --Original Mail -- Sender:Jingsong Li Send Date:Thu Sep 17 13:31:41 2020 Recipients:user-zh CC:dev , user , Apache Announce List Subject:Re: [ANNOUNCE] Apache Flink 1.11.2 released Thanks ZhuZhu for driving the release. Best, Jingsong On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu wrote: The Apache Flink community is very happy to announce the release of Apache Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11 series. Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications. The release is available for download at: https://flink.apache.org/downloads.html Please check out the release blog post for an overview of the improvements for this bugfix release: https://flink.apache.org/news/2020/09/17/release-1.11.2.html The full release notes are available in Jira: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348575 We would like to thank all contributors of the Apache Flink community who made this release possible! Thanks, Zhu -- Best, Jingsong Lee
[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi, devs & users As discussed in FLIP-131 [1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that currently Flink does not support checkpoints after some tasks have finished, which causes some problems for bounded or mixed jobs: Flink exactly-once sinks rely on checkpoints to ensure that data won’t be replayed before it is committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks are finished, the data sent after the last checkpoint would never be able to be committed. This issue has already been reported several times in the user ML [2][3][4] and was further brought up when working on FLIP-143: Unified Sink API [5]. Jobs with both bounded and unbounded sources might have to replay a large number of records after failover because no periodic checkpoints are taken after the bounded sources have finished. Therefore, we propose to also support checkpoints after some tasks have finished. You could find more details in FLIP-147 [6]. Best, Yun [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741 [2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E [3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E [4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API [6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi, devs & users Very sorry for the spoiled formats, I resent the discussion as follows. As discussed in FLIP-131[1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that currently Flink does not support checkpoints after some tasks finished, which causes some problems for bounded or mixed jobs: 1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed before committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks are finished, the data sent after the last checkpoint would always not be able to be committed. This issue has already been reported some times in the user ML[2][3][4] and is future brought up when working on FLIP-143: Unified Sink API [5]. 2. The jobs with both bounded and unbounded sources might have to replay a large amount of records after failover due to no periodic checkpoints are taken after the bounded sources finished. Therefore, we propose to also support checkpoints after some tasks finished. Your Could find more details in FLIP-147[6]. Best, Yun [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741 [2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E [3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E [4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API [6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished --Original Mail -- Sender:Yun Gao Send Date:Fri Oct 9 14:16:52 2020 Recipients:Flink Dev , User-Flink Subject:[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi, devs & users As discussed in FLIP-131 [1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that currently Flink does not support checkpoints after some tasks finished, which causes some problems for bounded or mixed jobs: Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed before committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks are finished, the data sent after the last checkpoint would always not be able to be committed. This issue has already been reported some times in the user ML[2][3][4] and is future brought up when working on FLIP-143: Unified Sink API [5]. The jobs with both bounded and unbounded sources might have to replay a large amount of records after failover due to no periodic checkpoints are taken after the bounded sources finished. Therefore, we propose to also support checkpoints after some tasks finished. Your Could find more details in FLIP-147[6]. 
Best, Yun [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741 [2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E [3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E [4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API [6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
exit directly? The new Source API would terminate directly since there is no pending splits and the legacy sources would be dealt specially by skipped execution if the source operator is fully finished before. We would be able to turn to the final solution gradually in the next steps. Best, Yun -- From:Arvid Heise Send Time:2020 Oct. 12 (Mon.) 15:38 To:Yun Gao Cc:Flink Dev ; User-Flink Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Yun, Thank you for starting the discussion. This will solve one of the long-standing issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit closer to Chandy-Lamport again. A couple of comments: 1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously. 2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B if they have any upstream task left. If not B becomes a new root. That would require to only touch a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator. 2b) We also need to be careful for out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but at the time it arrives, the subtask is finished already. 3) An implied change is that checkpoints are not aborted anymore at EndOfPartition, which is good, but might be explicitly added. 4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state. 5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place. [1] https://issues.apache.org/jira/browse/FLINK-2491 On Fri, Oct 9, 2020 at 8:22 AM Yun Gao wrote: Hi, devs & users Very sorry for the spoiled formats, I resent the discussion as follows. As discussed in FLIP-131[1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that currently Flink does not support checkpoints after some tasks finished, which causes some problems for bounded or mixed jobs: 1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed before committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks are finished, the data sent after the last checkpoint would always not be able to be committed. 
This issue has already been reported some times in the user ML[2][3][4] and is future brought up when working on FLIP-143: Unified Sink API [5]. 2. The jobs with both bounded and unbounded sources might have to replay a large amount of records after failover due to no periodic checkpoints are taken after the bounded sources finished. Therefore, we propose to also support checkpoints after some tasks finished. Your Could find more details in FLIP-147[6]. Best, Yun [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741 [2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E [3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E [4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API [6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished --Original Mail -- Sender:Yun Gao Send Date:Fri Oct 9 14:16:52 2020 Recipients:Flink Dev , User-Flink Subject:[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finishe
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Arvid, Very thanks for the comments! >>> 4) Yes, the interaction is not trivial and also I have not completely >>> thought it through. But in general, I'm currently at the point where I >>> think that we also need non-checkpoint related events in unaligned >>> checkpoints. So just keep that in mind, that we might converge anyhow at >>> this point. I also agree with that it would be better to keep the unaligned checkpoints behavior on EndOfPartition, I will then double check on this issue again. >>> In general, what is helping in this case is to remember that there no >>> unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we >>> can completely ignore the problem on how to store and restore output >>> buffers of a completed task (also important for the next point). Exactly, we should not need to persist the output buffers for the completed tasks, and that would simply the implementation a lot. >>> 5) I think we are on the same page and I completely agree that for the >>> MVP/first version, it's completely fine to start and immediately stop. A >>> tad better would be even to not even start the procession loop. I also agree with this part. We would keep optimizing the implementation after the first version. Best, Yun -- From:Arvid Heise Send Time:2020 Oct. 13 (Tue.) 03:39 To:Yun Gao Cc:Flink Dev ; User-Flink Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Yun, 4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this point. In general, what is helping in this case is to remember that there no unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we can completely ignore the problem on how to store and restore output buffers of a completed task (also important for the next point). 5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be even to not even start the procession loop. On Mon, Oct 12, 2020 at 6:18 PM Yun Gao wrote: Hi Arvid, Very thanks for the insightful comments! I added the responses for this issue under the quota: >> 1) You call the tasks that get the barriers injected leaf nodes, which would >> make the > sinks the root nodes. That is very similar to how graphs in >> relational algebra are labeled. However, I got the feeling that in Flink, we >> rather iterate from sources to sink, making the sources root nodes and the >> sinks the leaf nodes. However, I have no clue how it's done in similar >> cases, so please take that hint cautiously. >> 2) I'd make the algorithm to find the subtasks iterative and react in >> CheckpointCoordinator. Let's assume that we inject the barrier at all root >> subtasks (initially all sources). So in the iterative algorithm, whenever >> root A finishes, it looks at all connected subtasks B if they have any >> upstream task left. If not B becomes a new root. That would require to only >> touch a part of the job graph, but would require some callback from >> JobManager to CheckpointCoordinator. I think I should have used a bad name of "leaf nodes", in fact I think we should have the same thoughts that we start with the source nodes to find all the nodes whose precedent nodes are all finished. 
It would be much better to call these nodes (which we would trigger) as "root nodes". I'll modify the FLIP to change the names to "root nodes". >> 2b) We also need to be careful for out-of-sync updates: if the root is about >> to finish, we could send the barrier to it from CheckpointCoordinator, but >> at the time it arrives, the subtask is finished already. Exactly. When the checkpoint triggers a task but found the task is not there, it may then further check if the task has been finished, if so, it should then re-check its descendants to see if there are new "root nodes" to trigger. >> 3) An implied change is that checkpoints are not aborted anymore at >> EndOfPartition, which is good, but might be explicitly added. Yes, currently barrier alignment would fail the current checkpoint on EndOfPartition, and we would modify the behavior. >> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit >> ambiguous: What happens when an unaligned checkpoint is started and then one >> input channel contains the EndOfPartition e
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Till, Many thanks for the feedback! > 1) When restarting all tasks independent of the status at checkpoint time > (finished, running, scheduled), we might allocate more resources than we > actually need to run the remaining job. From a scheduling perspective it > would be easier if we already know that certain subtasks don't need to be > rescheduled. I believe this can be an optimization, though. > 2) In the section Compatibility, Deprecation and Migration Plan you mentioned > that you want to record operators in the CompletedCheckpoint which are fully > finished. How will this information be used for constructing a recovered > ExecutionGraph? Why wouldn't the same principle work for the task level? I think the first two issues are related. The main reason is that with external checkpoints, a checkpoint might be taken from one job and used in another job, but we do not have a unique ID to match tasks across jobs. Furthermore, users may also change the parallelism of a JobVertex, or even modify the graph structure by adding/removing operators or changing the chain relationship between operators. On the other side, Flink already provides custom UIDs for operators, which makes the operators a stable unit for recovery. The current checkpoints are also organized in units of operators to support rescaling and job upgrading. When restarting from a checkpoint with finished operators, we could start only the tasks with operators that are not fully finished (namely, some subtasks were still running when the checkpoint was taken). Then during the execution of a single task, we only initialize/open/run/close the operators that are not fully finished. The scheduler should be able to compute whether a task contains not-fully-finished operators from the current JobGraph and the operator finish states restored from the checkpoint. > 3) How will checkpointing work together with fully bounded jobs and FLIP-1 > (fine grained recovery)? Currently I think it should be compatible with fully bounded jobs and FLIP-1 since it could be viewed as a completion of the current checkpoint mechanism. Concretely: 1. The batch job (with blocking execution mode) should not be affected since checkpoints are not enabled in this case. 2. With the modification, a bounded job running in pipelined mode would also be supported with checkpoints while it is finishing. As discussed in the FLIP, it should not affect the current behavior after restore for almost all jobs. 3. Region failover and more fine-grained recovery should also not be affected: similar to the previous behavior, after a failover the failover policy (full/region/fine-grained) decides which tasks to restart, and the checkpoint only decides what state is restored for these tasks. The only difference with this modification is that these tasks might now be restored from a checkpoint taken after some tasks have finished. Since the previously finished tasks would always be skipped (by not being started, or by running an empty execution), and the behavior of the previously running tasks stays unchanged, the overall behavior should not be affected. Best, Yun -- From:Till Rohrmann Send Time:2020 Oct. 13 (Tue.) 
17:25 To:Yun Gao Cc:Arvid Heise ; Flink Dev ; User-Flink Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Thanks for starting this discussion Yun Gao, I have three comments/questions: 1) When restarting all tasks independent of the status at checkpoint time (finished, running, scheduled), we might allocate more resources than we actually need to run the remaining job. From a scheduling perspective it would be easier if we already know that certain subtasks don't need to be rescheduled. I believe this can be an optimization, though. 2) In the section Compatibility, Deprecation and Migration Plan you mentioned that you want to record operators in the CompletedCheckpoint which are fully finished. How will this information be used for constructing a recovered ExecutionGraph? Why wouldn't the same principle work for the task level? 3) How will checkpointing work together with fully bounded jobs and FLIP-1 (fine grained recovery)? Cheers, Till On Tue, Oct 13, 2020 at 9:30 AM Yun Gao wrote: Hi Arvid, Very thanks for the comments! >>> 4) Yes, the interaction is not trivial and also I have not completely >>> thought it through. But in general, I'm currently at the point where I >>> think that we also need non-checkpoint related events in unaligned >>> checkpoints. So just keep that in mind, that we might converge anyhow at >>> this point. I also agree with that it would be better to keep the unaligned checkpoints behavior on EndOfPartition, I will then double check on this issue again. >>> In general, what is helping in this c
Re: How to deploy dynamically generated flink jobs?
Hi Alexander, The signature of createRemoteEnvironment is public static StreamExecutionEnvironment createRemoteEnvironment( String host, int port, String... jarFiles); which could also ship the jars needed for execution to the remote cluster. Could you try also passing the jar files to the remote environment? Best, Yun -- Sender:Alexander Bagerman Date:2020/10/29 10:43:16 Recipient: Theme:How to deploy dynamically generated flink jobs? Hi, I am trying to build functionality to dynamically configure a Flink job (Java) in my code based on some additional metadata and submit it to Flink running in a session cluster. The Flink version is 1.11.2. The problem I have is how to provide a packaged job to the cluster. When I am trying the following code StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort); ... configuring job workflow here... env.execute(jobName); I am getting a ClassNotFoundException stating that the code for my mapping functions did not make it to the cluster, which makes sense. What would be the right way to deploy dynamically configured flink jobs which are not packaged as a jar file but rather generated ad-hoc? Thanks
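A small sketch of what passing the jar list to createRemoteEnvironment could look like; the host, port and jar path are placeholders, and the jar is assumed to contain every user class referenced by the pipeline.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        // The jar must contain all user classes referenced by the pipeline
        // (e.g. the MapFunction below); host, port and the path are placeholders.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "/path/to/my-functions.jar");

        env.fromElements("a", "b", "c")
           .map((MapFunction<String, String>) String::toUpperCase)
           .print();

        // execute() ships the listed jar(s) to the session cluster together with the job graph.
        env.execute("dynamically-built-job");
    }
}
```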
Re: Re: How to deploy dynamically generated flink jobs?
Hi Alexander, From my side, I still think it is reasonable to have a jar that contains the code that runs on the client and that is also shipped to the cluster. Then this jar could also be included in the shipped jar list. For the second issue, similarly, I think you may first build the project to get the jar containing the code, then fill in the path of the generated jar to test the submission. Best, Yun --Original Mail -- Sender:Alexander Bagerman Send Date:Thu Oct 29 11:38:45 2020 Recipients:Yun Gao CC:Flink ML Subject:Re: How to deploy dynamically generated flink jobs? I did try it but this option seems to be for a third-party jar. In my case I would need to specify/ship a jar that contains the code where the job is being constructed. I'm not clear on 1. how to point to the containing jar 2. how to test such a submission from my project running in Eclipse Alex On Wed, Oct 28, 2020 at 8:21 PM Yun Gao wrote: Hi Alexander, The signature of createRemoteEnvironment is public static StreamExecutionEnvironment createRemoteEnvironment( String host, int port, String... jarFiles); which could also ship the jars needed for execution to the remote cluster. Could you try also passing the jar files to the remote environment? Best, Yun -- Sender:Alexander Bagerman Date:2020/10/29 10:43:16 Recipient: Theme:How to deploy dynamically generated flink jobs? Hi, I am trying to build functionality to dynamically configure a Flink job (Java) in my code based on some additional metadata and submit it to Flink running in a session cluster. The Flink version is 1.11.2. The problem I have is how to provide a packaged job to the cluster. When I am trying the following code StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);... configuring job workflow here...env.execute(jobName); I am getting a ClassNotFoundException stating that the code for my mapping functions did not make it to the cluster, which makes sense. What would be the right way to deploy dynamically configured flink jobs which are not packaged as a jar file but rather generated ad-hoc? Thanks
Re: Unify error handler and late window record output for SQL api
Hi Yi, Sorry, I might not be an expert on SQL. As a whole, since SQL is a high-level API, users have less control over their jobs: 1. Unfortunately we do not have an API to catch all errors. I think even with DataStream, we do not provide an API to catch runtime exceptions such as the ones related to the network. Could you also explain a bit why this functionality is wanted? 2. The SQL API currently also does not provide an API to side-output the late records; since standard SQL does not provide the corresponding grammar, it would be a bit complex to provide the corresponding functionality. Best, Yun --Original Mail -- Sender:Yi Tang Send Date:Thu Oct 29 15:08:30 2020 Recipients:Flink ML Subject:Unify error handler and late window record output for SQL api Hi, I'm looking for a way to handle potential errors in a job submitted with the SQL API, but unfortunately found nothing. Handling errors manually in the SQL API is hard, I think. Is there a way to handle all errors and send them to a SideOutput to avoid task failure? Also, one can put late records into a SideOutput in the streaming API; it looks like there's no equivalent in the SQL API. Thanks.
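For the late-records part of the question, the DataStream equivalent Yun refers to is WindowedStream#sideOutputLateData. A sketch with made-up input data:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (key, event timestamp in ms); the timestamps are illustrative.
        DataStream<Tuple2<String, Long>> events = env
                .fromElements(Tuple2.of("k", 1_000L), Tuple2.of("k", 61_000L), Tuple2.of("k", 2_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((e, ts) -> e.f1));

        // Late records are routed to this side output instead of being dropped.
        final OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("late-events") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> counts = events
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sideOutputLateData(lateTag)
                .sum(1);

        counts.getSideOutput(lateTag).print("LATE");
        counts.print("ON-TIME");
        env.execute("late-data-side-output-sketch");
    }
}
```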
Re: Native kubernetes setup failed to start job
Hi Liangde, I pull in Yang Wang who is the expert for Flink on K8s. Best, Yun --Original Mail -- Sender:Chen Liangde Send Date:Fri Oct 30 05:30:40 2020 Recipients:Flink ML Subject:Native kubernetes setup failed to start job I created a flink cluster in kubernetes following this guide: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html The job manager was running. When a job was submitted to the job manager, it spawned a task manager pod, but the task manager failed to connect to the job manager. And in the job manager web ui I can't find the task manager. This error is suspicious: org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 352518404 - discarded 2020-10-29 13:22:51,069 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*().2020-10-29 13:22:51,176 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with java.io.IOException: Connection reset by peer2020-10-29 13:22:51,176 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 352518404 - discarded2020-10-29 13:22:51,180 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123]] Caused by: [The remote system explicitly disassociated (reason unknown).]2020-10-29 13:22:51,183 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*.2020-10-29 13:23:01,203 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with java.io.IOException: Connection reset by peer
Re: Re: Re: How to deploy dynamically generated flink jobs?
Hi Alexander, Sorry, I might not fully understand the issue: do you mean the "flink" jar is the same jar as the Spring app fat jar, or are they not the same jar? As a whole, I think the parameter value we need for jarFiles is the absolute path of the jar file. We might need some logic to decide the path to the jar files. For example, if the "flink" jar containing the UDFs is the same as the Spring app fat jar containing the execute call, we might use a method like [1] to find the containing jar; otherwise we might need some mapping from the job name to its flink jar. Best, Yun [1] https://github.com/apache/hadoop/blob/8ee6bc2518bfdf7ad257cc1cf3c73f4208c49fc0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ClassUtil.java#L38 --Original Mail -- Sender:Alexander Bagerman Send Date:Fri Oct 30 04:49:59 2020 Recipients:Yun Gao CC:Flink ML Subject:Re: Re: How to deploy dynamically generated flink jobs? Thanks, Yun. Makes sense. How would you reference a jar file from inside of another jar for such an invocation? In my case I would have an interactive application - a Spring Boot web app - where the job would be configured and StreamExecutionEnvironment.execute(jobName) is called. The Spring app is a runnable fat jar with my "flink" jar packaged along with other jars. How would I specify the location of the jar so that StreamExecutionEnvironment can find it? Thanks, Alex
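A sketch of the "find the containing jar" idea from [1], using the class's protection domain rather than Hadoop's ClassUtil. The MyMapFunction class and the host/port are placeholders, and note that inside a Spring Boot fat jar the code source is a nested-jar URL, which would need extra handling beyond this sketch.

```java
import java.io.File;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ContainingJarSketch {

    // Stands in for the user's real UDF that must be available on the cluster.
    public static class MyMapFunction implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value * 2;
        }
    }

    // Roughly what the Hadoop ClassUtil in [1] does: resolve the jar a class was loaded from.
    // Only works when the class really lives in a plain jar (not exploded IDE classes,
    // not a nested jar inside a Spring Boot fat jar).
    static String jarOf(Class<?> clazz) throws Exception {
        return new File(clazz.getProtectionDomain().getCodeSource().getLocation().toURI())
                .getAbsolutePath();
    }

    public static void main(String[] args) throws Exception {
        String jarPath = jarOf(MyMapFunction.class);   // placeholder resolution logic
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, jarPath);     // host/port are placeholders

        env.fromElements(1, 2, 3).map(new MyMapFunction()).print();
        env.execute("remote-job-with-located-jar");
    }
}
```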
Re: Duplicate operators generated by plan
Hi Rex, Could you also attach one example for these sql / table ? And one possible issue to confirm is that does the operators with the same names also have the same inputs ? Best, Yun --Original Mail -- Sender:Rex Fenley Send Date:Fri Dec 4 02:55:41 2020 Recipients:user Subject:Duplicate operators generated by plan Hello, I'm running into an issue where my execution plan is creating the same exact join operator multiple times simply because the subsequent operator filters on a different boolean value. This is a massive duplication of storage and work. The filtered operators which follow result in only a small set of elements filtered out per set too. eg. of two separate operators that are equal Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent] Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent]) Which are entirely the same datasets being processed. The first one points to GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids]) The second one points to GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids]) And these are both intersecting sets of data though slightly different. I don't see why that would make the 1 join from before split into 2 though. There's even a case where I'm seeing a join tripled. Is there a good reason why this should happen? Is there a way to tell flink to not duplicate operators where it doesn't need to? Thanks! -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US
Re: How to parse list values in csv file
Hi, The CSV input format only supports the types listed in [1] and must use the types in this list; thus for other types some kind of workaround is needed, like first parsing the field as a string and parsing it again later in the program. Best, Yun [1] https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bbd49ed4f9140546cf/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java#L287 --Original Mail -- Sender:narasimha Send Date:Fri Dec 4 00:45:53 2020 Recipients:user Subject:How to parse list values in csv file Hi, Getting the below error when trying to read a csv file; one of the fields is a list type. Can someone help in fixing the issue? jobmanager_1 | Caused by: java.lang.IllegalArgumentException: The type 'java.util.List' is not supported for the CSV input format. jobmanager_1 | at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289) ~[flink-dist_2.11-1.11.2.jar:1.11.2] jobmanager_1 | at org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67) ~[flink-dist_2.11-1.11.2.jar:1.11.2] jobmanager_1 | at org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83) ~[flink-dist_2.11-1.11.2.jar:1.11.2] jobmanager_1 | at org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87) ~[flink-dist_2.11-1.11.2.jar:1.11.2] -- A.Narasimha Swamy
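A sketch of the "read it as a string and parse it later" workaround Yun mentions, with a made-up file layout where '|' separates the list entries inside the third column:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CsvListColumnSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumed layout: id;name;tag1|tag2|tag3 -- the third column is the "list",
        // kept as a plain String by the reader and only split afterwards.
        env.readTextFile("/path/to/input.csv")
           .map(line -> {
               String[] fields = line.split(";");
               List<String> tags = Arrays.asList(fields[2].split("\\|"));
               return Tuple3.of(Long.parseLong(fields[0]), fields[1], tags);
           })
           .returns(Types.TUPLE(Types.LONG, Types.STRING, Types.LIST(Types.STRING)))
           .print();

        env.execute("csv-list-column-sketch");
    }
}
```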
Re: Re: Duplicate operators generated by plan
Hi Rex, I tried a similar example[1] but did not reproduce the issue; which version of Flink are you using now? Best, Yun [1] The example code: StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); bsEnv.setRestartStrategy(RestartStrategies.noRestart()); bsEnv.setParallelism(1); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, String>>() { @Override public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception { sourceContext.collect(new Tuple2<>(0, "test")); } @Override public void cancel() { } }); Table table = bsTableEnv.fromDataStream( source, $("id"), $("name")); Table table2 = table.select(call("abs", $("id")), $("name")) .as("new_id", "new_name"); bsTableEnv.createTemporaryView("view", table2); Table handled = bsTableEnv.sqlQuery("select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id"); Table ret = table.join(handled) .where($("id").isEqual($("new_id"))) .select($("id"), $("name"), $("new_name")); System.out.println(ret.explain()); DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret, Row.class); row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() { @Override public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception { } }); System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON()); -- Sender:Rex Fenley Date:2020/12/04 14:18:21 Recipient:Yun Gao Cc:user; Brad Davis Theme:Re: Duplicate operators generated by plan cc Brad On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley wrote: Yes, the same exact input operators go into both joins. The chunk of code for the joins from the specific part of the plan I showed is as follows. The orgUsersTable is later filtered into one table and aggregated, and into another table and aggregated. The planner seems to duplicate orgUsersTable into 2 operators even though I create only 1 of it. // in the main function val orgUsersTable = splatRoles( this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS), OrgUsersRoleSplatPrefix, this.tableEnv ) // helper function def splatRoles( table: Table, columnPrefix: String, tableEnv: TableEnvironment ): Table = { // Flink does not have a contains function so we have to splat out our role array's contents // and join it to the originating table. val func = new SplatRolesFunc() val splatted = table .map(func($"roles", $"id")) .as( "id_splatted", s"${columnPrefix}_is_admin", s"${columnPrefix}_is_teacher", s"${columnPrefix}_is_student", s"${columnPrefix}_is_parent" ) // FIRST_VALUE is only available in SQL - so this is SQL. // Rationale: We have to group by after a map to preserve the pk inference, otherwise flink will // toss it out and all future joins will not have a unique key. 
tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted) val grouped = tableEnv.sqlQuery(s""" SELECT id_splatted, FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin, FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher, FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student, FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent FROM ${columnPrefix}_splatted GROUP BY id_splatted """) return table .join(grouped, $"id" === $"id_splatted") .dropColumns($"id_splatted") .renameColumns($"roles".as(s"${columnPrefix}_roles")) } @FunctionHint( output = new DataTypeHint( "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)" ) ) class SplatRolesFunc extends ScalarFunction { def eval(roles: Array[String], id: java.lang.Long): Row = { val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue) val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue) val isStudent: java.lang.Boolean = roles.contains(Student.rawValue) val isParent: java.lang.Boolean = roles.contains(Parent.rawValue) return Row.of(id, isAdmin, isTeacher, isStudent, isParent) } override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.ROW( Types.LONG, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN ) } On Thu, Dec 3, 2020 at 7:49 PM Yun Gao wrote: Hi Rex, Could you also attach one example for these sql / table ? And one possible issue to confirm is that does the operators with the same names also hav
Re: How to parse list values in csv file
Glad to hear that you solved this issue! Best, Yun-- Sender:narasimha Date:2020/12/06 21:35:33 Recipient:Yun Gao Cc:user Theme:Re: How to parse list values in csv file thanks for you email. Translated csv to JSON, read it as a plain text file and then processed to objects. It solved my use case. On Fri, Dec 4, 2020 at 12:24 PM Yun Gao wrote: Hi, The CSV only supports the types listed in [1] and must use the types in this list, thus for other types some kind of workaround is needed, like first parsed as string and parsed again later in the program. Best, Yun [1] https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bbd49ed4f9140546cf/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java#L287 --Original Mail -- Sender:narasimha Send Date:Fri Dec 4 00:45:53 2020 Recipients:user Subject:How to parse list values in csv file Hi, Getting below error when trying to read a csv file, one of the field is list tupe Can someone help if fixing the issue jobmanager_1 | Caused by: java.lang.IllegalArgumentException: The type 'java.util.List' is not supported for the CSV input format. jobmanager_1 | at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289) ~[flink-dist_2.11-1.11.2.jar:1.11.2] jobmanager_1 | at org.apache.flink.api.java.io.RowCsvInputFormat.(RowCsvInputFormat.java:67) ~[flink-dist_2.11-1.11.2.jar:1.11.2] jobmanager_1 | at org.apache.flink.api.java.io.RowCsvInputFormat.(RowCsvInputFormat.java:83) ~[flink-dist_2.11-1.11.2.jar:1.11.2] jobmanager_1 | at org.apache.flink.api.java.io.RowCsvInputFormat.(RowCsvInputFormat.java:87) ~[flink-dist_2.11-1.11.2.jar:1.11.2] -- A.Narasimha Swamy -- A.Narasimha Swamy
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all, I would like to resume this discussion on supporting checkpoints after tasks finished :) Based on the previous discussion, we have now implemented a PoC [1] to try the idea. During the PoC we also ran into some possible issues: 1. To take EndOfPartition into consideration for barrier alignment on the TM side, we now tend to decouple the logic for EndOfPartition from the normal alignment behaviors to avoid complex interference (which seems to be a bit hard to track). We could do so by inserting suitable barriers for input channels that have received but not yet processed EndOfPartition. For example, if a task with four inputs has received barrier 2 from two input channels, but the other two inputs did not receive barrier 2 before EndOfPartition because the preceding tasks have finished, we could then insert barrier 2 for the last two channels so that we could still finish checkpoint 2. 2. As we have discussed, if a task finishes while we are triggering the tasks, it would cause checkpoint failure and we should re-trigger its descendants. But if possible we think we might skip this issue in the first version to reduce the implementation complexity, since it should not affect correctness. We could consider supporting it in the following versions. 3. We would have to add a field isFinished to OperatorState so that we do not re-run finished sources after failover. However, this would require a new version of the checkpoint meta. Currently Flink has an abstract MetaV2V3SerializerBase and has V2 and V3 extend it to share some implementation. To add V4, which differs from V3 in only one field, the current PoC wants to introduce a new MetaV3V4SerializerBase extending MetaV2V3SerializerBase to share implementation between V3 and V4. This might look a little complex and we might need a general mechanism to extend the checkpoint meta format. 4. With the change, StreamTask would have two types of subclasses according to how triggerCheckpoint is implemented: one is the source tasks that perform checkpoints immediately, and the other is the non-source tasks that notify the CheckpointBarrierHandler in some way. However, since we have multiple source tasks (legacy and new source) and multiple non-source tasks (one-input, two-input, multiple-input), this would lead to cases where multiple subclasses share the same implementation and cause code repetition. Currently the PoC introduces a new level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but what makes it more complicated is that StreamingIterationHead extends OneInputStreamTask but needs to perform checkpoints as source tasks do. Glad to hear your opinions! Best, Yun [1] https://github.com/gaoyunhaii/flink/commits/try_checkpoint_6 , starts from commit f8005be1ab5e5124e981e56db7bdf2908f4a969a.
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Aljoscha, Very thanks for the feedbacks! For the remaining issues: > 1. You mean we would insert "artificial" barriers for barrier 2 in case we receive EndOfPartition while other inputs have already received barrier 2? I think that makes sense, yes. Yes, exactly, I would like to insert "artificial" barriers for in case we receive EndOfPartition while other inputs have already received barrier 2, and also for the similar cases that some input channels received EndOfPartition during checkpoint 2 is ongoing and when the task receive directly checkpoint triggering after all the precedent tasks are finished but not received their EndOfPartition yet. > 3. This indeed seems complex. Maybe we could switch to using composition instead of inheritance to make this more extensible? I re-checked the code and now I think composition would be better to avoid complex inheritance hierarchy by exposing the changed part `(de)serializeOperatorState` out, and I'll update the PoC to change this part. Very thanks for the suggestions! > 4. Don't we currently have the same problem? Even right now source tasks and non-source tasks behave differently when it comes to checkpoints. Are you saying we should fix that or would the new work introduce even more duplicate code? Currently since we would never trigger non-source tasks, thus the triggerCheckpoint logic is now implemented in the base StreamTask class and only be used by the source tasks. However, after the change the non-source tasks would also get triggered with a different behavior, we might not be able to continue using this pattern. Best, Yun -- From:Aljoscha Krettek Send Time:2020 Dec. 15 (Tue.) 18:11 To:dev Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Thanks for the thorough update! I'll answer inline. On 14.12.20 16:33, Yun Gao wrote: > 1. To include EndOfPartition into consideration for barrier alignment at > the TM side, we now tend to decouple the logic for EndOfPartition with the > normal alignment behaviors to avoid the complex interference (which seems to > be a bit not trackable). We could do so by inserting suitable barriers for > input channels received but not processed EndOfPartition. For example, if a > task with four inputs has received barrier 2 from two input channels, but the > other two inputs do not received barrier 2 before EndOfPartition due to the > precedent tasks are finished, we could then insert barrier 2 for the last two > channels so that we could still finish the checkpoint 2. You mean we would insert "artificial" barriers for barrier 2 in case we receive EndOfPartition while other inputs have already received barrier 2? I think that makes sense, yes. > 2. As we have discussed, if a tasks finished during we triggering the > tasks, it would cause checkpoint failure and we should re-trigger its > descendants. But if possible we think we might skip this issue at the first > version to reduce the implementation complexity since it should not affect > the correctness. We could considering support it in the following versions. I think this should be completely fine. > 3. We would have to add a field isFinished to OperatorState so that we > could not re-run finished sources after failover. However, this would require > a new version of checkpoint meta. Currently Flink have an abstract > MetaV2V3SerializerBase and have V2 and V3 extends it to share some > implementation. 
To add V4 which is only different from V3 for one field, the > current PoC want to introduce a new MetaV3V4SerializerBase extends > MetaV2V3SerializerBase to share implementation between V3 and V4. This might > looks a little complex and we might need a general mechanism to extend > checkpoint meta format. This indeed seems complex. Maybe we could switch to using composition instead of inheritance to make this more extensible? > 4. With the change StreamTask would have two types of subclasses > according to how to implement triggerCheckpoint, one is source tasks that > perform checkpoints immediately and another is the non-source tasks that > would notify CheckpointBarrierHandler in some way. However, since we have > multiple source tasks (legacy and new source) and multiple non-source tasks > (one-input, two-input, multiple-input), it would cause the cases that > multiple subclasses share the same implementation and cause code repetition. > Currently the PoC introduces a new level of abstraction, namely > SourceStreamTasks and NonSourceStreamTasks, but what makes it more > complicated is that StreamingIterationHead extends OneInputStreamTask but it > need to perform checkpoint as source tasks. Don
Re: See lag end-to-end
Hi Rex, I think latency tracking (latency markers) is what you need [1]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#latency-tracking -- Sender:Rex Fenley Date:2020/12/21 04:57:59 Recipient:user Cc:Brad Davis Theme:See lag end-to-end Hello, Is there some proxy for seeing the relative time it takes for records to make it through an entire job plan? Maybe checkpoint alignment time would be a proxy for this? Are there metrics for that or something else that would provide signal here? Thanks! -- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
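As a minimal sketch of enabling it inside the job's main method (the 5-second interval is just an example value; the equivalent flink-conf.yaml key is metrics.latency.interval):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Emit a LatencyMarker from every source every 5 seconds; the end-to-end latency
// distribution then shows up in the operator latency metrics described in [1].
env.getConfig().setLatencyTrackingInterval(5000L);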
Re: checkpointing seems to be throttled.
Hi Edward, for the second issue, have you also set the state backend type? I'm asking because, except for the default heap state backend, the other state backends should throw an exception if state.checkpoints.dir is not set. Since the heap state backend stores all snapshots in the JM's memory, they cannot be recovered after a JM failover, which makes it unsuitable for production usage. Therefore, if used in a production environment, it might be better to switch to a state backend like RocksDB. For the checkpoint timeout, AFAIK there should be no large changes after 1.9.2. There may be different causes for checkpoint timeouts; one possible cause is back-pressure: if some operator cannot process its records in time, the checkpoint barriers are blocked. I think you might check for back-pressure [1] first, and if there is indeed back-pressure, you could try unaligned checkpoints or resolve the back-pressure by increasing the parallelism of the slow operators. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html --Original Mail -- Sender:Colletta, Edward Send Date:Mon Dec 21 17:50:15 2020 Recipients:user@flink.apache.org Subject:checkpointing seems to be throttled. Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots is set to true. 13 jobs running. Average parallelism of each job is 4. Flink version 1.11.2, Java 11. Running on AWS EC2 instances with EFS for high-availability.storageDir. We are seeing very high checkpoint times and experiencing timeouts. The checkpoint timeout is the default 10 minutes. This does not seem to be related to EFS limits/throttling. We started experiencing these timeouts after upgrading from Flink 1.9.2/Java 8. Are there any known issues which cause very high checkpoint times? Also I noticed we did not set state.checkpoints.dir, I assume it is using high-availability.storageDir. Is that correct? For now we plan on setting execution.checkpointing.timeout: 60 min, execution.checkpointing.tolerable-failed-checkpoints: 12, execution.checkpointing.unaligned: true, and also explicitly setting state.checkpoints.dir.
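As a minimal sketch of the two knobs mentioned above, placed in the job's main method (Flink 1.11+; the checkpoint interval is an example value):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);
// Let checkpoint barriers overtake buffered records so checkpoints can still
// complete while the job is back-pressured.
env.getCheckpointConfig().enableUnalignedCheckpoints();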
Re: [Help Required:]-Unable to submit job from REST API
Hi Puneet, from the doc it seems that submitting a job via the REST API requires sending a POST request to /jars/:jarid/run [1]. The response "Not found." should mean that the REST server does not recognize the requested URL. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#jars-jarid-run -- Sender:Puneet Kinra Date:2020/12/21 19:47:31 Recipient:user Theme:[Help Required:]-Unable to submit job from REST API Hi All, Unable to submit a job from the REST API (Flink Monitoring API). Steps followed: 1) Load the jar using the upload API. 2) Can see the jar in the /tmp/flink-web folder. 3) Try to run the jar using the following. Request http://host-ip/45f30ad6-c8fb-4c2c-9fbf-c4f56acdd9d9_stream-processor-jar-with-dependencies.jar/run?programArgs=/users/puneet/app/orchestrator/PropertiesStream_back.json&entryClass=com.orchestrator.flowExecution.GraphExecutor Response: { "errors": [ "Not found." ] } -- Cheers Puneet Kinra Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com e-mail :puneet.ki...@customercentria.com
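A hedged sketch of such a request using Java 11's HttpClient; the host/port are placeholders, and the query parameter names (entry-class, program-args) should be double-checked against the REST docs in [1]:

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class RunJarViaRest {
    public static void main(String[] args) throws Exception {
        // Placeholders: adjust host/port, jar id, entry class and program arguments.
        String host = "http://localhost:8081";
        String jarId = "45f30ad6-c8fb-4c2c-9fbf-c4f56acdd9d9_stream-processor-jar-with-dependencies.jar";
        String entryClass = "com.orchestrator.flowExecution.GraphExecutor";
        String programArgs = URLEncoder.encode(
                "/users/puneet/app/orchestrator/PropertiesStream_back.json", StandardCharsets.UTF_8);

        // Note the /jars/ prefix before the jar id and the JobManager REST port (8081 by default).
        String url = host + "/jars/" + jarId + "/run?entry-class=" + entryClass
                + "&program-args=" + programArgs;

        HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create(url))
                        .POST(HttpRequest.BodyPublishers.noBody())
                        .build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}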
Re: checkpoint delay consume message
Hi Nick, are you using EXACTLY_ONCE semantics? If so, the sink would use transactions and only commit the transaction on checkpoint completion to ensure end-to-end exactly-once. A detailed description can be found in [1]. Best, Yun [1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html -- Sender:nick toker Date:2020/12/21 23:52:34 Recipient:user Theme:checkpoint delay consume message Hello, We noticed the following behavior: If we enable the flink checkpoints, we saw that there is a delay between the time we write a message to the KAFKA topic and the time the flink kafka connector consumes this message. The delay is closely related to checkpointInterval and/or minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a message from KAFKA will be one of these parameters. Could you please advise how we can remove/control this delay? we use flink 1.11.2 BR nick
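For reference, a minimal sketch of the exactly-once checkpointing setup Yun refers to, placed in the producing job's main method (the interval is an example value):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With EXACTLY_ONCE, a transactional Kafka sink commits its Kafka transaction only when
// the checkpoint completes, so a read_committed consumer sees new records with roughly
// one checkpoint interval of delay.
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);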
Re: RE: checkpointing seems to be throttled.
Hi Edward, Are you setting the FSStateBackend via code or flink-conf.yaml ? If via code it requires a path parameter and the path would be the state.checkpoint.dir. If via flink-conf.yaml, I tried on 1.12 by setting state.backend: filesystem in config file and enable checkpoint, it indeed threw an exception said org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir' at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir' at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41) at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:237) at org.apache.flink.client.program.StreamContextEnvironment.(StreamContextEnvironment.java:67) at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089) at java.util.Optional.map(Optional.java:215) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070) at CheckpointTest.main(CheckpointTest.java:26) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) ... 
11 more For the timeout, if there is no back-pressure, I think it might be helpful to look at the time decomposition for the checkpoint in the checkpoint history page of the Web UI to see which phase takes too much time. Best, Yun --Original Mail -- Sender:Colletta, Edward Send Date:Tue Dec 22 00:04:03 2020 Recipients:Yun Gao , user@flink.apache.org Subject:RE: checkpointing seems to be throttled. Thanks for the quick response. We are using FsStateBackend, and I did see checkpoint files and directories in the EFS mounted directory. We do monitor backpressure through the rest api periodically and we do not see any. From: Yun Gao Sent: Monday, December 21, 2020 10:40 AM To: Colletta, Edward ; user@flink.apache.org Subject: Re: checkpointing seems to be throttled. This email is from an external source - exercise caution regarding links and attachments. Hi Edward, For the second issue, have you also set the statebackend type? I'm asking so because except for the default heap statebackend, other statebackends should throws exception if the state.checkpoint.dir is not set. Since heap statebackend stores all the snapshots in the JM's memory, it could not be recovered after JM failover, which makes it not suitable for production usage. Therefore, if used in production env the
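As a side note, a minimal sketch of configuring the FsStateBackend in code, as discussed above; the HDFS path is a placeholder, and the constructor argument doubles as the checkpoint directory, so state.checkpoints.dir is not needed in that case:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
// Equivalent flink-conf.yaml settings:
//   state.backend: filesystem
//   state.checkpoints.dir: hdfs:///flink/checkpoints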
Re: Re: checkpoint delay consume message
Hi Nick, sorry, I initially thought that the data was also written into Kafka with Flink. So it can be ensured that there is no delay on the write side, right? Does the delay on the read side still exist? Best, Yun --Original Mail -- Sender:nick toker Send Date:Tue Dec 22 01:43:50 2020 Recipients:Yun Gao CC:user Subject:Re: checkpoint delay consume message hi, I am confused: the delay is in the source when reading messages, not in the sink. nick On Mon, Dec 21, 2020 at 18:12 Yun Gao <yungao...@aliyun.com> wrote: Hi Nick, Are you using EXACTLY_ONCE semantics ? If so the sink would use transactions, and only commit the transaction on checkpoint complete to ensure end-to-end exactly-once. A detailed description could be found in [1] Best, Yun [1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html -- Sender:nick toker Date:2020/12/21 23:52:34 Recipient:user Theme:checkpoint delay consume message Hello, We noticed the following behavior: If we enable the flink checkpoints, we saw that there is a delay between the time we write a message to the KAFKA topic and the time the flink kafka connector consumes this message. The delay is closely related to checkpointInterval and/or minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a message from KAFKA will be one of these parameters. Could you please advise how we can remove/control this delay? we use flink 1.11.2 BR nick
Re: StreamingFileSink closed file exception
Hi Billy, StreamingFileSink does not expect the Encoder to close the stream passed in in encode method. However, ObjectMapper would close it at the end of the write method. Thus I think you think disable the close action for ObjectMapper, or change the encode implementation to objectMapper.writeValue(new CloseShieldOutputStream(stream), element); to avoid the stream get closed actually. --Original Mail -- Sender:Billy Bain Send Date:Thu Dec 24 22:32:06 2020 Recipients:User Subject:StreamingFileSink closed file exception I am new to Flink and am trying to process a file and write it out formatted as JSON. This is a much simplified version. public class AndroidReader { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); DataStreamSource lines = env.readTextFile("file:///path/to/file/input.json"); SingleOutputStreamOperator android = lines.map(new AndroidDataMapper()); StreamingFileSink sink = StreamingFileSink.forRowFormat(new Path("file:///path/to/file/output"), new AndroidDataEncoder() ) .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix("json").build()) .withRollingPolicy(DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build()) .build(); android.addSink(sink); env.execute("Android"); } } @JsonIgnoreProperties(ignoreUnknown = true) public class AndroidData { public AndroidData() { } private String packageName; public String getPackageName() { return packageName; } public void setPackageName(String packageName) { this.packageName = packageName; } } public class AndroidDataMapper implements MapFunction { private static final ObjectMapper objectMapper = new ObjectMapper(); @Override public AndroidData map(String value) throws Exception { return objectMapper.readValue(value, AndroidData.class); } } AndroidDataEncoder class: public class AndroidDataEncoder implements Encoder { private static final ObjectMapper objectMapper = new ObjectMapper(); @Override public void encode(AndroidData element, OutputStream stream) throws IOException { objectMapper.writeValue(stream, element); } } The issue is that I get an ClosedChannelException. I see the folder get created, but then no files are written to it. 
java.nio.channels.ClosedChannelException at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325) at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101) at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70) at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnEvent(DefaultRollingPolicy.java:76) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) Any help would be appreciated. Thanks! -- Wayne D. Young aka Billy Bob Bain billybobb...@gmail.com
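A sketch of the fix suggested above: keep Jackson from closing the sink's stream, either by disabling AUTO_CLOSE_TARGET or by wrapping the stream. Either measure alone should be enough; CloseShieldOutputStream comes from Apache Commons IO, so that variant assumes the dependency is available.

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.flink.api.common.serialization.Encoder;

import java.io.IOException;
import java.io.OutputStream;

public class AndroidDataEncoder implements Encoder<AndroidData> {
    // Option 1: tell Jackson not to close the target stream after writeValue().
    private static final ObjectMapper objectMapper =
            new ObjectMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        // Option 2: hand Jackson a wrapper whose close() does not touch the sink's stream.
        objectMapper.writeValue(new CloseShieldOutputStream(stream), element);
        stream.write('\n'); // keep the output line-delimited
    }
}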
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all, I tested the previous PoC with the current tests and I found some new issues that might cause divergence, and sorry for there might also be some reversal for some previous problems: 1. Which operators should wait for one more checkpoint before close ? One motivation for this FLIP is to ensure the 2PC sink commits the last part of data before closed, which makes the sink operator need to wait for one more checkpoint like onEndOfInput() -> waitForCheckpoint() -> notifyCheckpointComplete() -> close(). This lead to the issue which operators should wait for checkpoint? Possible options are a. Make all the operators (or UDF) implemented notifyCheckpointCompleted method wait for one more checkpoint. One exception is that since we can only snapshot one or all tasks for a legacy source operator to avoid data repetition[1], we could not support legacy operators and its chained operators to wait for checkpoints since there will be deadlock if part of the tasks are finished, this would finally be solved after legacy source are deprecated. The PoC used this option for now. b. Make operators (or UDF) implemented a special marker interface to wait for one more checkpoint. 2. Do we need to solve the case that tasks finished before triggered ? Previously I think we could postpone it, however, during testing I found that it might cause some problems since by default checkpoint failure would cause job failover, and the job would also need wait for another interval to trigger the next checkpoint. To pass the tests, I updated the PoC to include this part, and we may have a double think on if we need to include it or use some other options. 3. How to extend a new format for checkpoint meta ? Sorry previously I gave a wrong estimation, after I extract a sub-component for (de)serialize operator state, I found the problem just goes to the new OperatorStateSerializer. The problem seems to be that v2, v3 and v4 have different fields, thus they use different process when (de)serialize, which is a bit different from the case that we have a fixed steps and each step has different logic. Thus we might either a. Use base classes for each two version. b. Or have a unified framework contains all the possible fields across all version, and use empty field serializer to skip some fields in each version. Best, Yun [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks -- From:Yun Gao Send Time:2020 Dec. 16 (Wed.) 11:07 To:Aljoscha Krettek ; dev ; user Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Aljoscha, Very thanks for the feedbacks! For the remaining issues: > 1. You mean we would insert "artificial" barriers for barrier 2 in case we receive EndOfPartition while other inputs have already received barrier 2? I think that makes sense, yes. Yes, exactly, I would like to insert "artificial" barriers for in case we receive EndOfPartition while other inputs have already received barrier 2, and also for the similar cases that some input channels received EndOfPartition during checkpoint 2 is ongoing and when the task receive directly checkpoint triggering after all the precedent tasks are finished but not received their EndOfPartition yet. > 3. This indeed seems complex. Maybe we could switch to using composition instead of inheritance to make this more extensible? 
I re-checked the code and now I think composition would be better to avoid complex inheritance hierarchy by exposing the changed part `(de)serializeOperatorState` out, and I'll update the PoC to change this part. Very thanks for the suggestions! > 4. Don't we currently have the same problem? Even right now source tasks and non-source tasks behave differently when it comes to checkpoints. Are you saying we should fix that or would the new work introduce even more duplicate code? Currently since we would never trigger non-source tasks, thus the triggerCheckpoint logic is now implemented in the base StreamTask class and only be used by the source tasks. However, after the change the non-source tasks would also get triggered with a different behavior, we might not be able to continue using this pattern. Best, Yun -- From:Aljoscha Krettek Send Time:2020 Dec. 15 (Tue.) 18:11 To:dev Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Thanks for the thorough update! I'll answer inline. On 14.12.20 16:33, Yun Gao wrote: > 1. To include EndOfPartition into consideration for
Re: Tumbling Time Window
Hi Navneeth, I think you may start with the window DSL; an example of a custom window function can be found in [1]. From your description it should be a standard tumbling window, and implementing it with a customized process function would end up with functionality similar to the current window operator anyway. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#processwindowfunction -- Sender:Navneeth Krishnan Date:2021/01/04 08:05:17 Recipient:user Theme:Tumbling Time Window Hello All, First of all Happy New Year!! Thanks for the excellent community support. I have a job which requires a 2-second tumbling time window per key. For each user we wait for 2 seconds to collect enough data and proceed to further processing. My question is should I use the regular DSL windowing or write a custom process function which does the windowing? I have heard that the DSL window has more overhead versus a custom window function. What do you guys suggest, and can someone provide an example of a custom window function per key? Also, given the window size is very small (2 secs), would there be more overhead in firing so many timers for each key? Thanks! Regards, Navneeth
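For reference, a minimal sketch of a per-key 2-second tumbling window with a ProcessWindowFunction, following [1]; the Tuple2<user, value> element type and the sum aggregation are made-up example details.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PerUserTumblingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("user-1", 1L), Tuple2.of("user-1", 2L), Tuple2.of("user-2", 5L));

        events.keyBy(e -> e.f0)
              // Collect 2 seconds of data per user before processing it as one batch.
              .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
              .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                  @Override
                  public void process(String user, Context ctx,
                                      Iterable<Tuple2<String, Long>> elements, Collector<String> out) {
                      long sum = 0;
                      for (Tuple2<String, Long> e : elements) {
                          sum += e.f1;
                      }
                      out.collect(user + " -> " + sum);
                  }
              })
              .print();

        env.execute("per-user-tumbling-window");
    }
}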
Re: Facing issues on kafka while running a job that was built with 1.11.2-scala-2.11 version onto flink version 1.11.2-scala-2.12
Hi Narasimha, since the Kafka connector itself is implemented purely in Java, I would guess with high probability that it is not an issue of the Scala version. Maybe first check the Kafka cluster's status? Best, Yun -- Sender:narasimha Date:2020/12/30 13:43:28 Recipient:user Theme:Facing issues on kafka while running a job that was built with 1.11.2-scala-2.11 version onto flink version 1.11.2-scala-2.12 Hi, Facing issues on Kafka while running a job that was built with 1.11.2-scala-2.11 onto Flink version 1.11.2-scala-2.12. The kafka-connector with 1.11.2-scala-2.11 is getting packaged with the job. The Kafka cluster was all good when writing to topics, but when someone reads, intermittently the cluster becomes unstable with unresponsive brokers. Do these differences in Scala binary versions cause any issues to Kafka? Flink version 1.11.2-scala-2-12 Kafka version - 2.1.0 -- A.Narasimha Swamy
Re: Is chk-$id/_metadata created regardless of enabling externalized checkpoints?
Hi Dongwon, Happy new year! The metadata file is stored on HDFS even if externalized checkpoints are not enabled. If externalized checkpoints are not enabled, Flink deletes all the checkpoints on exit; if they are enabled, the checkpoints are kept in the cancel or fail cases, according to the retention settings. Thus for the second question, I think the answer is yes. Best, Yun --Original Mail -- Sender:Dongwon Kim Send Date:Mon Jan 4 19:16:39 2021 Recipients:user Subject:Is chk-$id/_metadata created regardless of enabling externalized checkpoints? Hi, First of all, happy new year! It can be a very basic question but I have something to clarify in my head. My flink-conf.yaml is as follows (note that I didn't specify the value of "execution-checkpointing-externalized-checkpoint-retention [1]"): #... execution.checkpointing.interval: 20min execution.checkpointing.min-pause: 1min state.backend: rocksdb state.backend.incremental: true state.checkpoints.dir: hdfs:///flink-jobs/ckpts state.checkpoints.num-retained: 10 state.savepoints.dir: hdfs:///flink-jobs/svpts #... And the checkpoint configuration is shown as follows in the Web UI (note that "Persist Checkpoints Externally" is "Disabled" in the final row): According to [2], externalized checkpoints: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. There are more details in the deployment notes on externalized checkpoints. So I've thought the metadata of a checkpoint is only in the JobManager's memory and not stored on HDFS unless "execution-checkpointing-externalized-checkpoint-retention" is set.
However, even without setting the value, every checkpoint already contains its own metadata: [user@devflink conf]$ hdfs dfs -ls /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/* Found 1 items -rw-r--r-- 3 user hdfs 163281 2021-01-04 14:25 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-945/_metadata Found 1 items -rw-r--r-- 3 user hdfs 163281 2021-01-04 14:45 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-946/_metadata Found 1 items -rw-r--r-- 3 user hdfs 163157 2021-01-04 15:05 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-947/_metadata Found 1 items -rw-r--r-- 3 user hdfs 156684 2021-01-04 15:25 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-948/_metadata Found 1 items -rw-r--r-- 3 user hdfs 147280 2021-01-04 15:45 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-949/_metadata Found 1 items -rw-r--r-- 3 user hdfs 147280 2021-01-04 16:05 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-950/_metadata Found 1 items -rw-r--r-- 3 user hdfs 162937 2021-01-04 16:25 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-951/_metadata Found 1 items -rw-r--r-- 3 user hdfs 175089 2021-01-04 16:45 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-952/_metadata Found 1 items -rw-r--r-- 3 user hdfs 173289 2021-01-04 17:05 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-953/_metadata Found 1 items -rw-r--r-- 3 user hdfs 153951 2021-01-04 17:25 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-954/_metadata Found 21 items -rw-r--r-- 3 user hdfs 78748 2021-01-04 14:25 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/05d76f4e-3d9c-420c-8b87-077fc9880d9a -rw-r--r-- 3 user hdfs 23905 2021-01-04 15:05 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/0b9d9323-9f10-4fc2-8fcc-a9326448b07c -rw-r--r-- 3 user hdfs 81082 2021-01-04 16:05 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/0f6779d0-3a2e-4a94-be9b-d9d6710a7ea0 -rw-r--r-- 3 user hdfs 23905 2021-01-04 16:25 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/107b3b74-634a-462c-bf40-1d4886117aa9 -rw-r--r-- 3 user hdfs 78748 2021-01-04 14:45 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/18a538c6-d40e-48c0-a965-d65be407a124 -rw-r--r-- 3 user hdfs 83550 2021-01-04 16:45 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/24ed9c4a-0b8e-45d4-95b8-64547cb9c541 -rw-r--r-- 3 user hdfs 23905 2021-01-04 17:05 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/35ee9665-7c1f-4407-beb5-fbb312d84907 -rw-r--r-- 3 user hdfs 47997 2021-01-04 11:25 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/36363172-c401-4d60-a970-cfb2b3cbf058 -rw-r--r-- 3 user hdfs 81082 2021-01-04 15:45 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/43aecc8c-145f-43ba-81a8-b0ce2c3498f4 -rw-r--r-- 3 user hdfs 79898 2021-01-04 15:05 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/5743f278-fc50-4c4a-b14e-89bfdb2139fa -rw-r--r-- 3 user hdfs 23905 2021-01-04 16:45 /flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/67e16688-c48c-409b-acac-e7091a84d548 -rw-r--r-- 3 user hdfs 23905 2021-01
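As a minimal sketch of enabling retention in code for the case discussed above (matching the 20-minute interval from the configuration; the cleanup mode is an example choice):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(20 * 60 * 1000L);
// Keep the checkpoint (including its _metadata on HDFS) when the job is cancelled,
// so it can be used for a manual restore later.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);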
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Avrid, Very thanks for the feedbacks! For the second issue, sorry I think I might not make it very clear, I'm initially thinking the case that for example for a job with graph A -> B -> C, when we compute which tasks to trigger, A is still running, so we trigger A to start the checkpoint. However, before the triggering message reached A, A gets finished and the trigger message failed due to not found the task. In this case if we do not handle it, the checkpoint would failed due to timeout. However, by default failed checkpoint would cause job failure and we would also need to wait for a checkpoint interval for the next checkpoint. One solution would be check all the pending checkpoints to trigger B instead when JM is notified that A is finished. For the third issue, it should work if we store a special value for some filed in OperatorState or OperatorSubtaskState, for example, we might store a special subtaskState map inside the OperatorState to mark it is finished since the finished operator should always have an empty state. Very thanks for the advices! I'll try with this method. Best, Yun -- From:Arvid Heise Send Time:2021 Jan. 5 (Tue.) 17:16 To:Yun Gao Cc:Aljoscha Krettek ; dev ; user Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Yun, 1. I'd think that this is an orthogonal issue, which I'd solve separately. My gut feeling says that this is something we should only address for new sinks where we decouple the semantics of commits and checkpoints anyways. @Aljoscha Krettek any idea on this one? 2. I'm not sure I get it completely. Let's assume we have a source partition that is finished before the first checkpoint. Then, we would need to store the finished state of the subtask somehow. So I'm assuming, we still need to trigger some checkpointing code on finished subtasks. 3. Do we really want to store the finished flag in OperatorState? I was assuming we want to have it more fine-grained on OperatorSubtaskState. Maybe we can store the flag inside managed or raw state without changing the format? On Fri, Dec 25, 2020 at 8:39 AM Yun Gao wrote: Hi all, I tested the previous PoC with the current tests and I found some new issues that might cause divergence, and sorry for there might also be some reversal for some previous problems: 1. Which operators should wait for one more checkpoint before close ? One motivation for this FLIP is to ensure the 2PC sink commits the last part of data before closed, which makes the sink operator need to wait for one more checkpoint like onEndOfInput() -> waitForCheckpoint() -> notifyCheckpointComplete() -> close(). This lead to the issue which operators should wait for checkpoint? Possible options are a. Make all the operators (or UDF) implemented notifyCheckpointCompleted method wait for one more checkpoint. One exception is that since we can only snapshot one or all tasks for a legacy source operator to avoid data repetition[1], we could not support legacy operators and its chained operators to wait for checkpoints since there will be deadlock if part of the tasks are finished, this would finally be solved after legacy source are deprecated. The PoC used this option for now. b. Make operators (or UDF) implemented a special marker interface to wait for one more checkpoint. 2. Do we need to solve the case that tasks finished before triggered ? 
Previously I think we could postpone it, however, during testing I found that it might cause some problems since by default checkpoint failure would cause job failover, and the job would also need wait for another interval to trigger the next checkpoint. To pass the tests, I updated the PoC to include this part, and we may have a double think on if we need to include it or use some other options. 3. How to extend a new format for checkpoint meta ? Sorry previously I gave a wrong estimation, after I extract a sub-component for (de)serialize operator state, I found the problem just goes to the new OperatorStateSerializer. The problem seems to be that v2, v3 and v4 have different fields, thus they use different process when (de)serialize, which is a bit different from the case that we have a fixed steps and each step has different logic. Thus we might either a. Use base classes for each two version. b. Or have a unified framework contains all the possible fields across all version, and use empty field serializer to skip some fields in each version. Best, Yun [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&
Re: StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues
Hi Mahendra, Sorry for the late reply. I noticed that in your code you implement a bucket assigner that reads to switch to a new bucket every minute, does it related to the current problems met ? Since different buckets would use different directories and files, when switching buckets new files would be created and used. Best, Yun --Original Mail -- Sender:Mahendra Hegde Send Date:Tue Dec 29 20:23:33 2020 Recipients:user@flink.apache.org Subject:StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues Hello, I am trying to use StreamingFileSink.forBulkFormat() for writing avro to S3. I have used ‘CheckpointRollingPolicy’ as DefaultRolling cannot be used with bulk formats. But when I use this I am facing 2 issues : ‘shouldRollOnEvent’ method is getting called on each record addition but .getsize() always gives one message size instead of current partFile size. Files are getting rolled out at every 1 minute even though my checkpoint is bigger (3 mins), I don’t find any way to override this 1 min default rolling. Any suggestion would be appreciated. Code: val avroOcfFilesink : StreamingFileSink[GenericRecord] = StreamingFileSink.forBulkFormat(new Path(avroOutputPath), new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() { override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = { val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema) val datumWriter = new ReflectDatumWriter[GenericRecord](schema) val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) dataFileWriter.setCodec(CodecFactory.snappyCodec) dataFileWriter.create(schema, out) dataFileWriter } })) .withBucketAssigner(new BucketAssigner[GenericRecord, String] { override def getBucketId(in: GenericRecord, context: Context): String = { val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix") val currentProcessingTimeUTC = System.currentTimeMillis() bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC) } override def getSerializer: SimpleVersionedSerializer[String] = { SimpleVersionedStringSerializer.INSTANCE } }).withBucketCheckInterval(12) .withRollingPolicy( new CheckpointRollingPolicy[GenericRecord, String] { override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = { log.info("## PartFileState.getSize:"+partFileState.getSize+", Creation"+partFileState.getCreationTime+", Lastupdate:"+partFileState.getLastUpdateTime) false } override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = { val result : Boolean = (currentTime - partFileState.getCreationTime) >= 1 log.info(" currentTime:"+currentTime+" , partFileState.getCreationTime"+partFileState.getCreationTime+", Diff:"+(currentTime - partFileState.getCreationTime)+", result:"+result) false } } ).build() Thanks MH
Re: Re: Implementing a TarInputFormat based on FileInputFormat
Hi Billy, I checked the provided example and found it should be a problem of ContinuousFileReader, and I created an issue for it[1]. For temporarily go around the issue, I think you may disable the chain of ContinuousFileReaderOperator with the following operators: android.disableChaining().sinkTo(sink); Best, Yun [1] https://issues.apache.org/jira/browse/FLINK-20888 --Original Mail -- Sender:Billy Bain Send Date:Thu Jan 7 04:02:34 2021 Recipients:Arvid Heise CC:user , Billy Bain Subject:Re: Implementing a TarInputFormat based on FileInputFormat Hi Arvid, Thanks for the response. I have created a sample application with input data and uploaded it to google drive. The sample data is in the archive... thus the large size. (27 mb) https://drive.google.com/file/d/1dxpnDF3hPUPNlPO5p2tBf-88oOVV0qck/view?usp=sharing To run it: flink run -Dexecution.runtime-mode=BATCH -c com.billybobbain.AndroidTarReader /path/to/flink-tar/build/libs/flink-tar-0.1.jar --input_path /path/to/flink-tar/playstore-00.tar.gz --output_path /path/to/output/ The main class: public class AndroidTarReader { public static void main(String[] args) throws Exception { ParameterTool parameter = ParameterTool.fromArgs(args); String inputPath = parameter.get("input_path"); String outputPath = parameter.get("output_path"); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource android = env.readFile(new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath); final FileSink sink = FileSink .forRowFormat(new Path(outputPath), new AndroidDataEncoder()) .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix(".json").build()) .withRollingPolicy(DefaultRollingPolicy.builder() .withMaxPartSize(1024 * 1024) .build()) .build(); android.sinkTo(sink); env.execute("zMarket Android"); } } On Tue, Jan 5, 2021 at 5:59 AM Arvid Heise wrote: Hi Billy, the exception is happening on the output side. Input side looks fine. Could you maybe post more information about the sink? On Mon, Dec 28, 2020 at 8:11 PM Billy Bain wrote: I am trying to implement a class that will work similar to AvroFileFormat. This tar archive has a very specific format. It has only one file inside and that file is line delimited JSON. I get this exception, but all the data is written to the temporary files. I have checked that my code isn't closing the stream, which was my prior issue. 
Caused by: java.nio.channels.ClosedChannelException at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325) at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101) at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70) at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71) at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195) at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202) at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/00.run(Unknown Source) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134) at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412) at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) at java.base/java.lang.Thread.run(Thread.java:836) public class TarInputFormat extends FileInputFormat implements ResultTypeQueryable { private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);
Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Roman, Very thanks for the feedbacks! I'll try to answer the issues inline: > 1. Option 1 is said to be not preferable because it wastes resources and adds > complexity (new event). > However, the resources would be wasted for a relatively short time until the > job finishes completely. > And compared to other options, complexity seems much lower. Or are > differences in task completion times so huge and so common? There might be mixed jobs with both bounded sources and unbounded sources, in this case, the resource for the finished part of the job would not be able to be released. And the Option 1 also complicates the semantics of the EndOfPartition, since if we holding the tasks and we still need to notify the following tasks about all records are sent, we would have to introduce some kind of pre-EndOfPartition messages, which is similar to the current EndOfPartition, but do not cause the channels to be released. > 2. I think it would be helpful to describe how is rescaling handled in > Options 2 and 3 (or maybe it's not supported for jobs about to finish). For Option 2 and 3 we managed the states via the unit of operator, thus the process of rescaling would be the same with the normal checkpoint. For example, support one operator resides in a tasks with parallelism 4, if 2 fo the subtasks are finished, now the state of the operator is composed of the state of the 2 remaining subtask instance, if we rescale to 5 after failover, the state of the 2 previous remaining subtasks would be re-distributed to the 5 new subtasks after failover. If before failover all the 4 subtasks are finished, the operator would be marked as finished, after failover the operator would be still marked as finished, and all the subtask instance of this operator would skip all the methods like open(), endOfInput(), close() and would be excluded when taking checkpoints after failover. > 3. Option 3 assumes that the state of a finished task is not used. That's > true for operator state, but what about channel state (captured by unaligned > checkpoint)? > I think it still has to be sent downstream which invalidates this Option. For unaligned checkpoint, if in one checkpoint a subtask is marked as finished, then its descandent tasks would wait all the records are received from the finished tasks before taking checkpoint, thus in this case we would not have result partition state, but only have channel state for the downstream tasks that are still running. In detail, support we have a job with the graph A -> B -> C, support in one checkpoint A has reported FINISHED, CheckpointCoordinator would choose B as the new "source" to trigger checkpoint via RPC. For task B, if it received checkpoint trigger, it would know that all its precedant tasks are finished, then it would wait till all the InputChannel received EndOfPartition from the network (namely inputChannel.onBuffer() is called with EndOfPartition) and then taking snapshot for the input channels, as the normal unaligned checkpoints does for the InputChannel side. Then we would be able to ensure the finished tasks always have an empty state. I'll also optimize the FLIP to make it more clear~ Best, Yun --Original Mail -- Sender:Khachatryan Roman Send Date:Thu Jan 7 21:55:52 2021 Recipients:Arvid Heise CC:dev , user Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Thanks for starting this discussion (and sorry for probably duplicated questions, I couldn't find them answered in FLIP or this thread). 1. 
Option 1 is said to be not preferable because it wastes resources and adds complexity (new event). However, the resources would be wasted for a relatively short time until the job finishes completely. And compared to other options, complexity seems much lower. Or are differences in task completion times so huge and so common? 2. I think it would be helpful to describe how is rescaling handled in Options 2 and 3 (or maybe it's not supported for jobs about to finish). 3. Option 3 assumes that the state of a finished task is not used. That's true for operator state, but what about channel state (captured by unaligned checkpoint)? I think it still has to be sent downstream which invalidates this Option. Regards, Roman On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise wrote: We could introduce an interface, sth like `RequiresFinalization` or `FinalizationListener` (all bad names). The operator itself knows when it is ready to completely shut down, Async I/O would wait for all requests, sink would potentially wait for a given number of checkpoints. The interface would have a method like `isFinalized()` that the framework can call after each checkpoint (and potentially at other points) I think we are mixing two different things here that may require different solutions: 1. Tasks (=sink) that may need to do something with the final checkpoint. 2. Tasks that only f
Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Roman, Very thanks for the feedbacks ! > Probably it would be simpler to just decline the RPC-triggered checkpoint > if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY). > But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint > by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints. > Maybe a better option would be to postpone JM notification from source until it's EoP is consumed? I also agree with that there would indeed be possible cases that the checkpoint get slower since it could not skip the data in the result partition of the finished upstream task: a) For aligned checkpoint, the cases would not happen since the downstream tasks would always need to process the buffers in order. b) With unaligned checkpoint enabled, the slower cases might happen if the downstream task processes very slowly. But since only the result partition part of the finished upstream need wait to be processed, the other part of the execution graph could still perform the unaligned checkpoint normally, I think the average delay caused would be much lower than the completely aligned checkpoint, but there would still be extremely bad cases that the delay is long. Declining the RPC-trigger checkpoint would indeed simplify the implementation, but since currently by default the failed checkpoint would cause job failover, thus we might have some concerns in directly decline the checkpoint. For postpone the notification the JM notification, since current JM should not be able to know if the task has received all the EndOfPartition from the upstream tasks, we might need to introduce new RPC for notifying the state and since the triggering is not atomic, we may also met with some synchronization issues between JM and TM, which would introduce some complexity. Thus another possible option might be let the upstream task to wait till all the pending buffers in the result partition has been flushed before get to finish. We could only do the wait for the PipelineResultPartition so it won't affect the batch jobs. With the waiting the unaligned checkpoint could continue to trigger the upstream task and skip the buffers in the result partition. Since the result partition state would be kept within the operator state of the last operator, after failover we would found that the last operator has an non-empty state and we would restart the tasks containing this operator to resend the snapshotted buffers. Of course this would also introduce some complexity, and since the probability of long delay would be lower than the completely aligned case, do you think it would be ok for us to view it as an optimization and postpone it to future versions ? Best, Yun -- From:Khachatryan Roman Send Time:2021 Jan. 11 (Mon.) 05:46 To:Yun Gao Cc:Arvid Heise ; dev ; user Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Thanks a lot for your answers Yun, > In detail, support we have a job with the graph A -> B -> C, support in one > checkpoint A has reported FINISHED, CheckpointCoordinator would > choose B as the new "source" to trigger checkpoint via RPC. 
For task B, if it > received checkpoint trigger, it would know that all its precedant tasks > are finished, then it would wait till all the InputChannel received > EndOfPartition from the network (namely inputChannel.onBuffer() is called > with > EndOfPartition) and then taking snapshot for the input channels, as the > normal unaligned checkpoints does for the InputChannel side. Then > we would be able to ensure the finished tasks always have an empty state. Probably it would be simpler to just decline the RPC-triggered checkpoint if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY). But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints. Maybe a better option would be to postpone JM notification from source until it's EoP is consumed? Regards, Roman
Re: Re: Use Flink to process request with list of queries and aggregate
Hi Li, From my view, I think it would not be easy to use a countWindow if you have a different number of records for each key (namely, each user in this case). I think you may need to use the low-level KeyedProcessFunction [1] to keep some state yourself. For example, each request might also carry the total number of requests of its user, and in the KeyedProcessFunction you could record both the number of requests received so far and the total number of requests for this user in the state. Whenever enough requests have been received for a user, you know that the message is completely processed, and the state for this user can then be cleaned up. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html#the-keyedprocessfunction -- Sender:Li Wang Date:2021/01/11 07:10:27 Recipient: Theme:Re: Use Flink to process request with list of queries and aggregate Can I get any suggestion? Thanks a lot. - Li -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
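A minimal sketch of the KeyedProcessFunction approach described at the top of this thread, assuming each incoming record carries the total number of requests expected for its user (the `Request` type and its `getTotalRequests()` getter are hypothetical placeholders):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class RequestCompletionFunction
            extends KeyedProcessFunction<String, Request, String> {

        // Number of requests received so far for the current user (the key).
        private transient ValueState<Integer> receivedCount;

        @Override
        public void open(Configuration parameters) {
            receivedCount = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("receivedCount", Integer.class));
        }

        @Override
        public void processElement(Request request, Context ctx, Collector<String> out)
                throws Exception {
            int received = receivedCount.value() == null ? 1 : receivedCount.value() + 1;
            if (received >= request.getTotalRequests()) {
                // All requests of this user have arrived: emit a result and clean the state.
                out.collect("user " + ctx.getCurrentKey() + " completely processed");
                receivedCount.clear();
            } else {
                receivedCount.update(received);
            }
        }
    }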
Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Roman, Many thanks for the feedback and suggestions! > I think UC will be the common case with multiple sources each with DoP > 1. > IIUC, waiting for EoP will be needed on each subtask each time one of it's source subtask finishes. Yes, waiting for EoP would be required for each input channel if we do not specially block the finished upstream task. > Yes, but checkpoint completion notification will not be sent until all the EOPs are processed. The downstream task that gets triggered must indeed wait until it has received the EoPs from all the input channels; I initially compared it only with the completely aligned case. The remaining execution graph after the triggered task could still take a normal unaligned checkpoint (e.g., if A -> B -> C -> D, A finishes and B gets triggered, then B -> C -> D could still take a normal unaligned checkpoint). But it still could not bound the possible maximum delay. > Not all declines cause job failure, particularly CHECKPOINT_DECLINED_TASK_NOT_READY doesn't. Sorry for getting the logic wrong here; CHECKPOINT_DECLINED_TASK_NOT_READY indeed does not cause a failure. But since after a failed checkpoint we would have to wait for the checkpoint interval until the next checkpoint, I also agree that the following option, in which we try to complete each checkpoint, would be the better one. >> Thus another possible option might be let the upstream task to wait till all >> the pending buffers in the result partition has been flushed before get to >> finish. > This is what I meant by "postpone JM notification from source". Just blocking > the task thread wouldn't add much complexity, though I'm not sure if it would > cause any problems. >> do you think it would be ok for us to view it as an optimization and >> postpone it to future versions ? > I think that's a good idea. I'm also very sorry that I misunderstood the proposal here; currently I do not see explicit problems with waiting for the flush of the pipelined result partition either. Glad that we have the same viewpoint on this issue. :) Best, Yun -- From:Khachatryan Roman Send Time:2021 Jan. 11 (Mon.) 19:14 To:Yun Gao Cc:dev ; user Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Yun, > b) With unaligned checkpoint enabled, the slower cases might happen if the > downstream task processes very slowly. I think UC will be the common case with multiple sources each with DoP > 1. IIUC, waiting for EoP will be needed on each subtask each time one of it's source subtask finishes. > But since only the result partition part of the finished upstream need wait > to be processed, the other part of > the execution graph could still perform the unaligned checkpoint normally Yes, but checkpoint completion notification will not be sent until all the EOPs are processed. > Declining the RPC-trigger checkpoint would indeed simplify the > implementation, but since currently by default the > failed checkpoint would cause job failover, thus we might have some concerns > in directly decline the checkpoint. Not all declines cause job failure, particularly CHECKPOINT_DECLINED_TASK_NOT_READY doesn't. > Thus another possible option might be let the upstream task to wait till all > the pending buffers in the result partition has been flushed before get to > finish. This is what I meant by "postpone JM notification from source". Just blocking the task thread wouldn't add much complexity, though I'm not sure if it would cause any problems. 
> do you think it would be ok for us to view it as an optimization and postpone > it to future versions ? I think that's a good idea. Regards, Roman
Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all, I updated the FLIP[1] to reflect the major discussed points in the ML thread: 1) For the "new" root tasks finished before it received trigger message, previously we proposed to let JM re-compute and re-trigger the descendant tasks, but after the discussion we realized that it might cause overhead to JobMaster on cascade finish and large parallelism cases. Another option might be let the StreamTask do one synchronization with the CheckpointCoordinator before get finished to be aware of the missed pending checkpoints, since at then EndOfPartitions are not emitted yet, it could still broadcast barriers to its descendant tasks. I updated the details in this section[2] in the FLIP. 2) For the barrier alignment, now we change to insert faked barriers in the input channels to avoid interference with checkpoint alignment algorithms. One remaining issue is that for unaligned checkpoint mode we could not snapshot the upstream tasks' result partition if it have been finished. One option to address this issue is to make the upstream tasks to wait for buffers get flushed before exit, and we would include this in the future versions. I updated this part in this section[3] in the FLIP. 3) Some operators like Sink Committer need to wait for one complete checkpoint before exit. To support the operators that need to wait for some finalization condition like the Sink committer and Async I/O, we could introduce a new interface to mark this kind of operators, and let the runtime to wait till the operators reached its condition. I updated this part in this section[4] in the FLIP. Could you have another look of the FLIP and the pending issues ? Any feedbacks are warmly welcomed and appreciated. Very thanks! Best, Yun [1] https://cwiki.apache.org/confluence/x/mw-ZCQ [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish -- From:Yun Gao Send Time:2021 Jan. 12 (Tue.) 10:30 To:Khachatryan Roman Cc:dev ; user Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Roman, Very thanks for the feedbacks and suggestions! > I think UC will be the common case with multiple sources each with DoP > 1. > IIUC, waiting for EoP will be needed on each subtask each time one of it's source subtask finishes. Yes, waiting for EoP would be required for each input channel if we do not blocking the upstream finished task specially. > Yes, but checkpoint completion notification will not be sent until all the EOPs are processed. The downstream tasked get triggered indeed must wait for received EoPs from all the input channels, I initially compared it with the completely aligned cases and now the remaining execution graph after the trigger task could still taking normal unaligned checkpoint (like if A -> B -> C -> D, A get finished and B get triggered, then B -> C -> D could still taking normal unaligned checkpoint). But still it could not limit the possible max delay. > Not all declines cause job failure, particularly CHECKPOINT_DECLINED_TASK_NOT_READY doesn't. 
Sorry for mistaken the logic here and CHECKPOINT_DECLINED_TASK_NOT_READY indeed do not cause failure. But since after a failed checkpoint we would have to wait for the checkpoint interval for the next checkpoint, I also agree the following option would be a better one that we try to complete each checkpoint. >> Thus another possible option might be let the upstream task to wait till all >> the pending buffers in the result partition has been flushed before get to >> finish. > This is what I meant by "postpone JM notification from source". Just blocking > the task thread wouldn't add much complexity, though I'm not sure if it would > cause any problems. >> do you think it would be ok for us to view it as an optimization and >> postpone it to future versions ? > I think that's a good idea. And also very sorry for here I should wrongly understand the proposals, and currently I also do not see explicit problems for waiting for the flush of pipeline result partition. Glad that we have the same viewpoints on this issue. :) Best, Yun -- From:Khachatryan Roman Send Time:2021 Jan. 11 (Mon.) 19:14 To:Yun Gao Cc:dev ; user Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks F
Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)
Hi Sagar, I think the problem is that the legacy source implemented by extending SourceFunction are all defined as CONTINOUS_UNBOUNDED when use env.addSource(). Although there is hacky way to add the legacy sources as BOUNDED source [1], I think you may first have a try of new version of KafkaSource [2] ? The new version of KafkaSource is implemented with the new Source API [3], which provides unfied support for the streaming and batch mode. Best, Yun [1] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64 [2] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69 [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface --Original Mail -- Sender:Ardhani Narasimha Send Date:Thu Jan 14 15:11:35 2021 Recipients:sagar CC:Flink User Mail List Subject:Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12) Interesting use case. Can you please elaborate more on this. On what criteria do you want to batch? Time? Count? Or Size? On Thu, 14 Jan 2021 at 12:15 PM, sagar wrote: Hi Team, I am getting the following error while running DataStream API in with batch mode with kafka source. I am using FlinkKafkaConsumer to consume the data. Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0] In my batch program I wanted to work with four to five different stream in batch mode as data source is bounded I don't find any clear example of how to do it with kafka souce with Flink 1.12 I don't want to use JDBC source as underlying database table may change. please give me some example on how to achieve the above use case. Also for any large bounded source are there any alternatives to achieve this? -- ---Regards--- Sagar Bandal This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email. --- IMPORTANT: The contents of this email and any attachments are confidential and protected by applicable laws. If you have received this email by mistake, please (i) notify the sender immediately; (ii) delete it from your database; and (iii) do not disclose the contents to anyone or make copies thereof. Razorpay accepts no liability caused due to any inadvertent/ unintentional data transmitted through this email. ---
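For reference, a rough sketch of how the new KafkaSource mentioned in [2] can be used as a bounded source in batch mode. The builder methods below follow more recent Flink releases and may differ slightly in 1.12; the bootstrap servers, topic and group id are placeholders:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BoundedKafkaJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")
                    .setTopics("my-topic")
                    .setGroupId("my-batch-job")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    // Makes the source BOUNDED: it stops at the latest offsets seen at startup.
                    .setBounded(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                    .print();
            env.execute("bounded-kafka-batch");
        }
    }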
Re: StreamingFileSink with ParquetAvroWriters
Hi Jan, Could you have a try by adding this dependency ? org.apache.parquet parquet-avro 1.11.1 Best, Yun --Original Mail -- Sender:Jan Oelschlegel Send Date:Thu Jan 14 00:49:30 2021 Recipients:user@flink.apache.org Subject:StreamingFileSink with ParquetAvroWriters Hi, i’m using Flink (v.1.11.2) and would like to use the StreamingFileSink for writing into HDFS in Parquet format. As it says in the documentation I have added the dependencies: org.apache.flink flink-parquet_${scala.binary.version} ${flink.version} And this is my file sink definition: val sink: StreamingFileSink[Event] = StreamingFileSink .forBulkFormat( new Path("hdfs://namenode.local:8020/user/datastream/"), ParquetAvroWriters.forReflectRecord(classOf[Event]) ) .build() If I execute this in cluster I get the following error: java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84) at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73) at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57) at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69) at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) Looks like there are some dependencies missing. How can I fix this? Jan O.HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben angegebenen Telefonnummer.
Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)
Hi Sagar, I rechecked and found that the new kafka source is not formally publish yet, and a stable method I think may be try adding the FlinkKafkaConsumer as a BOUNDED source first. Sorry for the inconvient. Best, Yun -- Sender:Yun Gao Date:2021/01/14 15:26:54 Recipient:Ardhani Narasimha; sagar Cc:Flink User Mail List Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12) Hi Sagar, I think the problem is that the legacy source implemented by extending SourceFunction are all defined as CONTINOUS_UNBOUNDED when use env.addSource(). Although there is hacky way to add the legacy sources as BOUNDED source [1], I think you may first have a try of new version of KafkaSource [2] ? The new version of KafkaSource is implemented with the new Source API [3], which provides unfied support for the streaming and batch mode. Best, Yun [1] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64 [2] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69 [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface --Original Mail -- Sender:Ardhani Narasimha Send Date:Thu Jan 14 15:11:35 2021 Recipients:sagar CC:Flink User Mail List Subject:Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12) Interesting use case. Can you please elaborate more on this. On what criteria do you want to batch? Time? Count? Or Size? On Thu, 14 Jan 2021 at 12:15 PM, sagar wrote: Hi Team, I am getting the following error while running DataStream API in with batch mode with kafka source. I am using FlinkKafkaConsumer to consume the data. Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0] In my batch program I wanted to work with four to five different stream in batch mode as data source is bounded I don't find any clear example of how to do it with kafka souce with Flink 1.12 I don't want to use JDBC source as underlying database table may change. please give me some example on how to achieve the above use case. Also for any large bounded source are there any alternatives to achieve this? -- ---Regards--- Sagar Bandal This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email. --- IMPORTANT: The contents of this email and any attachments are confidential and protected by applicable laws. If you have received this email by mistake, please (i) notify the sender immediately; (ii) delete it from your database; and (iii) do not disclose the contents to anyone or make copies thereof. Razorpay accepts no liability caused due to any inadvertent/ unintentional data transmitted through this email. ---
Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all, We have some offline discussion together with @Arvid, @Roman and @Aljoscha and I'd like to post some points we discussed: 1) For the problem that the "new" root task coincidently finished before getting triggered successfully, we have listed two options in the FLIP-147[1], for the first version, now we are not tend to go with the first option that JM would re-compute and re-trigger new sources when it realized some tasks are not triggered successfully. This option would avoid the complexity of adding new PRC and duplicating task states, and in average case it would not cause too much overhead. 2) For how to support operators like Sink Committer to wait for one complete checkpoint before exit, it would be more an issue of how to use the checkpoints after tasks finished instead of how to achieve checkpoint after tasks finished, thus we would like to not include this part first in the current discussion. We would discuss and solve this issue separately after FLIP-147 is done. Best, Yun [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished -- From:Yun Gao Send Time:2021 Jan. 13 (Wed.) 16:09 To:dev ; user Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi all, I updated the FLIP[1] to reflect the major discussed points in the ML thread: 1) For the "new" root tasks finished before it received trigger message, previously we proposed to let JM re-compute and re-trigger the descendant tasks, but after the discussion we realized that it might cause overhead to JobMaster on cascade finish and large parallelism cases. Another option might be let the StreamTask do one synchronization with the CheckpointCoordinator before get finished to be aware of the missed pending checkpoints, since at then EndOfPartitions are not emitted yet, it could still broadcast barriers to its descendant tasks. I updated the details in this section[2] in the FLIP. 2) For the barrier alignment, now we change to insert faked barriers in the input channels to avoid interference with checkpoint alignment algorithms. One remaining issue is that for unaligned checkpoint mode we could not snapshot the upstream tasks' result partition if it have been finished. One option to address this issue is to make the upstream tasks to wait for buffers get flushed before exit, and we would include this in the future versions. I updated this part in this section[3] in the FLIP. 3) Some operators like Sink Committer need to wait for one complete checkpoint before exit. To support the operators that need to wait for some finalization condition like the Sink committer and Async I/O, we could introduce a new interface to mark this kind of operators, and let the runtime to wait till the operators reached its condition. I updated this part in this section[4] in the FLIP. Could you have another look of the FLIP and the pending issues ? Any feedbacks are warmly welcomed and appreciated. Very thanks! 
Best, Yun [1] https://cwiki.apache.org/confluence/x/mw-ZCQ [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish -- From:Yun Gao Send Time:2021 Jan. 12 (Tue.) 10:30 To:Khachatryan Roman Cc:dev ; user Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi Roman, Very thanks for the feedbacks and suggestions! > I think UC will be the common case with multiple sources each with DoP > 1. > IIUC, waiting for EoP will be needed on each subtask each time one of it's source subtask finishes. Yes, waiting for EoP would be required for each input channel if we do not blocking the upstream finished task specially. > Yes, but checkpoint completion notification will not be sent until all the EOPs are processed. The downstream tasked get triggered indeed must wait for received EoPs from all the input channels, I initially compared it with the completely aligned cases and now the remaining execution graph after the trigger task could still taking normal unaligned checkpoint (like if A -> B -> C -> D, A get finished and B get triggered, then B -> C -> D could still taking normal unaligned checkpoint). But still it coul
Re: [DISCUSS] FLIP-115: Filesystem connector in Table
Hi, Very thanks for Jinsong to bring up this discussion! It should largely improve the usability after enhancing the FileSystem connector in Table. I have the same question with Piotr. From my side, I think it should be better to be able to reuse existing StreamingFileSink. I think We have began enhancing the supported FileFormat (e.g., ORC, Avro...), and reusing StreamFileSink should be able to avoid repeat work in the Table library. Besides, the bucket concept seems also matches the semantics of partition. For the notification of adding partitions, I'm a little wondering that the Watermark mechanism might not be enough since Bucket/Partition might spans multiple subtasks. It depends on the level of notification: if we want to notify for the bucket on each subtask, using watermark to notifying each subtask should be ok, but if we want to notifying for the whole Bucket/Partition, we might need to also do some coordination between subtasks. Best, Yun -- From:Piotr Nowojski Send Time:2020 Mar. 13 (Fri.) 18:03 To:dev Cc:user ; user-zh Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in Table Hi, Which actual sinks/sources are you planning to use in this feature? Is it about exposing StreamingFileSink in the Table API? Or do you want to implement new Sinks/Sources? Piotrek > On 13 Mar 2020, at 10:04, jinhai wang wrote: > > Thanks for FLIP-115. It is really useful feature for platform developers who > manage hundreds of Flink to Hive jobs in production. > I think we need add 'connector.sink.username' for UserGroupInformation when > data is written to HDFS > > > 在 2020/3/13 下午3:33,“Jingsong Li” 写入: > >Hi everyone, > >I'd like to start a discussion about FLIP-115 Filesystem connector in Table >[1]. >This FLIP will bring: >- Introduce Filesystem table factory in table, support >csv/parquet/orc/json/avro formats. >- Introduce streaming filesystem/hive sink in table > >CC to user mail list, if you have any unmet needs, please feel free to >reply~ > >Look forward to hearing from you. > >[1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table > >Best, >Jingsong Lee > > >
Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly
Hi Kaan, For the first issue, I think the two implementation should have difference and the first should be slower, but I think which one to use should be depend on your algorithm if it could compute incrementally only with the changed edges. However, as far as I know I think most graph algorithm does not satisfy this property, therefore I think you might have to use the first one. For the second issue, I think you might use Graph.getVertices() and graph.getEdges() to get the underlying vertices and edges dataset of the graph, then you could do any operations with the two datasets, like join the vertices dataset with the second edge list, and finally create a new Graph with new Graph(updated_vertices, edges, env). Best, Yun -- From:Kaan Sancak Send Time:2020 Apr. 16 (Thu.) 17:17 To:Till Rohrmann Cc:Tzu-Li (Gordon) Tai ; user Subject:Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly If the vertex type is POJO what happens during the union of the graph? Is there a persistent approach, or can we define a function handle such occasions? Would there be a performance difference between two cases: 1) Graph graph = … // From edges list graph = graph.runScatterGatherIteration(); Graph secondGraph = … // From second edge list graph = graph.union(secondGraph).runScatterGatherIteration() 2) Graph graph = … // From edges list graph = graph.runScatterGatherIteration(); graph.addEdges(second_edge_list) graph = graph.runScatterGatherIteration(); Before starting the second scatter-gather, I want to set/reset some fields of the vertex value of the vertices that are effected by edge additions/deletions (or union). It would be good to have a callback function that touches the end-points of the edges that are added/deleted. Best Kaan On Apr 15, 2020, at 11:07 AM, Till Rohrmann wrote: Hi Kaan, I think what you are proposing is something like this: Graph graph = ... // get first batch Graph graphAfterFirstSG = graph.runScatterGatherIteration(); Graph secondBatch = ... // get second batch // Adjust the result of SG iteration with secondBatch Graph updatedGraph = graphAfterFirstSG.union/difference(secondBatch)); updatedGraph.runScatterGatherIteration(); Then I believe this should work. Cheers, Till On Wed, Apr 15, 2020 at 1:14 AM Kaan Sancak wrote: Thanks for the useful information! It seems like a good and fun idea to experiment. I will definitely give it a try. I have a very close upcoming deadline and I have already implemented the Scatter-Gather iteration algorithm. I have another question on whether we can chain Scatter-Gather or Vertex-Centric iterations. Let’s say that we have an initial batch/dataset, we run a Scatter-Gather and obtain graph. Using another batch we added/deleted vertices to the graph we obtained. Now we run another Scatter-Gather on the modified graph. This is no streaming but a naive way to simulate batch updates that are happening concurrently. Do you think it is a feasible way to do this way? Best Kaan On Apr 13, 2020, at 11:16 PM, Tzu-Li (Gordon) Tai wrote: Hi, As you mentioned, Gelly Graph's are backed by Flink DataSets, and therefore work primarily on static graphs. I don't think it'll be possible to implement incremental algorithms described in your SO question. Have you tried looking at Stateful Functions, a recent new API added to Flink? It supports arbitrary messaging between functions, which may allow you to build what you have in mind. 
Take a look at Seth's an Igal's comments here [1], where there seems to be a similar incremental graph-processing use case for sessionization. Cheers, Gordon [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017 -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
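A short code fragment illustrating the Graph.getVertices()/getEdges() approach Yun describes at the top of this thread (imports omitted; the Long/Double value types, the secondEdgeList dataset and the scatter/gather functions are placeholders; only the edge union is shown, and any resetting of vertex values would be an extra transformation on the vertices dataset):

    // graph: result of the first Scatter-Gather iteration; env: the ExecutionEnvironment.
    DataSet<Vertex<Long, Double>> vertices = graph.getVertices();
    DataSet<Edge<Long, Double>> edges = graph.getEdges();

    // Merge the second batch of edges into the existing edge dataset.
    DataSet<Edge<Long, Double>> updatedEdges = edges.union(secondEdgeList).distinct();

    // Re-create the graph from the updated datasets and run the next iteration.
    Graph<Long, Double, Double> updatedGraph = Graph.fromDataSet(vertices, updatedEdges, env);
    Graph<Long, Double, Double> result =
            updatedGraph.runScatterGatherIteration(new MyScatterFunction(), new MyGatherFunction(), 10);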
Re: Updating Closure Variables
Hi Senthil, I think you are right that you cannot update closure variables directly and expect them to show up at the workers. If the variable values are read from S3 files, I think you currently need to explicitly define a source that reads the latest value of the file. Whether to use a broadcast stream depends on how you want to access the set of strings: if you want to broadcast the same strings to all tasks, then a broadcast stream is the solution; if you want to distribute the set of strings in some other way, you could also use the more generic connected streams, e.g. streamA.connect(streamB.keyBy()).process(xx) [1]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/#datastream-transformations -- From:Senthil Kumar Send Time:2020 Apr. 27 (Mon.) 21:51 To:user@flink.apache.org Subject:Updating Closure Variables Hello Flink Community! We have a Flink streaming application with a particular use case where a closure variable Set is used in a filter function. Currently, the variable is set at startup time. It's populated from an S3 location, where several files exist (we consume the one with the last updated timestamp). Is it possible to periodically update (say once every 24 hours) this closure variable? My initial research indicates that we cannot update closure variables and expect them to show up at the workers. There seems to be something called BroadcastStream in Flink. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html Is that the right approach? I would like some kind of a confirmation before I go deeper into it. cheers Kumar
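A compact code fragment sketching the broadcast-stream approach (imports omitted; `S3SetSource` is a hypothetical source that re-reads the S3 file every 24 hours and emits the latest Set<String>, and `Event` is a placeholder element type):

    // events: the DataStream<Event> to be filtered; env: the StreamExecutionEnvironment.
    MapStateDescriptor<String, Set<String>> filterDescriptor =
            new MapStateDescriptor<>(
                    "filterSet",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<Set<String>>() {}));

    BroadcastStream<Set<String>> filterBroadcast =
            env.addSource(new S3SetSource())          // hypothetical source re-reading S3 every 24h
               .broadcast(filterDescriptor);

    DataStream<Event> filtered = events
            .connect(filterBroadcast)
            .process(new BroadcastProcessFunction<Event, Set<String>, Event>() {

                @Override
                public void processElement(Event value, ReadOnlyContext ctx, Collector<Event> out)
                        throws Exception {
                    Set<String> filter = ctx.getBroadcastState(filterDescriptor).get("current");
                    // Forward the element only if it passes the latest filter set.
                    if (filter == null || filter.contains(value.getKey())) {
                        out.collect(value);
                    }
                }

                @Override
                public void processBroadcastElement(Set<String> newSet, Context ctx, Collector<Event> out)
                        throws Exception {
                    // Replace the stored filter set whenever the source emits a fresh copy.
                    ctx.getBroadcastState(filterDescriptor).put("current", newSet);
                }
            });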
Re: Re: Writing _SUCCESS Files (Streaming and Batch)
Hi Peter, Sorry for missing the question and responding late. I'm currently working together with Jingsong on the issue of supporting "global committing" (like writing a _SUCCESS file or adding partitions to the Hive store) after buckets are terminated. In 1.11 we may first support watermark/time-related buckets in the Table/SQL API, and we are also thinking about supporting "global committing" for arbitrary bucket assigner policies for StreamingFileSink users. The current rough thought is to let users specify when a bucket is terminated on a single task; the OperatorCoordinator [1] of the sink would then aggregate the information about this bucket from all subtasks and do the global committing once the bucket has been finished on all the subtasks. This is still under discussion, and any thoughts or requirements on this issue are warmly welcome. Best, Yun [1] OperatorCoordinator is introduced in FLIP-27: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface. It is a component that resides in the JobManager and can communicate with all the subtasks of the corresponding operator, so it can be used to aggregate status from subtasks. --Original Mail -- Sender:Robert Metzger Send Date:Tue May 12 15:36:26 2020 Recipients:Jingsong Li CC:Peter Groesbeck , user Subject:Re: Writing _SUCCESS Files (Streaming and Batch) Hi Peter, I filed a ticket for this feature request: https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your thoughts / requirements to the ticket) Best, Robert On Wed, May 6, 2020 at 3:41 AM Jingsong Li wrote: Hi Peter, The troublesome part is how to know the "ending" of a bucket in a streaming job. In 1.11, we are trying to implement a watermark-related bucket ending mechanism [1] in Table/SQL. [1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table Best, Jingsong Lee On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck wrote: I am replacing an M/R job with a Streaming job using the StreamingFileSink and there is a requirement to generate an empty _SUCCESS file like the old Hadoop job. I have to implement a similar Batch job to read from backup files in case of outages or downtime. The Batch job question was answered here and appears to be still relevant although if someone could confirm for me that would be great. https://stackoverflow.com/a/39413810 The question of the Streaming job came up back in 2018 here: http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3cff74eed5-602f-4eaa-9bc1-6cdf56611...@gmail.com%3E But the solution to use or extend the BucketingSink class seems out of date now that BucketingSink has been deprecated. Is there a way to implement a similar solution for StreamingFileSink? I'm currently on 1.8.1 although I hope to update to 1.10 in the near future. Thank you, Peter -- Best, Jingsong Lee
Re: changing the output files names in Streamfilesink from part-00 to something else
Hi Dhurandar: Currently StreamingFileSink should be able to change the prefix and suffix of the file name [1], so the generated files could be named something like <prefix>-0-0<suffix> instead of part-0-0. Could this solve your problem? Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#part-file-configuration -- Sender:dhurandar S Date:2020/05/13 05:13:04 Recipient:user; Subject:changing the output files names in Streamfilesink from part-00 to something else We want to change the name of the files being generated as the output of our StreamFileSink. When files are generated they are named part-00*; is there a way that we can change the name? In Hadoop, we can change RecordWriters and MultipleOutputs. May I please get some help in this regard? This is currently a blocker for us and will force us to move to an MR job. -- Thank you and regards, Dhurandar
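A small code fragment for the part-file configuration referenced in [1], using OutputFileConfig to replace the default part- prefix (imports omitted; the path, prefix and suffix are placeholders):

    OutputFileConfig fileConfig = OutputFileConfig.builder()
            .withPartPrefix("my-data")      // files become my-data-<subtask>-<counter>.json
            .withPartSuffix(".json")
            .build();

    StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("s3://bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
            .withOutputFileConfig(fileConfig)
            .build();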
Re: Performance issue when writing to HDFS
Hi Kong, Sorry that I'm not expert of Hadoop, but from the logs and Google, It seems more likely to be a problem of HDFS side [1] ? Like long-time GC in DataNode. Also I have found a similar issue from the history mails [2], and the conclusion should be similar. Best, Yun [1] https://community.cloudera.com/t5/Support-Questions/Solution-for-quot-slow-readprocessor-quot-warnings/td-p/122046 [2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/quot-Slow-ReadProcessor-quot-warnings-when-using-BucketSink-td9427.html --原始邮件 -- 发件人:Mu Kong 发送时间:Fri May 22 11:16:32 2020 收件人:user 主题:Performance issue when writing to HDFS Hi all, I have Flink application consuming from Kafka and writing the data to HDFS bucketed by event time with BucketingSink. Sometimes, the the traffic gets high and from the prometheus metrics, it shows the writing is not stable. (getting from flink_taskmanager_job_task_operator_numRecordsOutPerSecond) The output data on HDFS is also getting delayed. (The records for a certain hour bucket are written to HDFS 50 minutes after that hour) I looked into the log, and find warning regarding the datanode ack, which might be related: DFSClient exception:2020-05-21 10:43:10,432 INFO org.apache.hadoop.hdfs.DFSClient - Exception in createBlockOutputStream java.io.IOException: Got error, status message , ack with firstBadLink as :1004 at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1478) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1380) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:558) Slow ReadProcessor read fields warning:2020-05-21 10:42:30,509 WARN org.apache.hadoop.hdfs.DFSClient - Slow ReadProcessor read fields took 30230ms (threshold=3ms); ack: seqno: 126 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 372753456 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[:1004,DS-833b175e-9848-453d-a222-abf5c05d643e,DISK], DatanodeInfoWithStorage[:1004,DS-f998208a-df7b-4c63-9dde-26453ba69559,DISK], DatanodeInfoWithStorage[:1004,DS-4baa6ba6-3951-46f7-a843-62a13e3a62f7,DISK]] We haven't done any tuning for the Flink job regarding writing to HDFS. Is there any config or optimization we can try to avoid delay and these warnings? Thanks in advance!! Best regards, Mu
Re: onTimer method in CoProcessFunction in flink
Hi Jaswin, If I understand right, I think you could add the logic in the onTimer callback. In this callback, OnTimerContext.output(xx, outputTag) could be used to output data to the specific sideout. Besides, you should need a new state to store the elements to output in the onTimer callback. A similar example might be [1]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example --原始邮件 -- 发件人:Jaswin Shah 发送时间:Fri May 22 23:00:43 2020 收件人:user@flink.apache.org , Arvid Heise , Yun Tang 主题:onTimer method in CoProcessFunction in flink How can I identify the type of element for which onTime is called in flink? I want to store the objects for which onTimer is called to sideOutputs and then streamout the sideoutput data to kafka topic. I am not understanding how to stream out the sideoutput data like where should I write that processing logic. Below is the code snippet I have done so far /** * CoProcessFuntion to process cart and pg messages connected using connect operator. * @author jaswin.shah * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$ */ public class CartPGCoprocessFunction extends KeyedCoProcessFunction { private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class); /** * Map state for cart messages, orderId+mid is key and cartMessage is value. */ private static MapState cartState = null; /** * Map state for pg messages, orderId+mid is key and pgMessage is value. */ private static MapState pgState = null; /** * Intializations for cart and pg mapStates * * @param config */ @Override public void open(Configuration config) { MapStateDescriptor cartStateDescriptor = new MapStateDescriptor<> ( Constants.CART_DATA, TypeInformation.of(String.class), TypeInformation.of(CartMessage.class) ); cartState = getRuntimeContext().getMapState(cartStateDescriptor); MapStateDescriptor pgStateDescriptor = new MapStateDescriptor<>( Constants.PG_DATA, TypeInformation.of(String.class), TypeInformation.of(PaymentNotifyRequestWrapper.class) ); pgState = getRuntimeContext().getMapState(pgStateDescriptor); } /** * * @return */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { } /** * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present. * 2. If present, match, checkDescripancy, process and delete entry from pgMapState. * 3. If not present, add orderId+mid as key and cart object as value in cartMapState. * @param cartMessage * @param context * @param collector * @throws Exception */ @Override public void processElement1(CartMessage cartMessage, Context context, Collector collector) throws Exception { context.timerService().registerEventTimeTimer(context.timestamp()+360); String searchKey = cartMessage.createJoinStringCondition(); PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey); if(Objects.nonNull(paymentNotifyObject)) { generateResultMessage(cartMessage,paymentNotifyObject,collector); pgState.remove(searchKey); } else { cartState.put(searchKey,cartMessage); } } /** * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present. * 2. If present, match, checkDescripancy, process and delete entry from cartMapState. * 3. If not present, add orderId+mid as key and cart object as value in pgMapState. 
* @param pgMessage * @param context * @param collector * @throws Exception */ @Override public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector collector) throws Exception { context.timerService().registerEventTimeTimer(context.timestamp()+360); String searchKey = pgMessage.createJoinStringCondition(); CartMessage cartMessage = cartState.get(searchKey); if(Objects.nonNull(cartMessage)) { generateResultMessage(cartMessage,pgMessage,collector); cartState.remove(searchKey); } else { pgState.put(searchKey,pgMessage); } } /** * Create ResultMessage from cart and pg messages. * * @param cartMessage * @param pgMessage * @return */ private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector collector) { ResultMessage resultMessage = new ResultMessage(); Payment payment = null; //Logic should be in cart: check for (
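A trimmed sketch of the pattern Yun suggests at the top of this thread: on the timer, emit the still-unmatched entries to a side output via an OutputTag, and stream that side output to Kafka. This is a code fragment, not a drop-in change to the quoted class; the tag name, the `resultStream` variable and the pre-configured `kafkaProducer` are placeholders, and imports are omitted:

    // Defined once, e.g. as a constant next to the job definition:
    final OutputTag<CartMessage> unmatchedCartTag = new OutputTag<CartMessage>("unmatched-cart") {};

    // Inside CartPGCoprocessFunction: emit the pending cart entries of the current key on timeout.
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        for (Map.Entry<String, CartMessage> entry : cartState.entries()) {
            ctx.output(unmatchedCartTag, entry.getValue());
        }
        cartState.clear();
    }

    // After building the processed stream (resultStream is the SingleOutputStreamOperator
    // returned by .process(new CartPGCoprocessFunction())):
    DataStream<CartMessage> unmatched = resultStream.getSideOutput(unmatchedCartTag);
    unmatched.addSink(kafkaProducer);   // kafkaProducer: a pre-configured FlinkKafkaProducer<CartMessage>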
Re: Re: onTimer method in CoProcessFunction in flink
Hi Jaswin, I think the event time timer and process time timer in Flink should be fully decoupled: the event time timer is trigger by the watermark received, and the processing time is trigger by physical clock, and you may think them as two seperated timelines and have no guarantee on their relative speed. Therefore, I think the result of computing the deadline with event time and register it as processing time should be nondetermined, and it depends on the gap between event time and processing time. Best, Yun --原始邮件 -- 发件人:Jaswin Shah 发送时间:Sat May 23 22:08:57 2020 收件人:user@flink.apache.org , Arvid Heise , Yun Tang 主题:Re: onTimer method in CoProcessFunction in flink Hi Yun, Actually this problem is solved now. I have been stuck in other problem of timeoutcallbacks. Here, I am receiving the callbacks too early and the eventime registrations was somehow failing, might be it was needing some special handling. I need to know if this callback registration is wrong or is there something wrong. Do we need some special handling for event time semantecs usages? Thanks, Jaswin From: Jaswin Shah Sent: 22 May 2020 20:30 To: user@flink.apache.org ; Arvid Heise ; Yun Tang Subject: onTimer method in CoProcessFunction in flink How can I identify the type of element for which onTime is called in flink? I want to store the objects for which onTimer is called to sideOutputs and then streamout the sideoutput data to kafka topic. I am not understanding how to stream out the sideoutput data like where should I write that processing logic. Below is the code snippet I have done so far /** * CoProcessFuntion to process cart and pg messages connected using connect operator. * @author jaswin.shah * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$ */ public class CartPGCoprocessFunction extends KeyedCoProcessFunction { private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class); /** * Map state for cart messages, orderId+mid is key and cartMessage is value. */ private static MapState cartState = null; /** * Map state for pg messages, orderId+mid is key and pgMessage is value. */ private static MapState pgState = null; /** * Intializations for cart and pg mapStates * * @param config */ @Override public void open(Configuration config) { MapStateDescriptor cartStateDescriptor = new MapStateDescriptor<> ( Constants.CART_DATA, TypeInformation.of(String.class), TypeInformation.of(CartMessage.class) ); cartState = getRuntimeContext().getMapState(cartStateDescriptor); MapStateDescriptor pgStateDescriptor = new MapStateDescriptor<>( Constants.PG_DATA, TypeInformation.of(String.class), TypeInformation.of(PaymentNotifyRequestWrapper.class) ); pgState = getRuntimeContext().getMapState(pgStateDescriptor); } /** * * @return */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { } /** * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present. * 2. If present, match, checkDescripancy, process and delete entry from pgMapState. * 3. If not present, add orderId+mid as key and cart object as value in cartMapState. 
* @param cartMessage * @param context * @param collector * @throws Exception */ @Override public void processElement1(CartMessage cartMessage, Context context, Collector collector) throws Exception { context.timerService().registerEventTimeTimer(context.timestamp()+360); String searchKey = cartMessage.createJoinStringCondition(); PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey); if(Objects.nonNull(paymentNotifyObject)) { generateResultMessage(cartMessage,paymentNotifyObject,collector); pgState.remove(searchKey); } else { cartState.put(searchKey,cartMessage); } } /** * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present. * 2. If present, match, checkDescripancy, process and delete entry from cartMapState. * 3. If not present, add orderId+mid as key and cart object as value in pgMapState. * @param pgMessage * @param context * @param collector * @throws Exception */ @Override public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector collector) throws Exception { context.timerService().registerEventTimeTimer(context.timestamp()+360); String searchKey = pgMessage.createJoinStringCondition(); CartMessage cartMessage = cartState.get(searchKey); if(Objects.nonNu
Re: Re: Flink Window with multiple trigger condition
Hi, First, sorry that I'm not an expert on windows, so please correct me if I'm wrong, but from my side it seems the assigner might also be a problem in addition to the trigger: currently Flink's window assigners are all based on time (processing time or event time), and it might be hard to implement an event-driven window assigner that starts assigning elements to a window only after some element has been received. What comes to my mind is that a possible alternative is to use the low-level KeyedProcessFunction directly: you may register a timer 30 minutes later when the "search" event is received and write the time of the search event into state. The following events will then be saved into state while the flag is set. After the "start" event is received or the timer fires, you could load all the events from the state, do the aggregation, and cancel the timer if the aggregation was triggered by the "start" event. A simpler example is [1]; it does not cover stopping the aggregation when a special event is received, but it seems that logic could be added to it. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example Best, Yun --Original Mail -- Sender:aj Send Date:Sun May 24 01:10:55 2020 Recipients:Tzu-Li (Gordon) Tai CC:user Subject:Re: Flink Window with multiple trigger condition I am still not able to get much after reading the stuff. Please help with some basic code to start to build this window and trigger. Another option I am thinking is I just use a Richflatmap function and use the keyed state to build this logic. Is that the correct approach? On Fri, May 22, 2020 at 4:52 PM aj wrote: I was also thinking to have a processing time window but that will not work for me. I want to start the window when the user "search" event arrives. So for each user window will start from the search event. The Tumbling window has fixed start end time so that will not be suitable in my case. On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai wrote: Hi, To achieve what you have in mind, I think what you have to do is to use a processing time window of 30 mins, and have a custom trigger that matches the "start" event in the `onElement` method and return TriggerResult.FIRE_AND_PURGE. That way, the window fires either when the processing time has passed, or the start event was received. Cheers, Gordon -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ -- Thanks & Regards, Anuj Jain Mob. : +91- 8588817877 Skype : anuj.jain07 -- Thanks & Regards, Anuj Jain Mob. : +91- 8588817877 Skype : anuj.jain07
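A condensed sketch of the KeyedProcessFunction approach described above: buffer events in state after a "search" event, and flush either when the "start" event arrives or when a 30-minute processing-time timer fires. The `Event` and `SessionSummary` types and the aggregation logic are placeholders:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class SearchSessionFunction extends KeyedProcessFunction<String, Event, SessionSummary> {

        private transient ListState<Event> buffered;
        private transient ValueState<Long> timerTs;

        @Override
        public void open(Configuration parameters) {
            buffered = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffered", Event.class));
            timerTs = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("timerTs", Long.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<SessionSummary> out)
                throws Exception {
            if ("search".equals(event.getType()) && timerTs.value() == null) {
                // Start buffering and register a timer 30 minutes from now.
                long ts = ctx.timerService().currentProcessingTime() + 30 * 60 * 1000L;
                ctx.timerService().registerProcessingTimeTimer(ts);
                timerTs.update(ts);
            }
            if (timerTs.value() != null) {
                buffered.add(event);
                if ("start".equals(event.getType())) {
                    // End of the logical window: aggregate, cancel the pending timer, clean up.
                    out.collect(aggregate(buffered.get()));
                    ctx.timerService().deleteProcessingTimeTimer(timerTs.value());
                    clearState();
                }
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<SessionSummary> out)
                throws Exception {
            // Timeout: aggregate whatever has been buffered so far.
            out.collect(aggregate(buffered.get()));
            clearState();
        }

        private void clearState() {
            buffered.clear();
            timerTs.clear();
        }

        private SessionSummary aggregate(Iterable<Event> events) {
            // Placeholder aggregation logic.
            SessionSummary summary = new SessionSummary();
            for (Event e : events) {
                summary.add(e);
            }
            return summary;
        }
    }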
Re: Re: Re: Flink Window with multiple trigger condition
yless")) { currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1); SearchSummaryCalculation(record, currentTuple); } sessionSummary.put(search_hex9, currentTuple); } timeState.update(currentTimeState); } On Sun, May 24, 2020 at 10:57 PM Yun Gao wrote: Hi, First sorry that I'm not expert on Window and please correct me if I'm wrong, but from my side, it seems the assigner might also be a problem in addition to the trigger: currently Flink window assigner should be all based on time (processing time or event time), and it might be hard to implement an event-driven window assigner that start to assign elements to a window after received some elements. What comes to me is that a possible alternative method is to use the low-level KeyedProcessFunction directly: you may register a timer 30 mins later when received the "search" event and write the time of search event into the state. Then for the following events, they will be saved to the state since the flag is set. After received the "start" event or the timer is triggered, you could load all the events from the states, do the aggregation and cancel the timer if it is triggered by "start" event. A simpler case is [1] and it does not consider stop the aggreation when received special event, but it seems that the logic could be added to the case. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example Best, Yun --Original Mail -- Sender:aj Send Date:Sun May 24 01:10:55 2020 Recipients:Tzu-Li (Gordon) Tai CC:user Subject:Re: Flink Window with multiple trigger condition I am still not able to get much after reading the stuff. Please help with some basic code to start to build this window and trigger. Another option I am thinking is I just use a Richflatmap function and use the keyed state to build this logic. Is that the correct approach? On Fri, May 22, 2020 at 4:52 PM aj wrote: I was also thinking to have a processing time window but that will not work for me. I want to start the window when the user "search" event arrives. So for each user window will start from the search event. The Tumbling window has fixed start end time so that will not be suitable in my case. On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai wrote: Hi, To achieve what you have in mind, I think what you have to do is to use a processing time window of 30 mins, and have a custom trigger that matches the "start" event in the `onElement` method and return TriggerResult.FIRE_AND_PURGE. That way, the window fires either when the processing time has passed, or the start event was recieved. Cheers, Gordon -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ -- Thanks & Regards, Anuj Jain Mob. : +91- 8588817877 Skype : anuj.jain07 -- Thanks & Regards, Anuj Jain Mob. : +91- 8588817877 Skype : anuj.jain07 -- Thanks & Regards, Anuj Jain Mob. : +91- 8588817877 Skype : anuj.jain07
Re: Question on stream joins
Hi Sudan, As far as I know, both join and cogroup requires keys (namely partitioning), thus for the non-keyed scenario, you may have to use low-level connect operator to achieve it. In my opinion it should be something like leftSource.connect(rightSource) .process(new TagCoprocessFunction()) // In this function, tag the left source with "0" and the right source with "1" .window(xx) .process(new XX()) // In this function, you could get all the left and right elements in this window, and you could distinguish them with the tag added in the previous step. It should be pointed out that without key (partitioning) the paralellism of the window operator will have to be 1. For the keyed scenarios, You may use high-level operators join/cogroup to achieve that. The join could be seen as a special example as cogroup that in cogroup, you could access all the left and right elements directly, and in join function, the framework will iterate the elements for you and you can only specify the logic for each (left, right) pair. Best, Yun --Original Mail -- Sender:Sudan S Send Date:Fri May 29 01:40:59 2020 Recipients:User-Flink Subject:Question on stream joins Hi , I have two usecases 1. I have two streams which `leftSource` and `rightSource` which i want to join without partitioning over a window and find the difference of count of elements of leftSource and rightSource and emit the result of difference. Which is the appropriate join function ican use ? join/cogroup/connect. 2. I want to replicate the same behaviour over a keyed source. Basically leftSource and rightSource are joined by a partition key. Plz let me know which is the appropriate join operator for the usecase "The information contained in this e-mail and any accompanying documents may contain information that is confidential or otherwise protected from disclosure. If you are not the intended recipient of this message, or if this message has been addressed to you in error, please immediately alert the sender by replying to this e-mail and then delete this message, including any attachments. Any dissemination, distribution or other use of the contents of this message by anyone other than the intended recipient is strictly prohibited. All messages sent to and from this e-mail address may be monitored as permitted by applicable law and regulations to ensure compliance with our internal policies and to protect our business."
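A rough code fragment for the non-keyed variant Yun outlines above, tagging elements from the two sources and emitting the count difference per window. The element types are simplified to String, imports are omitted, and note that the non-keyed window forces parallelism 1 (hence windowAll):

    // Tag each element with its origin: 0 = left, 1 = right.
    DataStream<Tuple2<Integer, String>> tagged = leftSource
            .connect(rightSource)
            .process(new CoProcessFunction<String, String, Tuple2<Integer, String>>() {
                @Override
                public void processElement1(String value, Context ctx, Collector<Tuple2<Integer, String>> out) {
                    out.collect(Tuple2.of(0, value));
                }
                @Override
                public void processElement2(String value, Context ctx, Collector<Tuple2<Integer, String>> out) {
                    out.collect(Tuple2.of(1, value));
                }
            });

    // Non-keyed window: count the elements of each side and emit the difference.
    DataStream<Long> diffs = tagged
            .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .process(new ProcessAllWindowFunction<Tuple2<Integer, String>, Long, TimeWindow>() {
                @Override
                public void process(Context ctx, Iterable<Tuple2<Integer, String>> elements, Collector<Long> out) {
                    long left = 0, right = 0;
                    for (Tuple2<Integer, String> e : elements) {
                        if (e.f0 == 0) { left++; } else { right++; }
                    }
                    out.collect(left - right);
                }
            });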
Re: Re: Question on stream joins
Hi Sudan, The first process is used to tag the elements from the left and right windows, so next they could be merged into the same stream and then they could be assigned to the same window. Then the next window(xxx).process(new WindowProcessFunction) defines the window operator to process the windowed elements, thus the second process defines the window process logic. Without the tagging we may not be able to assign the elements from both the left and right stream to the same window. Best, Yun --Original Mail -- Sender:Sudan S Send Date:Fri May 29 14:39:31 2020 Recipients:Yun Gao CC:User-Flink Subject:Re: Question on stream joins Thanks Yun. Was thinking a similar way. I had one more question. leftSource.connect(rightSource) .process(new TagCoprocessFunction()) // In this function, tag the left source with "0" and the right source with "1" .window(xx) .process(new XX()) In this when will the window be applied ? since the window operator is after process(new TagCoprocessFunction()). On Fri, May 29, 2020 at 11:35 AM Yun Gao wrote: Hi Sudan, As far as I know, both join and cogroup requires keys (namely partitioning), thus for the non-keyed scenario, you may have to use low-level connect operator to achieve it. In my opinion it should be something like leftSource.connect(rightSource) .process(new TagCoprocessFunction()) // In this function, tag the left source with "0" and the right source with "1" .window(xx) .process(new XX()) // In this function, you could get all the left and right elements in this window, and you could distinguish them with the tag added in the previous step. It should be pointed out that without key (partitioning) the paralellism of the window operator will have to be 1. For the keyed scenarios, You may use high-level operators join/cogroup to achieve that. The join could be seen as a special example as cogroup that in cogroup, you could access all the left and right elements directly, and in join function, the framework will iterate the elements for you and you can only specify the logic for each (left, right) pair. Best, Yun --Original Mail -- Sender:Sudan S Send Date:Fri May 29 01:40:59 2020 Recipients:User-Flink Subject:Question on stream joins Hi , I have two usecases 1. I have two streams which `leftSource` and `rightSource` which i want to join without partitioning over a window and find the difference of count of elements of leftSource and rightSource and emit the result of difference. Which is the appropriate join function ican use ? join/cogroup/connect. 2. I want to replicate the same behaviour over a keyed source. Basically leftSource and rightSource are joined by a partition key. Plz let me know which is the appropriate join operator for the usecase "The information contained in this e-mail and any accompanying documents may contain information that is confidential or otherwise protected from disclosure. If you are not the intended recipient of this message, or if this message has been addressed to you in error, please immediately alert the sender by replying to this e-mail and then delete this message, including any attachments. Any dissemination, distribution or other use of the contents of this message by anyone other than the intended recipient is strictly prohibited. All messages sent to and from this e-mail address may be monitored as permitted by applicable law and regulations to ensure compliance with our internal policies and to protect our business." 
"The information contained in this e-mail and any accompanying documents may contain information that is confidential or otherwise protected from disclosure. If you are not the intended recipient of this message, or if this message has been addressed to you in error, please immediately alert the sender by replying to this e-mail and then delete this message, including any attachments. Any dissemination, distribution or other use of the contents of this message by anyone other than the intended recipient is strictly prohibited. All messages sent to and from this e-mail address may be monitored as permitted by applicable law and regulations to ensure compliance with our internal policies and to protect our business."
Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?
Hi Yu, I think when the serializer returns null, the following operator should still receive a null record. A possible approach is for the following operator to count the number of null records received and publish the value as a metric to a monitoring system such as Prometheus, where alert conditions can be configured. If null values cause problems, a special indicator instance like NULL_TBASE may be created instead, and the operator can count the number of NULL_TBASE records received. Best, Yun --Original Mail -- Sender:Yu Yang Send Date:Mon Jun 1 06:37:35 2020 Recipients:user Subject:best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer? Hi all, To deal with corrupted messages that can leak into the data source once in a while, we implement a custom DefaultKryoSerializer class as below that catches exceptions. The custom serializer returns null in the read(...) method when it encounters an exception while reading. With this implementation, the serializer may silently drop records. One concern is that it may drop too many records before we notice and take action. What is the best practice to handle this? The serializer processes one record at a time. Will reading a corrupted record make the serializer fail to process the next valid record? public class CustomTBaseSerializer extends TBaseSerializer { private static final Logger LOG = LoggerFactory.getLogger(CustomTBaseSerializer.class); @Override public void write(Kryo kryo, Output output, TBase tBase) { try { super.write(kryo, output, tBase); } catch (Throwable t) { LOG.error("Failed to write due to unexpected Throwable", t); } } @Override public TBase read(Kryo kryo, Input input, Class tBaseClass) { try { return super.read(kryo, input, tBaseClass); } catch (Throwable t) { LOG.error("Failed to read from input due to unexpected Throwable", t); return null; } } } Thank you! Regards, -Yu
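As an illustration of the counting idea above, a minimal sketch of a downstream operator that counts dropped (null) records with a Flink metric, which a monitoring system such as Prometheus could then alert on. The class and metric names are made up; whether a plain null or a sentinel such as NULL_TBASE actually reaches this operator depends on the surrounding serializers, as discussed above, and for a generic element type Flink may additionally need an explicit returns(...) hint.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Counts records dropped by the serializer (seen here as null) and forwards the rest.
public class NullCountingFilter<T> extends RichFlatMapFunction<T, T> {

    private transient Counter corruptedRecords;

    @Override
    public void open(Configuration parameters) {
        corruptedRecords = getRuntimeContext().getMetricGroup().counter("corruptedRecords");
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        if (value == null) {
            corruptedRecords.inc();   // alert on this counter in the monitoring system
        } else {
            out.collect(value);
        }
    }
}

It would typically be attached right after the source, e.g. source.flatMap(new NullCountingFilter<>()).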
Re: Reading files from multiple subdirectories
Hi Lorenzo, Judging from a previous thread [1] and the source code, I think you can set inputFormat.setNestedFileEnumeration(true) to also scan the nested files. Best, Yun [1] https://lists.apache.org/thread.html/86a23b4c44d92c3adeb9ff4a708365fe4099796fb32deb6319e0e17f%40%3Cuser.flink.apache.org%3E -- Sender:Lorenzo Nicora Date:2020/06/11 21:31:20 Recipient:user Theme:Reading files from multiple subdirectories Hi, this is related to the same case I am discussing in another thread, but not related to AVRO this time :) I need to ingest files that an S3 Sink Kafka Connector periodically adds to an S3 bucket. Files are bucketed by date time, as often happens. Is there any way, using Flink only, to monitor a base path and detect new files in any subdirectories? Or do I need to use something external to move new files into a single directory? I am currently using env.readFile(inputFormat, path, PROCESS_CONTINUOUSLY, 6) with AvroInputFormat, but it seems it can only monitor a single directory. Cheers Lorenzo
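A minimal sketch of the suggestion above, assuming Avro GenericRecord files under a placeholder S3 base path and a 60-second scan interval:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class NestedMonitoringJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Base path is a placeholder; files sit in date-bucketed subdirectories below it.
        String basePath = "s3://my-bucket/topics/my-topic/";
        AvroInputFormat<GenericRecord> inputFormat =
                new AvroInputFormat<>(new Path(basePath), GenericRecord.class);
        inputFormat.setNestedFileEnumeration(true); // also enumerate nested subdirectories

        DataStream<GenericRecord> records = env.readFile(
                inputFormat, basePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

        records.print();
        env.execute("monitor-nested-dirs");
    }
}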
Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Hi Felipe, I tested the basic RideCleansingExercise[1] jobs that uses the TaxiRide type locally and it seems to be able to startup normally. Could you also share your current executing code and the full stacktrace of the exception ? Best, Yun [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java --Original Mail -- Sender:Felipe Gutierrez Send Date:Fri Jun 12 23:11:28 2020 Recipients:user Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example? Hi, I am using the flink training exercise TaxiRide [1] to execute a stream count of events. On the cluster and on my local machine I am receiving the message that joda.Time cannot be serialized "class org.joda.time.LocalDateTime is not a valid POJO type". However it is starting the job on the cluster, but not in my local machine. So I searched in the internet and it is requested to register the jodaTime class on the environment[2]. I did like this: env.getConfig().registerTypeWithKryoSerializer(DateTime.class, AvroKryoSerializerUtils.JodaDateTimeSerializer.class); env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class); env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class); and I added the joda and avro dependency on the pom.xml: joda-time joda-time org.apache.flink flink-avro ${project.version} I also tested using addDefaultKryoSerializer but I got the same error. For some reason, it is still not working. Does anyone have some hint of what could be happening? Thanks! Felipe [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com
Re: Shared state between two process functions
Hi Jaswin, Currently state belongs to a single operator, so it is not possible to share state between different operators. Could you also share the original problem you want to solve by sharing state? Best, Yun --Original Mail -- Sender:Jaswin Shah Send Date:Sun Jun 14 18:57:54 2020 Recipients:user@flink.apache.org Subject:Shared state between two process functions Hi, Is it possible to create a shared state (MapState) between two different KeyedProcessFunctions? If it's possible, how can we do that in Flink? Thanks, Jaswin
Re: adding s3 object metadata while using StreamFileSink
Hi Dhurandar, With my understand I think what you need is to get notified when a file is written successfully (committed) on the S3 FileSystem. However, currently there is no public API for the listener and there an issue tracking it [1]. With the current version, one possible method comes to me is that may have to use reflection to access some internal states of StreamFileSink to get the committed files. As a whole, you may need to implement a customized StreamingFileSink and override the notifyCheckpointComplete method, where the new S3 file get committed and visible: class CustomizedStreamingFileSink extends StreamingFileSink { public void notifyCheckpointComplete(long checkpointId) throws Exception { // 1. First use reflection to get the list of files will be committed in this call. // The list of files should be get via StreamingFile -> ( StreamingFileSink Helper if 1.11 is used ) -> Buckets -> activeBuckets (there will be multiple Buckets) -> (for each Bucket) pendingFileRecoverablesPerCheckpoint // Then we could get the iterator of pending files to commit in this time via pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)[2] // Then you could get the S3 object names via (PendingFileRecover if 1.11 is used) -> CommitRecoverable (Will must be S3Recoverable ) -> objectName. super.notifyCheckpointComplete(checkpointId); // Get files committed normally. // 3. Then here could start writing meta info for S3 objects recorded in step 1. } } For a single file it may get committed multiple times, therefore the writing meta info action must also be able to handle the repeat writing. Another possible method will be to use a seperate source operator to periodly scans the S3 file system to detect the newly added files and modify their meta data. There should be embedding source function ContinuousFileMonitoringFunction[3] for this work, and I think it might be modified or reused for scanning the files. Best, Yun [1] https://issues.apache.org/jira/browse/FLINK-17900 [2] https://github.com/apache/flink/blob/a5527e3b2ff4abea2ff8fa05cb755561549be06a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L268 [3] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java -- Sender:dhurandar S Date:2020/06/20 03:19:38 Recipient:user; Flink Dev Theme:adding s3 object metadata while using StreamFileSink We are creating files in S3 and we want to update the S3 object metadata with some security-related information for governance purposes. Right now Apache Flink totally abstracts how and when S3 object gets created in the system. Is there a way that we can pass the S3 object metadata and update it for the object created. If not, How can we know when Apache Flink has created an S3 file. Deterministically. Since once its created in S3 we can write Java code after that to add those metadata information? -- Thank you and regards, Dhurandar
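As a rough sketch of the second (scanning) approach described above, a simple source that periodically lists a directory and emits paths it has not seen before, so a downstream operator can attach the S3 object metadata. The path and interval are placeholders, and the seen-set is kept in memory only; for restart safety it would need to be checkpointed, which is what ContinuousFileMonitoringFunction [3] does in a production-grade way.

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashSet;
import java.util.Set;

// Periodically lists the output directory and emits paths of files not seen before.
public class NewFileMonitorSource implements SourceFunction<String> {

    private final String basePath;
    private volatile boolean running = true;

    public NewFileMonitorSource(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Set<String> seen = new HashSet<>();
        Path path = new Path(basePath);
        FileSystem fs = path.getFileSystem();
        while (running) {
            for (FileStatus status : fs.listStatus(path)) {
                String name = status.getPath().toString();
                if (!status.isDir() && seen.add(name)) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(name);  // downstream operator updates the S3 metadata
                    }
                }
            }
            Thread.sleep(60_000L);  // scan interval, placeholder
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}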
Re: Problems with type erasure
Hi Vincenzo: Could you also attach the codes before line 72, namely how `delays` is defined ? Since the exception says the return type of "Custom Source" could not be defined, and I think it should refer to `delays`, and the exception is thrown when an operator is called on `delays` and Flink tries to create a new transformation based on the information of `delays`. Best, Yun --Original Mail -- Sender:Vincenzo Pronestì Send Date:Mon Jun 22 19:02:05 2020 Recipients:flink-user Subject:Problems with type erasure Hi there, I need to execute the following code: 72: KeyedStream, String> keyedDelays = delays 73: .flatMap(new Query1FlatMap()) 74: .keyBy(item -> item.f0);but I keep getting this error message:The program finished with the following exception:The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)I've read this guide https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html (there's an example with Tuple2> which is the same I need) and I think I have two options:1 - Implement ResultTypeQueryable> in my Query1FlatMap class. I did this by adding:@Overridepublic TypeInformation> getProducedType() {return TypeInformation.of(new TypeHint>(){});}2 - Use the returns method right after the flatMap(new Query1FlatMap()), like this:TypeInformation> tInfo = TypeInformation.of(new TypeHint>(){});KeyedStream, String> keyedDelays = delays.flatMap(new Query1FlatMap()).returns(tInfo).keyBy(item -> item.f0);Actually I've also tried with:TypeHint> tHint = new TypeHint>(){};KeyedStream, String> keyedDelays = delays.flatMap(new Query1FlatMap()).returns(tHint).keyBy(item -> item.f0);The problem is none of all these things works and the error message is always the same as above. Does any of you know how I can fix this?Also I'm having the same issue with another code where the keyed stream has two Tuple2 (i.e. Tuple2, Integer>, Tuple>). Would the solution work even in this last case? Or, do I need to change something because of the double Tuple2?Thank you for your attention.Best regards,Vincenzo
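For illustration, a sketch of how the custom source behind `delays` could expose its type, assuming it is a SourceFunction emitting Tuple2<String, Integer> (the concrete element type is not shown in the original code). Making the source itself expose its type, or calling .returns(...) directly on the result of env.addSource(...), is what resolves the "return type of function 'Custom Source' could not be determined" error, rather than adding hints on the flatMap.

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical source; the real element type and emission logic belong to the user code.
public class DelaySource implements SourceFunction<Tuple2<String, Integer>>,
        ResultTypeQueryable<Tuple2<String, Integer>> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
        while (running) {
            // ... emit elements, e.g. ctx.collect(Tuple2.of("route-1", 42)); ...
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<Tuple2<String, Integer>> getProducedType() {
        // Tells Flink the source's output type despite type erasure.
        return TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
    }
}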
Re: TypeInformation not found
Hi yu, Have you add "import org.apache.flink.api.scala._"? It seems should be ok if the import has been added in the program: import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object Test { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val solutionInput = env.fromElements((1, "1")) solutionInput.print() env.execute() } } Best, Yun --Original Mail -- Sender:王宇 Send Date:Tue Jun 23 09:42:47 2020 Recipients:User Subject:No Subject Hi, all some error occurred when I run flink in minicluster, flink-version:1.11、scala-version:2.12.0. Error:(33, 41) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)] val solutionInput = env.fromElements((1, "1")) Error:(33, 41) not enough arguments for method fromElements: (implicit evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)])org.apache.flink.api.scala.DataSet[(Int, String)]. Unspecified value parameter evidence$15. val solutionInput = env.fromElements((1, "1")) Error:(34, 40) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)] val worksetInput = env.fromElements((2, "2")) Error:(34, 40) not enough arguments for method fromElements: (implicit evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)])org.apache.flink.api.scala.DataSet[(Int, String)]. Unspecified value parameter evidence$15. val worksetInput = env.fromElements((2, "2")) Error:(47, 41) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)] val solutionInput = env.fromElements((1, "1")) have tried https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#type-information-in-the-scala-api thanks
Re: Re: TypeInformation not found
Hi Yu, I tried WordCount and the attached test, it should be able to run normally in my IDEA. Could you have a check of the imported project, or reimport the project if there are still problems ? Best, Yun --Original Mail -- Sender:Yu Wang Send Date:Tue Jun 23 15:51:27 2020 Recipients:Yun Gao CC:User Subject:Re: TypeInformation not found thanks Yun Gao!have added "import org.apache.flink.api.scala._", I just to run wordcount in idea . On Tue, Jun 23, 2020 at 11:16 AM Yun Gao wrote: Hi yu, Have you add "import org.apache.flink.api.scala._"? It seems should be ok if the import has been added in the program: import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object Test { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val solutionInput = env.fromElements((1, "1")) solutionInput.print() env.execute() } } Best, Yun --Original Mail -- Sender:王宇 Send Date:Tue Jun 23 09:42:47 2020 Recipients:User Subject:No Subject Hi, all some error occurred when I run flink in minicluster, flink-version:1.11、scala-version:2.12.0. Error:(33, 41) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)] val solutionInput = env.fromElements((1, "1")) Error:(33, 41) not enough arguments for method fromElements: (implicit evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)])org.apache.flink.api.scala.DataSet[(Int, String)]. Unspecified value parameter evidence$15. val solutionInput = env.fromElements((1, "1")) Error:(34, 40) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)] val worksetInput = env.fromElements((2, "2")) Error:(34, 40) not enough arguments for method fromElements: (implicit evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)])org.apache.flink.api.scala.DataSet[(Int, String)]. Unspecified value parameter evidence$15. val worksetInput = env.fromElements((2, "2")) Error:(47, 41) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)] val solutionInput = env.fromElements((1, "1")) have tried https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#type-information-in-the-scala-api thanks
Re: ElasticSearch_Sink
Hi Dinesh, As far as I know, to implement the two-phase commit protocol for an external system, the external system is required to provide some kind of transaction that can survive across sessions. With such a transaction mechanism we can start a new transaction and write the data, pre-commit the transaction when checkpointing, and commit the transaction on the checkpoint-complete notification. After a failover, we can recover the transactions and abort them (if not pre-committed) or commit them again (if pre-committed). As an example, for JDBC we may have to use XA transactions instead of normal JDBC transactions, since a JDBC transaction will always be aborted on failover, even if we have pre-committed. If such a transaction mechanism is not provided by the external system, we may have to use a secondary system (like WAL logs or a JDBC table) to first cache the data and only write the data to the final system on commit. Note that since a transaction might be committed multiple times, the final system still needs to deduplicate the records or provide some transaction mechanism that is always aborted on failover. Best, Yun -- Sender:C DINESH Date:2020/07/16 11:01:02 Recipient:user Theme:ElasticSearch_Sink Hello All, Can we implement the 2 Phase Commit Protocol for an Elasticsearch sink? Will there be any limitations? Thanks in advance. Warm regards, Dinesh.
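A skeletal sketch of what such a sink could look like on top of Flink's TwoPhaseCommitSinkFunction, purely for illustration: the Staging handle and all Elasticsearch interaction are hypothetical, since Elasticsearch itself does not provide cross-session transactions (exactly the limitation described above), so in practice the staging area would have to be a WAL, a temporary index, or similar.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class StagedEsSink extends TwoPhaseCommitSinkFunction<String, StagedEsSink.Staging, Void> {

    // Hypothetical transaction handle, e.g. the name of a staging index or WAL segment.
    public static class Staging {
        public String stagingId;
    }

    public StagedEsSink() {
        super(TypeInformation.of(Staging.class).createSerializer(new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected Staging beginTransaction() throws Exception {
        // Open a new staging area that can be recovered after a failover.
        return new Staging();
    }

    @Override
    protected void invoke(Staging txn, String value, Context ctx) throws Exception {
        // Write the record into the staging area of this transaction only.
    }

    @Override
    protected void preCommit(Staging txn) throws Exception {
        // Flush pending writes; after this point the transaction must be committable.
    }

    @Override
    protected void commit(Staging txn) {
        // Atomically expose the staged data (e.g. bulk-copy or alias switch).
        // Must tolerate being called more than once for the same transaction.
    }

    @Override
    protected void abort(Staging txn) {
        // Drop the staging area.
    }
}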
Re: hybrid state backends
Hi Marco, Unfortunately the state backend is currently a global configuration and cannot be configured differently for different operators. One possible alternative for this requirement might be to set RocksDB as the default state backend, and for the operators that want to keep state in memory, implement a new operator that extends AbstractStreamOperator, overrides the snapshotState() method, and uses raw state to snapshot the in-memory data. However, this option touches some non-user-level APIs of Flink. Best, Yun -- Sender:Marco Villalobos Date:2021/02/05 19:09:37 Recipient:user Theme:hybrid state backends Is it possible to use different state backends for different operators? There are certain situations where I want the state to reside completely in memory, and other situations where I want it stored in RocksDB.
Re: UUID in part files
Hi Dan, Flink SQL adds the uuid by default for the case where users execute multiple bounded SQL jobs that append to the same directory (Hive table); the uuid is attached to avoid overwriting the previous output. The DataStream API can be viewed as providing the low-level API, and thus it does not add the uuid automatically. And as you have pointed out, users can implement the same functionality with OutputFileConfig. Best, Yun --Original Mail -- Sender:Dan Hill Send Date:Mon Feb 8 07:40:36 2021 Recipients:user Subject:UUID in part files Hi. Context: I'm migrating my Flink SQL job to DataStream. When switching to StreamingFileSink, I noticed that the part files now do not have a uuid in them: "part-0-0" vs "part-{uuid string}-0-0". This is easy to add with OutputFileConfig. Question: Is there a reason why the base OutputFileConfig doesn't add the uuid automatically? Is this just a legacy issue? Or do most people not have the uuid in the file outputs?
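For illustration, a short sketch of reproducing the SQL-style part-file naming on the DataStream side with OutputFileConfig; the output path and encoder are placeholders.

import java.util.UUID;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class UuidPartFiles {

    // Builds a row-format sink whose part files carry a uuid prefix, e.g. "part-<uuid>-0-0".
    public static StreamingFileSink<String> buildSink(String outputPath) {
        OutputFileConfig fileConfig = OutputFileConfig.builder()
                .withPartPrefix("part-" + UUID.randomUUID())
                .withPartSuffix("")
                .build();

        return StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withOutputFileConfig(fileConfig)
                .build();
    }
}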
Re: Re: flink kryo exception
Hi yidan, One more thing to confirm: are you create the savepoint and stop the job all together with bin/flink cancel -s [:targetDirectory] :jobId command ? Best, Yun --Original Mail -- Sender:赵一旦 Send Date:Sun Feb 7 16:13:57 2021 Recipients:Till Rohrmann CC:Robert Metzger , user Subject:Re: flink kryo exception It also maybe have something to do with my job's first tasks. The second task have two input, one is the kafka source stream(A), another is self-defined mysql source as broadcast stream.(B) In A: I have a 'WatermarkReAssigner', a self-defined operator which add an offset to its input watermark and then forward to downstream. In B: The parallelism is 30, but in my rich function's implementation, only the subtask-0 will do mysql query and send out records, other subtasks do nothing. All subtasks will send max_watermark - 86400_000 as the watermark. Since both the first task have some self-defined source or implementation, I do not know whether the problem have something to do with it. 赵一旦 于2021年2月7日周日 下午4:05写道: The first problem is critical, since the savepoint do not work. The second problem, in which I changed the solution, removed the 'Map' based implementation before the data are transformed to the second task, and this case savepoint works. The only problem is that, I should stop the job and remember the savepoint path, then restart job with the savepoint path. And now it is : I stop the job, then the job failed and restart automatically with the generated savepoint. So I do not need to restart the job anymore, since what it does automatically is what I want to do. I have some idea that maybe it is also related to the data? So I am not sure that I can provide an example to reproduces the problem. Till Rohrmann 于2021年2月6日周六 上午12:13写道: Could you provide us with a minimal working example which reproduces the problem for you? This would be super helpful in figuring out the problem you are experiencing. Thanks a lot for your help. Cheers, Till On Fri, Feb 5, 2021 at 1:03 PM 赵一旦 wrote: Yeah, and if it is different, why my job runs normally. The problem only occurres when I stop it. Robert Metzger 于2021年2月5日周五 下午7:08写道: Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (It can happen quite easily in a distributed standalone setup that some files are different) On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 wrote: Flink1.12.0; only using aligned checkpoint; Standalone Cluster; Robert Metzger 于2021年2月5日周五 下午6:52写道: Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC) Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API)) Somehow the process receiving the data is unable to deserialize it, most likely because they are configured differently (different classpath, dependency versions etc.) On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 wrote: I do not think this is some code related problem anymore, maybe it is some bug? 赵一旦 于2021年2月5日周五 下午4:30写道: Hi all, I find that the failure always occurred in the second task, after the source task. So I do something in the first chaining task, I transform the 'Map' based class object to another normal class object, and the problem disappeared. Based on the new solution, I also tried to stop and restore job with savepoint (all successful). But, I also met another problem. 
Also this problem occurs while I stop the job, and also occurs in the second task after the source task. The log is below: 2021-02-05 16:21:26 java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321) at org.apache.flink.types.StringValue.readString(StringValue.java:783) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:1
Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
Hi Jan, From my view, I think in Flink Window should be as a "high-level" operation for some kind of aggregation operation and if it could not satisfy the requirements, we could at least turn to using the "low-level" api by using KeyedProcessFunction[1]. In this case, we could use a ValueState to store the current value for each key, and increment the value on each element. Then we could also register time for each key on receiving the first element for this key, and in the onTimer callback, we could send the current state value, update the value to 0 and register another timer for this key after 30s. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction --Original Mail -- Sender:Jan Brusch Send Date:Sat Feb 6 23:44:00 2021 Recipients:user Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem Hi, I was recently working on a problem where we wanted to implement a simple count on a sliding window, e.g. "how many messages of a certain type were emitted by a certain type of sensor in the last n minutes". Which sounds simple enough in theory: messageStream .keyBy(//EmitterType + MessageType) .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30))) .map(_ => 1) .reduce((x,y) => x + y) .addSink(...) But there is a tricky edge case: The downstream systems will never know when the count for a certain key goes back to 0, which is important for our use case. The technical reason being that flink doesn't open a window if there are no entries, i.e. a window with count 0 doesn't exist in flink. We came up with the following solution for the time being: messageStream .keyBy(//EmitterType + MessageType) .window(GlobalWindows.create()) .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30))) .evictor(// CustomEvictor: Evict all messages older than n minutes BEFORE processing the window) .process(// CustomCounter: Count all Messages in Window State); .addSink(...) In the case of zero messages in the last n minutes, all messages will be evicted from the window and the process-function will get triggered one last time on the now empty window, so we can produce a count of 0. I have two problems, though, with this solution: 1) It is computationally inefficient for a simple count, as custom process functions will always keep all messages in state. And, on every trigger all elements will have to be touched twice: To compare the timestamp and to count. 2) It does seem like a very roundabout solution to a simple problem. So, I was wondering if there was a more efficient or "flink-like" approach to this. Sorry for the long writeup, but I would love to hear your takes. Best regards Jan -- neuland – Büro für Informatik GmbH Konsul-Smidt-Str. 8g, 28217 Bremen Telefon (0421) 380107 57 Fax (0421) 380107 99 https://www.neuland-bfi.de https://twitter.com/neuland https://facebook.com/neulandbfi https://xing.com/company/neulandbfi Geschäftsführer: Thomas Gebauer, Jan Zander Registergericht: Amtsgericht Bremen, HRB 23395 HB USt-ID. DE 246585501
Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector
Hi, Have you also include the kakfa-connector related jar in the classpath? Best, Yun --Original Mail -- Sender:joris.vanagtmaal Send Date:Tue Feb 9 03:16:52 2021 Recipients:User-Flink Subject:Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector Traceback (most recent call last): File "streaming-dms.py", line 309, in anomalies() File "streaming-dms.py", line 142, in anomalies t_env.sql_query(query).insert_into("ark_sink") File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 748, in sql_query j_table = self._j_tenv.sqlQuery(query) File "/Users/jag002/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco raise java_exception pyflink.util.exceptions.TableException: findAndCreateTableSource failed. at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49) at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193) at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:834) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
Hi, I also think there should be different ways to achieve the target. For the first option listed previously, the pseudo-code roughly like class MyFunciton extends KeyedProcessFunction { ValueState count; void open() { count = ... // Create the value state } void processElement(T t, Context context, Collector collector) { Integer current = count.get(); if (current == null) { context.timeService().registerTimer(30); // Register timer for the first time current = 0; } count.update(current + 1); // update the count } void onTimer(...) { collector.collect(new Tuple2<>(getCurrentKey(), count.get()); context.timeService().registerTimer(30); // register the following timer } } 1. For flink the state and timer are all bound to a key implicitly, thus I think they should not need to be bound manually. 2. To clear the outdated state, it could be cleared via count.clear(); if it has been 0 for a long time. There are different ways to count the interval, like register another timer and clear the timer when received the elements or update the counter to -1, -2... to mark how much timer it has passed. Best, Yun --Original Mail -- Sender:Khachatryan Roman Send Date:Tue Feb 9 02:35:20 2021 Recipients:Jan Brusch CC:Yun Gao , user Subject:Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem Hi, Probably another solution would be to register a timer (using KeyedProcessFunction) once we see an element after keyBy. The timer will fire in windowIntervalMs. Upon firing, it will emit a dummy element which will be ignored (or subtracted) in the end. Upon receiving each new element, the function will shift the timer accordingly. Regards, Roman On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch wrote: Hi Yun, thanks for your reply. I do agree with your point about standard windows being for high level operations and the lower-level apis offering a rich toolset for most advanced use cases. I have tried to solve my problem with keyedProcessFunctions also but was not able to get it to work for two reasons: 1) I was not able to set up a combination of ValueState, Timers and Triggers that emulated a sliding window with a rising and falling count (including 0) good enough. 2) Memory Leak: States / Windows should be cleared after a certain time of being at count 0 in order to prevent an infinitely rising of ValueStates (that are not needed anymore) Can you maybe please elaborate in pseudocode how you would envision your solution? Best regards Jan On 08.02.21 05:31, Yun Gao wrote: Hi Jan, From my view, I think in Flink Window should be as a "high-level" operation for some kind of aggregation operation and if it could not satisfy the requirements, we could at least turn to using the "low-level" api by using KeyedProcessFunction[1]. In this case, we could use a ValueState to store the current value for each key, and increment the value on each element. Then we could also register time for each key on receiving the first element for this key, and in the onTimer callback, we could send the current state value, update the value to 0 and register another timer for this key after 30s. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction --Original Mail -- Sender:Jan Brusch Send Date:Sat Feb 6 23:44:00 2021 Recipients:user Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem Hi, I was recently working on a problem where we wanted to implement a simple count on a sliding window, e.g. 
"how many messages of a certain type were emitted by a certain type of sensor in the last n minutes". Which sounds simple enough in theory: messageStream .keyBy(//EmitterType + MessageType) .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30))) .map(_ => 1) .reduce((x,y) => x + y) .addSink(...) But there is a tricky edge case: The downstream systems will never know when the count for a certain key goes back to 0, which is important for our use case. The technical reason being that flink doesn't open a window if there are no entries, i.e. a window with count 0 doesn't exist in flink. We came up with the following solution for the time being: messageStream .keyBy(//EmitterType + MessageType) .window(GlobalWindows.create()) .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30))) .evictor(// CustomEvictor: Evict all messages older than n minutes BEFORE processing the window) .process(// CustomCounter: Count all Messages in Window State); .addSink(...) In the case of zero messages in the last n minutes, all messages will be evicted from the wi
Re: Any plans to make Flink configurable with pure data?
Hi Pilgrim, Currently table indeed could not using low level api like timer, would a mixture of sql & datastream could satisfy the requirements? A job might be created via multiple sqls, and connected via datastream operations. Best, Yun -- Sender:Pilgrim Beart Date:2021/02/09 02:22:46 Recipient: Theme:Any plans to make Flink configurable with pure data? To a naive Flink newcomer (me) it's a little surprising that there is no pure "data" mechanism for specifying a Flink pipeline, only "code" interfaces. With the DataStream interface I can use Java, Scala or Python to set up a pipeline and then execute it - but that doesn't really seem to need a programming model, it seems like configuration, which could be done with data? OK, one does need occasionally to specify some custom code, e.g. a ProcessFunction, but for any given use-case, a relatively static library of such functions would seem fine. My use case is that I have lots of customers, and I'm doing a similar job for each of them, so I'd prefer to have a library of common code (e.g. ProcessFunctions), and then specify each customer's specific requirements in a single config file. To do that in Java, I'd have to do metaprogramming (to build various pieces of Java out of that config file). Flink SQL seems to be the closest solution, but doesn't appear to support fundamental Flink concepts such as timers (?). Is there a plan to evolve Flink SQL to support timers? Timeouts is my specific need. Thanks, -Pilgrim -- Learn more at https://devicepilot.com @devicepilot
Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector
Hi, Could you try adding the jar explicitly via the Python configuration? You may refer to [1]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program -- Sender:joris.vanagtmaal Date:2021/02/09 15:50:27 Recipient: Theme:Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector My JAR files are included in the same folder from which I run the python code: flink-connector-kafka_2.11-1.13-SNAPSHOT.JAR flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.JAR kafka-clients-2.7.0.JAR -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Debugging long Flink checkpoint durations
Hi Dan, You can see the details of the checkpoints via the checkpoint UI [1]. Also, if you see in a pending checkpoint that some tasks have not taken their snapshot yet, you might check whether those tasks are backpressuring the upstream tasks [2]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html -- Sender:Dan Hill Date:2021/03/02 04:34:56 Recipient:user Theme:Debugging long Flink checkpoint durations Hi. Are there good ways to debug long Flink checkpoint durations? I'm running a backfill job that runs over ~10 days of data and then checkpointing starts failing. Since I only see the last 10 checkpoints in the jobmaster UI, I don't see when it starts. I looked through the text logs and didn't see much. I assume: 1) I have something misconfigured that is causing old state to stick around. 2) I don't have enough resources.
Re: Re: Checkpoint Error
Hi Navneeth, It seems from the stack that the exception is caused by the underlying EFS problems ? Have you checked if there are errors reported for EFS, or if there might be duplicate mounting for the same EFS and others have ever deleted the directory? Best, Yun --Original Mail -- Sender:Navneeth Krishnan Send Date:Sun Mar 7 15:44:59 2021 Recipients:user Subject:Re: Checkpoint Error Hi All, Any suggestions? Thanks On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan wrote: Hi All, We are running our streaming job on flink 1.7.2 and we are noticing the below error. Not sure what's causing it, any pointers would help. We have 10 TM's checkpointing to AWS EFS. AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)... 6 moreCaused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handleat java.util.concurrent.FutureTask.report(FutureTask.java:122)at java.util.concurrent.FutureTask.get(FutureTask.java:192)at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)... 5 moreCaused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handleat org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)... 
7 moreCaused by: java.io.IOException: Stale file handleat java.io.FileOutputStream.close0(Native Method)at java.io.FileOutputStream.access$000(FileOutputStream.java:53)at java.io.FileOutputStream$1.close(FileOutputStream.java:356)at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)at java.io.FileOutputStream.close(FileOutputStream.java:354)at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)... 12 more Thanks
Re: java options to generate heap dump in EMR not working
Hi, I tried with the standalone session (sorry I do not have a yarn cluster in hand) and it seems that the flink cluster could startup normally. Could you check the log of NodeManager to see the detail reason that the container does not get launched? Also have you check if there are some spell error or some unexpected special white space character for the configuration ? For the case of configuring `env.java.opts`, it seems the JobManager also could not be launched with this configuration. Best, Yun --Original Mail -- Sender:bat man Send Date:Sat Mar 6 16:03:06 2021 Recipients:user Subject:java options to generate heap dump in EMR not working Hi, I am trying to generate a heap dump to debug a GC overhead OOM. For that I added the below java options in flink-conf.yaml, however after adding this the yarn is not able to launch the containers. The job logs show it goes on requesting for containers from yarn and it gets them, again releases it. then again the same cycle continues. If I remove the option from flink-conf.yaml then the containers are launched and the job starts processing. env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof" If I try this then yarn client does not comes up - env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof" Am I doing anything wrong here? PS: I am using EMR. Thanks, Hemant
Re: How do I call an algorithm written in C++ in Flink?
Hi Suxi, Do you mean you want to call the algorithm written in C++? If so, I think you can do it the same way you wrap it in your SpringBoot project via JNI. You do not need to add a new operator; you can use the existing Flink APIs, load your library in open() and call the algorithm in the subsequent processing method. Best, Yun --Original Mail -- Sender:苏喜 张 <15138217...@163.com> Send Date:Mon Mar 8 14:12:02 2021 Recipients:user@flink.apache.org Subject:How do I call an algorithm written in C++ in Flink? The company has provided an algorithm written in C++, which has been packaged into a .so file. I have built a SpringBoot project which uses JNI to operate the algorithm written in C++. Could you please tell me how to call it in Flink? Do I need to define operators or chains of operators?
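A minimal sketch of that idea, with a made-up library name, native signature and data types; the real JNI bindings depend on the provided .so, which must also be available on every TaskManager node.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Wraps the native C++ algorithm inside a regular Flink operator via JNI.
public class NativeAlgorithmMapper extends RichMapFunction<double[], double[]> {

    // Declared native method; the actual signature must match the JNI bindings.
    private static native double[] runAlgorithm(double[] input);

    @Override
    public void open(Configuration parameters) {
        // Load the .so once per task; beware of loading the same library from
        // multiple classloaders in one JVM (may throw UnsatisfiedLinkError).
        System.loadLibrary("myalgorithm");
    }

    @Override
    public double[] map(double[] value) {
        return runAlgorithm(value);
    }
}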
Re: Re: Flink application has slightly data loss using Processing Time
Hi Rainie, From the code it seems the current problem does not use the time-related functionality like window/timer? If so, the problem would be indepdent with the time type used. Also, it would not likely due to rebalance() since the network layer has the check of sequence number. If there are missed record there would be failover. Since the current logic seems not rely on too much complex functionality, would it be possible that there might be some inconsistency between the flink implementation and the presto one ? Best, Yun -- Sender:Rainie Li Date:2021/03/08 17:14:30 Recipient:Smile Cc:user Theme:Re: Flink application has slightly data loss using Processing Time Thanks for the quick response, Smile. I don't use window operators or flatmap. Here is the core logic of my filter, it only iterates on filters list. Will rebalance() cause it? Thanks again. Best regards Rainie SingleOutputStreamOperator> matchedRecordsStream = eventStream .rebalance() .process(new ProcessFunction>() { public void processElement( T element, ProcessFunction>.Context context, Collector> collector) { for (StreamFilter filter : filters) { if (filter.match(element)) { SubstreamConfig substreamConfig = filter.getSubstreamConfig(); SplitterIntermediateRecord result = new SplitterIntermediateRecord<>( substreamConfig.getKafkaCluster(), substreamConfig.getKafkaTopic(), substreamConfig.getCutoverKafkaTopic(), substreamConfig.getCutoverTimestampInMs(), element); collector.collect(result); } } } }) .name("Process-" + eventClass.getSimpleName()); On Mon, Mar 8, 2021 at 1:03 AM Smile wrote: Hi Rainie, Could you please provide more information about your processing logic? Do you use window operators? If there's no time-based operator in your logic, late arrival data won't be dropped by default and there might be something wrong with your flat map or filter operator. Otherwise, you can use sideOutputLateData() to get the late data of the window and have a look at them. See [1] for more information about sideOutputLateData(). [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output Regards, Smile -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Gradually increasing checkpoint size
Hi Dan, Are you using a very large upperBound or lowerBound? If not, could you also check the watermark strategy? The interval join operator depends on event-time timers for cleanup, and event-time timers are only triggered by watermarks. Best, Yun --Original Mail -- Sender:Dan Hill Send Date:Mon Mar 8 14:59:48 2021 Recipients:user Subject:Gradually increasing checkpoint size Hi! I'm running a backfill Flink stream job over older data. It has multiple interval joins. I noticed my checkpoint is regularly gaining in size. I'd expect my checkpoints to stabilize and not grow. Is there a setting to prune useless data from the checkpoint? My top guess is that my checkpoint has a bunch of useless state in it. - Dan
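For illustration, a sketch of the two points above: keep the interval-join bounds modest and give both inputs a watermark strategy so that event-time timers can fire and clean up the join state. The Event type, its getters, the key, the out-of-orderness and the bounds are all placeholders.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class BoundedIntervalJoin {

    public static DataStream<String> joinWithBoundedState(DataStream<Event> leftRaw,
                                                          DataStream<Event> rightRaw) {
        DataStream<Event> left = leftRaw.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((e, ts) -> e.getEventTimeMillis()));

        DataStream<Event> right = rightRaw.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((e, ts) -> e.getEventTimeMillis()));

        return left.keyBy(Event::getId)
                .intervalJoin(right.keyBy(Event::getId))
                .between(Time.minutes(-10), Time.minutes(10))  // modest bounds keep state bounded
                .process(new ProcessJoinFunction<Event, Event, String>() {
                    @Override
                    public void processElement(Event l, Event r, Context ctx, Collector<String> out) {
                        out.collect(l.getId() + " joined");
                    }
                });
    }

    // Placeholder event type; the real job would use its own POJO.
    public interface Event {
        String getId();
        long getEventTimeMillis();
    }
}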
Re: Re: Re: Checkpoint Error
Hi Navneeth, Is the attached exception the root cause for the checkpoint failure ? Namely is it also reported in job manager log? Also, have you enabled concurrent checkpoint? Best, Yun --Original Mail -- Sender:Navneeth Krishnan Send Date:Mon Mar 8 13:10:46 2021 Recipients:Yun Gao CC:user Subject:Re: Re: Checkpoint Error Hi Yun, Thanks for the response. I checked the mounts and only the JM's and TM's are mounted with this EFS. Not sure how to debug this. Thanks On Sun, Mar 7, 2021 at 8:29 PM Yun Gao wrote: Hi Navneeth, It seems from the stack that the exception is caused by the underlying EFS problems ? Have you checked if there are errors reported for EFS, or if there might be duplicate mounting for the same EFS and others have ever deleted the directory? Best, Yun --Original Mail -- Sender:Navneeth Krishnan Send Date:Sun Mar 7 15:44:59 2021 Recipients:user Subject:Re: Checkpoint Error Hi All, Any suggestions? Thanks On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan wrote: Hi All, We are running our streaming job on flink 1.7.2 and we are noticing the below error. Not sure what's causing it, any pointers would help. We have 10 TM's checkpointing to AWS EFS. AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)... 6 moreCaused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handleat java.util.concurrent.FutureTask.report(FutureTask.java:122)at java.util.concurrent.FutureTask.get(FutureTask.java:192)at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)... 
5 moreCaused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handleat org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)... 7 moreCaused by: java.io.IOException: Stale file handleat java.io.FileOutputStream.close0(Native Method)at java.io.FileOutputStream.access$000(FileOutputStream.java:53)at java.io.FileOutputStream$1.close(FileOutputStream.java:356)at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)at java.io.FileOutputStream.close(FileOutputStream.java:354)at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)... 12 more Thanks
Re: questions about broadcasts
Hi Marco, (a) It is possible for an operator to receive two different kinds of broadcasts, e.g. DataStream ints = ...; DataStream strs = ...; ints.broadcast().connect(strs.broadcast()).process(new CoProcessFunction(){...}); (b) A traditional Flink operator cannot accept three different inputs. There is a new MultipleInputOperator that can accept an arbitrary number of inputs [1]. However, it is currently not exposed directly to end users, and you would need to work with some low-level APIs to use it. Alternatively, you might use a tag to union two of the input streams (any two of the three inputs) and then use the (Keyed)CoProcessFunction above. Also note that broadcast is only a partitioner, and it is treated no differently from other partitioners by downstream operators. Best, Yun [1] https://github.com/apache/flink/blob/51524de8fd337aafd30952873b36216c5a3c43bc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java#L261 -- Sender:Marco Villalobos Date:2021/03/06 09:47:53 Recipient:user Theme:questions about broadcasts Is it possible for an operator to receive two different kinds of broadcasts? Is it possible for an operator to receive two different types of streams and a broadcast? For example, I know there is a KeyedCoProcessFunction, but is there a version of that which can also receive broadcasts?
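A runnable sketch of option (a) above, with placeholder element types and a simple print sink:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class TwoBroadcastsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> ints = env.fromElements(1, 2, 3);
        DataStream<String> strs = env.fromElements("a", "b");

        ints.broadcast()
            .connect(strs.broadcast())
            .process(new CoProcessFunction<Integer, String, String>() {
                @Override
                public void processElement1(Integer value, Context ctx, Collector<String> out) {
                    out.collect("int: " + value);   // broadcast input 1
                }

                @Override
                public void processElement2(String value, Context ctx, Collector<String> out) {
                    out.collect("str: " + value);   // broadcast input 2
                }
            })
            .print();

        env.execute("two-broadcasts");
    }
}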
Re: New settings are not honored unless checkpoint is cleared.
Hi Yordan, What are the settings that are changed during the tests? Best, Yun -- From:Yordan Pavlov Send Time:2021 Mar. 5 (Fri.) 23:36 To:user Subject:New settings are not honored unless checkpoint is cleared. Hello there, I am running Flink 1.11.3 on Kubernetes deployment. If I change a setting and re-deploy my Flink setup, the new setting is correctly applied in the config file but is not being honored by Flink. In other words, I can ssh into the pod and check the config file - it has the new setting as I would expect. However the web interface for the job keeps showing the old configuration and Flink as a whole keep running with the old setting. The way to have the new setting considered is to clear the checkpoint for the job stored in Zookeeper. Then I recover the job using: --fromSavepoint path_to_savepoint_or_checkpoint My presumption is that the job configuration is stored in Zookeeper along with other Flink data. Could someone shed some light on what I am observing. Thank you!
Re: Re: How to check checkpointing mode
Hi Alexey, Sorry I also do not see problems in the attached code. Could you add a breakpoint at `see.execute(name)` and have a look at the value of see#checkpointCfg#checkpointingMode ? Best, Yun --Original Mail -- Sender:Alexey Trenikhun Send Date:Tue Mar 9 07:25:31 2021 Recipients:Flink User Mail List , Yun Gao Subject:Re: How to check checkpointing mode Hi Yun, Thank you for looking, job creation is quite big, I've truncated helper methods dealing with command line parameters etc, below two major methods: @Override public Void call() throws Exception { LOGGER.info("{}", new Info().toLog()); if (!allParameters.isEmpty()) { // We don't expect any parameters, but Flink 1.12 adds JVM options to job args, since we add // -- after jobs argument, this unnecessary for us arguments will be treated as positional // parameters, which we ignore but log warning LOGGER.warn("Unexpected parameters: {}", allParameters); } try { final StreamExecutionEnvironment see = buildStreamExecutionEnvironment(); see.execute(name); return null; } catch (InterruptedException e) { LOGGER.error("Stream Processor was interrupted", e); Thread.currentThread().interrupt(); throw e; } catch (Exception e) { LOGGER.error("Stream Processor is terminated due to exception", e); throw e; } } private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException { initDefaultKafkaSource(); final long deviationMillis = deviation.toMillis(); final GlobalAppConfig globalAppConfig = config(); final StreamExecutionEnvironment see = StreamExecutionEnvironment .getExecutionEnvironment() .enableCheckpointing(checkpointInterval.toMillis(), CheckpointingMode.AT_LEAST_ONCE) .setMaxParallelism(1024) .setParallelism(parallelism); if (externalizedCheckpoints) { see.getCheckpointConfig() .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); } see.getConfig().disableGenericTypes(); see.getConfig().disableAutoGeneratedUIDs(); configureStateBackend(see); final Properties producerProperties = new PropertiesBuilder() .putAll(kafkaCommonOptions) .putAll(kafkaProducerOptions) .varFiles(valueFiles) .build(); final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder() .semantic(Semantic.AT_LEAST_ONCE) .config(producerProperties) .build(); final AutoTopic autoTopic = AutoTopic.builder() .config(producerProperties) .partitions(autoCreateTopicsPartitions) .replicationFactor(autoCreateTopicsReplicationFactor) .doNotCreateTopics(ImmutableSet.of( gspCfg, gspCustom, gspIxn, gspOutbound, gspSm )) .build(); see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1))); // since Flink 1.12 default stream characteristic is event time, // so we don't need to set streamTimeCharacteristic, furthermore whole TimeCharacteristic enum // is deprecated. // If needed explicitly using processing-time windows and timers works in event-time mode. 
addHeartbeats(see); final TStateCleanupOnTimeout.Factory cleanupFactory = new TStateCleanupOnTimeout.Factory( maxCallDuration, postmortemCallDuration, globalAppConfig.timerGranularity() ); @Nullable final SingleOutputStreamOperator cfgXform; @Nullable final DataStream cfgSource = addSources(see, SourceTopic.GCA_CFG, new CfgJsonDeserializationSchema(), (event, timestamp) -> event.getBatchId(), it -> !it.getHeartbeat()); if (cfgSource != null) { cfgXform = cfgSource .keyBy(PbCfgDatum::getCcId) .process(new CfgTransform()) .uid("xform-cfg") .name("XForm Config"); if (!isNullOrEmpty(gspCfg)) { cfgXform.addSink(producerFactory.create(gspCfg, autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg .uid("uid-" + gspCfg) .name(gspCfg); } else { cfgXform.addSink(new DiscardingSink<>()) .uid("uid-gsp-cfg-null") .name("gsp-cfg-null"); } } else { cfgXform = null; } final DataStream voiceCallThreadSource = addSources(see, SourceTopic.VOICE_CALL_THREAD, callThreadFormat == KafkaTopicFormat.JSON ? new TJsonDeserializationSchema() : new CallEventDeserializationSchema(), (event, timestamp) -> Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp()) ? timestamp - deviationMillis : Instants.toMillis(event.getTimestamp()), event -> event.getType() != EventType.EVENT_UNKNOWN); final SingleOutputStreamOperator tcmDataStream1 = voiceCallThreadSource .keyBy(CallEventKey::new) .process
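For illustration of Yun's suggestion, a minimal self-contained sketch (interval, mode, and job wiring are placeholders) that prints the effective checkpointing configuration without needing a debugger breakpoint:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckCheckpointingMode {
    public static void main(String[] args) {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment()
            .enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);

        CheckpointConfig cfg = see.getCheckpointConfig();
        // Prints AT_LEAST_ONCE if the mode set above really took effect.
        System.out.println("enabled=" + cfg.isCheckpointingEnabled()
            + ", mode=" + cfg.getCheckpointingMode()
            + ", interval=" + cfg.getCheckpointInterval() + "ms");
    }
}

The same three getters can be logged right before see.execute(name) in the job shown above.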
Re: Re: Gradually increasing checkpoint size
Hi Dan, Regarding the original checkpoint size problem, could you also check in the checkpoint UI which tasks' state is increasing? For example, the attached operator has an `alreadyOutputted` value state, which would keep growing if there are always new keys. Best, Yun --Original Mail -- Sender:Dan Hill Send Date:Tue Mar 9 00:59:24 2021 Recipients:Yun Gao CC:user Subject:Re: Gradually increasing checkpoint size Hi Yun! Thanks for the quick reply. One of the lowerBounds is large but the table being joined with is ~500 rows. I also have my own operator that only outputs the first value. public class OnlyFirstUser<T> extends RichFlatMapFunction<T, T> { private transient ValueState<Boolean> alreadyOutputted; @Override public void flatMap(T value, Collector<T> out) throws Exception { if (!alreadyOutputted.value()) { alreadyOutputted.update(true); out.collect(value); } } @Override public void open(Configuration config) { ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>( "alreadyOutputted", // the state name TypeInformation.of(new TypeHint<Boolean>() {}), // type information false); // default value of the state, if nothing was set alreadyOutputted = getRuntimeContext().getState(descriptor); } } All of my inputs have this watermark strategy. In the Flink UI, early in the job run, I see "Low Watermarks" on each node and they increase. After some checkpoint failures, low watermarks stop appearing in the UI. .assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1))); Thanks Yun! On Mon, Mar 8, 2021 at 7:27 AM Yun Gao wrote: Hi Dan, Have you used a too large upperBound or lowerBound? If not, could you also check the watermark strategy? The interval join operator depends on the event-time timer for cleanup, and the event-time timer is triggered via watermarks. Best, Yun --Original Mail -- Sender:Dan Hill Send Date:Mon Mar 8 14:59:48 2021 Recipients:user Subject:Gradually increasing checkpoint size Hi! I'm running a backfill Flink stream job over older data. It has multiple interval joins. I noticed my checkpoint is regularly gaining in size. I'd expect my checkpoints to stabilize and not grow. Is there a setting to prune useless data from the checkpoint? My top guess is that my checkpoint has a bunch of useless state in it. - Dan
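As a side note on keeping such per-key state from growing without bound (not from the original thread; the retention period below is an arbitrary assumption), a state TTL could be attached to the descriptor so entries for keys that stop appearing eventually expire:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TtlDescriptorExample {
    public static ValueStateDescriptor<Boolean> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))                                  // retention period is an assumption
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh TTL whenever the state is written
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

        ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
            "alreadyOutputted",
            TypeInformation.of(new TypeHint<Boolean>() {}));
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}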
Re: Re: Meaning of checkpointStartDelayNanos
Hi Kai, Yes, you are basically right. One minor point is that the start time is taken as the time the checkpoint gets initiated on the JM side. Best, Yun --Original Mail -- Sender:Kai Fu Send Date:Mon Apr 5 09:31:58 2021 Recipients:user Subject:Re: Meaning of checkpointStartDelayNanos I found its meaning in the code. It means the delay of the checkpoint action when the checkpoint barrier reaches the current operator since the checkpoint was initiated in the source. On Mon, Apr 5, 2021 at 9:21 AM Kai Fu wrote: Hi team, I'm a little confused by the meaning of checkpointStartDelayNanos. I do not understand what time it exactly means, but it seems to be a quite important indicator for checkpointing/backpressure. The explanation of it on the metrics page does not help too much. Can someone help to explain it more clearly? -- Best regards, - Kai -- Best regards, - Kai
Re: Questions about checkpointAlignmentTime in unaligned checkpoint
Hi Kai, Under the unaligned checkpoint setting there is still an alignment process. Although the task can snapshot the state of its operators on receiving the first barrier and immediately emit barriers to the following tasks, it still needs to wait for all the barriers to be received before finalizing the checkpoint, and during this process it needs to snapshot the buffers that are skipped by the barrier. The final snapshot is composed of both the operator snapshots and the snapshots of the skipped buffers. Therefore, the checkpointAlignmentTime metric still exists. Best, Yun --Original Mail -- Sender:Kai Fu Send Date:Mon Apr 5 09:18:39 2021 Recipients:user Subject:Questions about checkpointAlignmentTime in unaligned checkpoint Hi team, I'm observing that the metrics reporter still emits the checkpointAlignmentTime metric in the unaligned checkpoint setting, as shown in the figure below. Is it meaningful for unaligned checkpoints, since I suppose the alignment operation only happens in aligned checkpoints? -- Best regards, - Kai
Re: Union of more than two streams
Hi, With a.connect(b).coprocess(xx).connect(c).coprocess(xx), two operators would be created: the first operator combines a and b and outputs the enriched data, and then .connect(c).coprocess(xx) passes the already enriched data through and enriches the records from c. Since the two operators cannot be chained, performance would likely be affected. Another method is to first label each input with a tag, e.g. ("a", a record), ("b", b record), ..., and then use a.union(b).union(c).union(d).process(xx); in the process operator, different logic can be chosen according to the tag. If adding a tag is hard, then you might need to use the new multiple-inputs operator, which would require using the low-level API of Flink, so I would recommend the tag + union method first. Best, Yun --Original Mail -- Sender:B.B. Send Date:Fri Apr 2 16:41:16 2021 Recipients:flink_user Subject:Union of more than two streams Hi, I have an architecture question regarding the union of more than two streams in Apache Flink. We have three and sometimes more streams that are some kind of code book with which we have to enrich the main stream. Code book streams are compacted Kafka topics. Code books are something that doesn't change very often, e.g. currency. The main stream is a fast event stream. The idea is to make a union of all code books and then join it with the main stream and store the enrichment data as managed, keyed state (so when compacted events from Kafka expire I have the code books saved in state). The problem is that the foreign key of every code book is different. E.g. codebook_1 has foreign key codebook_fk1, codebook_2 has foreign key codebook_fk2, ..., which connects with the main stream. This means I cannot use keyBy with a CoProcessFunction. Is this doable with union, or should I cascade a series of connected streams with the main stream, e.g. mainstream.connect(codebook_1) -> mainstreamWithCodebook1.connect(codebook_2) -> mainstreamWithCodebook1AndCodebook2.connect(codebook_3) -> ...? I read somewhere that this latter approach is not memory friendly. Thx. BB.
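For illustration, a minimal sketch of the tag + union approach Yun describes (stream contents, tags, and the String payloads are made up for the example; real code books would use their own record types):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TagAndUnion {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the three codebook streams.
        DataStream<String> codebook1 = env.fromElements("EUR", "USD");
        DataStream<String> codebook2 = env.fromElements("country-1");
        DataStream<String> codebook3 = env.fromElements("product-1");

        DataStream<Tuple2<String, String>> tagged1 =
            codebook1.map(r -> Tuple2.of("codebook_1", r)).returns(Types.TUPLE(Types.STRING, Types.STRING));
        DataStream<Tuple2<String, String>> tagged2 =
            codebook2.map(r -> Tuple2.of("codebook_2", r)).returns(Types.TUPLE(Types.STRING, Types.STRING));
        DataStream<Tuple2<String, String>> tagged3 =
            codebook3.map(r -> Tuple2.of("codebook_3", r)).returns(Types.TUPLE(Types.STRING, Types.STRING));

        tagged1.union(tagged2, tagged3)
            .process(new ProcessFunction<Tuple2<String, String>, String>() {
                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) {
                    // Branch on the tag; each branch can apply its own enrichment/bookkeeping logic.
                    switch (value.f0) {
                        case "codebook_1": out.collect("cb1: " + value.f1); break;
                        case "codebook_2": out.collect("cb2: " + value.f1); break;
                        default:           out.collect("cb3: " + value.f1); break;
                    }
                }
            })
            .print();

        env.execute("tag-and-union");
    }
}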
Re: Official flink java client
Hi Gaurav, Logically the Flink client is embedded inside the StreamExecutionEnvironment, and users can use the StreamExecutionEnvironment to execute their jobs. Could you share more about why you want to use the client directly? Best, Yun --Original Mail -- Sender:gaurav kulkarni Send Date:Fri Apr 23 10:14:08 2021 Recipients:User Subject:Official flink java client Hi, Is there any official Flink client in Java that's available? I came across RestClusterClient, but I am not sure if it's official. I can create my own client, but just wanted to check if there is anything official available already that I can leverage. Thanks, Gaurav Run already deployed job on Flink Cluster using RestClusterClient I am trying to run already deployed job on Flink Cluster using Rest request.I had success using a simple rest ...
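For illustration of the point that the client is embedded in the StreamExecutionEnvironment (an editorial sketch; the pipeline below is a placeholder): since Flink 1.10, executeAsync(...) returns a JobClient that can be used to query status or cancel the job programmatically.

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubmitAndTrack {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();   // placeholder pipeline

        // Non-blocking submission; the returned JobClient can query status or cancel the job.
        JobClient client = env.executeAsync("placeholder-job");
        System.out.println("job id: " + client.getJobID());
        System.out.println("status: " + client.getJobStatus().get());
    }
}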
Re: Re: Official flink java client
Hi Flavio, Thanks a lot for the explanation. Maybe another option is to have a look at the HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs and query job status, and it might be able to help. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html --Original Mail -- Sender:Flavio Pompermaier Send Date:Fri Apr 23 15:25:55 2021 Recipients:Yun Gao CC:gaurav kulkarni , User Subject:Re: Official flink java client I also interface with Flink clusters using REST in order to avoid many annoying problems (due to dependency conflicts, classpath or env variables). I use an extended version of the RestClusterClient that you can reuse if you want to. It is available at [1] and it adds some missing methods to the default Flink version (I also had to copy that class and modify the visibility of some fields in order to enable the extension). Officially the Flink RestClusterClient is meant for internal use only, but it actually works very well. Best, Flavio [1] https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java On Fri, Apr 23, 2021 at 5:10 AM Yun Gao wrote: Hi gaurav, Logicall Flink client is bear inside the StreamExecutionEnvironment, and users could use the StreamExecutionEnvironment to execute their jobs. Could you share more about why you want to directly use the client? Best, Yun --Original Mail -- Sender:gaurav kulkarni Send Date:Fri Apr 23 10:14:08 2021 Recipients:User Subject:Official flink java client Hi, Is there any official flink client in java that's available? I came across RestClusterClient, but I am not sure if its official. I can create my own client, but just wanted to check if there is anything official available already that I can leverage. Thanks, Gaurav Run already deployed job on Flink Cluster using RestClusterClient I am trying to run already deployed job on Flink Cluster using Rest request.I had success using a simple rest ...
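For illustration, a dependency-free sketch of querying the REST API mentioned in [1] (host and port are assumptions; /jobs/overview is one of the documented endpoints):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestStatusCheck {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8081/jobs/overview"))  // JobManager REST endpoint (assumed address)
            .GET()
            .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());  // JSON list of jobs and their current states
    }
}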
Re: Re: Re: Official flink java client
Hi Flavio, Got it. From my point of view, RestClusterClient might not be viewed as a public API and might change between versions, so one needs to be careful when upgrading. Best, Yun --Original Mail -- Sender:Flavio Pompermaier Send Date:Fri Apr 23 16:10:05 2021 Recipients:Yun Gao CC:gaurav kulkarni , User Subject:Re: Re: Official flink java client Obviously I could rewrite a Java client from scratch that interfaces with the provided REST API, but why, if I can reuse something already existing? Usually I interface with REST APIs using auto-generated clients (if the APIs are exposed via Swagger or OpenAPI). If that's not an option, writing a REST client from scratch is something I try to avoid as much as I can. Best, Flavio On Fri, Apr 23, 2021 at 9:55 AM Yun Gao wrote: Hi Falvio, Very thanks for the explanation, may be another option is to have a look at the http rest API[1] ? Flink provides official http api to submit jar jobs and query job status, and they might be able to help. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html --Original Mail -- Sender:Flavio Pompermaier Send Date:Fri Apr 23 15:25:55 2021 Recipients:Yun Gao CC:gaurav kulkarni , User Subject:Re: Official flink java client I also interface to Flink clusters using REST in order to avoid many annoying problems (due to dependency conflicts, classpath or env variables). I use an extended version of the RestClusterClient that you can reuse if you want to. It is available at [1] and it add some missing methods to the default Flink version (I also had to copy that class and modify the visibility of some field in order to enable the extension). Officially the Flink RestClusterClient is meant to be used for internal use only but it actually work very well. Best, Flavio [1] https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java On Fri, Apr 23, 2021 at 5:10 AM Yun Gao wrote: Hi gaurav, Logicall Flink client is bear inside the StreamExecutionEnvironment, and users could use the StreamExecutionEnvironment to execute their jobs. Could you share more about why you want to directly use the client? Best, Yun --Original Mail -- Sender:gaurav kulkarni Send Date:Fri Apr 23 10:14:08 2021 Recipients:User Subject:Official flink java client Hi, Is there any official flink client in java that's available? I came across RestClusterClient, but I am not sure if its official. I can create my own client, but just wanted to check if there is anything official available already that I can leverage. Thanks, Gaurav Run already deployed job on Flink Cluster using RestClusterClient I am trying to run already deployed job on Flink Cluster using Rest request.I had success using a simple rest ...
Re: pojo warning when using auto generated protobuf class
Hi Prashant, I think the warning is given when calling return TypeInformation.of(Trace.APITrace::class.java). Currently Flink does not have native support for protobuf types yet [1], thus it would use a generic serializer created by Kryo. This should not affect the correctness of the program, only its performance. One possible solution might be to register a custom serializer for protobuf classes into the Kryo serialization framework, like the example in [2]. Best, Yun [1] https://issues.apache.org/jira/browse/FLINK-11333 [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html --Original Mail -- Sender:Prashant Deva Send Date:Sat Apr 24 11:00:17 2021 Recipients:User Subject:pojo warning when using auto generated protobuf class I am seeing this warning msg when trying to use a custom protobuf de/serializer with a Kafka source with an auto-generated Java protobuf class: 18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class com.xx.APITrace cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. Here is my serializer. What am I doing wrong? class ApiTraceSchema: DeserializationSchema<Trace.APITrace>, SerializationSchema<Trace.APITrace> { override fun getProducedType(): TypeInformation<Trace.APITrace> { return TypeInformation.of(Trace.APITrace::class.java) } override fun deserialize(message: ByteArray): Trace.APITrace { return Trace.APITrace.parseFrom(message) } override fun isEndOfStream(nextElement: Trace.APITrace): Boolean { return false } override fun serialize(element: Trace.APITrace): ByteArray { return element.toByteArray() } }
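For illustration, a sketch of the registration described in [2] (it assumes the com.twitter:chill-protobuf dependency is on the classpath and reuses the generated Trace.APITrace class from this thread):

import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegisterProtobufSerializer {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Tell Kryo to (de)serialize this generated protobuf class with chill's ProtobufSerializer
        // instead of falling back to its generic field-by-field handling.
        env.getConfig().registerTypeWithKryoSerializer(Trace.APITrace.class, ProtobufSerializer.class);
    }
}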
Re: Too many checkpoint folders kept for externalized retention.
Hi John, Logically the maximum number of retained checkpoints is configured by state.checkpoints.num-retained [1]. Have you configured this option? Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#state-checkpoints-num-retained -- Sender:John Smith Date:2021/04/24 01:41:41 Recipient:user Theme:Too many checkpoint folders kept for externalized retention. Hi, running 1.10.0. Just curious, is this specific to externalized retention or checkpointing in general? I see my checkpoint folder containing thousands of chk-x folders. If using the default checkpointing or NONE externalized checkpointing, does the count of chk- folders grow indefinitely until the job is killed, or does it retain up to a certain amount? Thanks
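For illustration, the corresponding flink-conf.yaml entry would look like this (the value 3 is an arbitrary example; the default is 1):

# flink-conf.yaml
# Keep at most this many completed checkpoints (and thus chk-* directories) per job.
state.checkpoints.num-retained: 3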
Re: Re: pojo warning when using auto generated protobuf class
Hi Prashant, Flink will always give this warning as long as the deduced type is a GenericType, no matter whether it uses the default Kryo serializer or a registered one; thus, if you have registered the type, you may simply ignore the warning. To make sure it works, you may find the TaskManager where the source tasks reside and use jmap to check whether the ProtobufSerializer is created or not. Best, Yun --Original Mail -- Sender:Prashant Deva Send Date:Sun Apr 25 01:18:41 2021 Recipients:Yun Gao CC:User Subject:Re: pojo warning when using auto generated protobuf class So I did register the type with Kryo and the ProtobufSerializer. However, I am still seeing the warnings. Is this a bug in Flink? env.config.registerTypeWithKryoSerializer(Trace.APITrace::class.java, ProtobufSerializer::class.java) val stream: DataStreamSource<Trace.APITrace> = env.addSource(FlinkKafkaConsumer(Config.TOPIC_SPANS, ApiTraceSchema(), props)) Sent via Superhuman On Sat, Apr 24, 2021 at 8:48 AM, Yun Gao wrote: Hi Prashant, I think the warn is given when calling return TypeInformation.of(Trace.APITrace::class.java) Currently flink does not have the native support for the protobuf types yet[1], thus it would use a generic serializer created by kryo. This should not affect the rightness of the program and should only affect its performance. One possible solution might be register custom serializer into the kryo serializer framework for protobuf classes, like the example in [2]. Best, Yun [1] https://issues.apache.org/jira/browse/FLINK-11333 [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html --Original Mail -- Sender:Prashant Deva Send Date:Sat Apr 24 11:00:17 2021 Recipients:User Subject:pojo warning when using auto generated protobuf class I am seeing this warning msg when trying to use a custom protobuf de/serializer with kafka source with auto generated java protobuf class: 18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class com.xx.APITrace cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. here is my serializer. What am i doing wrong? class ApiTraceSchema: DeserializationSchema, SerializationSchema { override fun getProducedType(): TypeInformation { return TypeInformation.of(Trace.APITrace::class.java) } override fun deserialize(message: ByteArray): Trace.APITrace { return Trace.APITrace.parseFrom(message) } override fun isEndOfStream(nextElement: Trace.APITrace): Boolean { return false } override fun serialize(element: Trace.APITrace): ByteArray { return element.toByteArray() } }
Re: Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
Hi Ragini, How did you submit your job? The exception here is usually caused by `flink-clients` not being included in the classpath on the client side. If the job is submitted via the Flink CLI, namely `flink run -c <entry-class> xx.jar`, it should be included by default; if the job is submitted programmatically, then flink-clients must be included in the dependencies. Best, Yun -- Sender:Ragini Manjaiah Date:2021/05/04 17:25:30 Recipient:user Theme:Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application. Hi Team, I am trying to submit a Flink job of version 1.11.3. The actual application is developed in Flink 1.8.1. Since the Hadoop cluster is Apache 3.2.0, I downloaded Flink 1.11.3 (flink-1.11.3-bin-scala_2.11.tgz) and tried to submit the job. While submitting I am facing the below mentioned exception. I have set the HADOOP parameters: export HADOOP_CONF_DIR=/etc/hadoop/conf export HADOOP_CLASSPATH=`hadoop classpath` Are there any changes I need to make in the pom file to overcome this? org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No ExecutorFactory found to execute the application. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application. at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1809) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700) at org.sapphire.appspayload.StreamingJob.main(StreamingJob.java:214) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
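For illustration, the dependency Yun mentions would look roughly like this in the pom (the artifact suffix and version are assumptions matching the flink-1.11.3-bin-scala_2.11 download above):

<!-- pom.xml -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.11.3</version>
</dependency>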
Re: Define rowtime on intermediate table field
Hi Sumeet, I think you might first convert the table back to the DataStream [1], then define the timestamp and watermark with `assignTimestampsAndWatermarks(...)`, and then convert it back to table[2]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/#convert-a-table-into-a-datastream [2] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#during-datastream-to-table-conversion --Original Mail -- Sender:Sumeet Malhotra Send Date:Tue May 4 16:32:10 2021 Recipients:user Subject:Define rowtime on intermediate table field Hi, My use case involves reading raw data records from Kafka and processing them. The records are coming from a database, where a periodic job reads new rows, packages them into a single JSON object (as described below) and writes the entire record to Kafka. { 'id': 'some_id', 'key_a': 'value_a', 'key_b': 'value_b', 'result': { 'columns': [ 'col_a', 'col_b', 'col_c', 'col_d' ], 'rows': [ ['2021-05-04T05:23:13.953610Z', '655361', '8013', '0'], ['2021-05-04T05:23:13.953610Z', '655362', '4000', '456'], ['2021-05-04T05:23:13.953610Z', '655363', '2', '562'], ... ... ] } } As can be seen, the row time is actually embedded in the `result` object. What I'm doing at the moment is to run this data through a user defined table function, which parses the `result` object as a string, and emits multiple rows that include the timestamp field. This is working fine. In the next step, I would want to perform windowing on this transformed data. That requires defining the event time attribute along with the watermark. As I understand, this can be done either during the initial table DDL definition or during conversion to a datastream. Since I extract the timestamp value only after reading from Kafka, how can I define an event time attribute on the intermediate table that's basically a result of the user defined table function? The only solution I can think of at the moment, is to write the intermediate table back to Kafka, and then create a new consumer that reads from Kafka, where I can define the event time attribute as part of its DDL. This most likely won't be good performance wise. I'm looking at any other way, I can define event time on results of my user defined table function? Thanks in advance, Sumeet
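For illustration, a minimal sketch of the round trip Yun describes in [1] and [2] (column names, element types, and the watermark bound are assumptions): convert the intermediate result to a DataStream, assign timestamps and watermarks, then convert it back to a Table with an appended rowtime attribute.

import java.time.Duration;
import java.time.Instant;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class RowtimeAfterParsing {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Stand-in for the already-parsed intermediate result: (event timestamp, payload).
        DataStream<Tuple2<Instant, String>> parsed = env.fromElements(
            Tuple2.of(Instant.parse("2021-05-04T05:23:13.953610Z"), "655361"));

        DataStream<Tuple2<Instant, String>> withWatermarks = parsed.assignTimestampsAndWatermarks(
            WatermarkStrategy.<Tuple2<Instant, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((t, ts) -> t.f0.toEpochMilli()));

        // Appending $("rowtime").rowtime() exposes the stream timestamps as an event-time attribute
        // of the new table, which can then be used for windowing.
        Table withRowtime = tableEnv.fromDataStream(withWatermarks, $("ts"), $("col_b"), $("rowtime").rowtime());
        withRowtime.printSchema();
    }
}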
Re: Re: Handling "Global" Updating State
Hi Rion, I think FLIP-150 [1] should be able to solve this scenario. Since FLIP-150 is still under discussion, a temporary method that comes to mind for now might be: 1. Write a first job that reads the Kafka topic and updates the broadcast state of some operator. The job keeps the source alive after all the data have been emitted (e.g. it sleeps forever), and once all the data have been processed, stop the job with a savepoint. 2. Use the savepoint to start the original job. The operator that requires the broadcast state could use the same uid and the same state name as the corresponding operator in the first job, so that it acquires the state content on startup. Best, Yun [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source --Original Mail -- Sender:Rion Williams Send Date:Mon May 17 07:00:03 2021 Recipients:user Subject:Re: Handling "Global" Updating State Hey folks, After digging into this a bit it does seem like Broadcast State would fit the bill for this scenario and keeping the downstream operators up-to-date as messages arrived in my Kafka topic. My question is - is there a pattern for pre-populating the state initially? In my case, I need to have loaded all of my “lookup” topic into state before processing any records in the other stream. My thought initially is to do something like this, if it’s possible: - Create a KafkaConsumer on startup to read the lookup topic in its entirety into some collection like a hashmap (prior to executing the Flink pipeline to ensure synchronicity) - Use this to initialize the state of my broadcast stream (if possible) - At this point that stream would be broadcasting any new records coming in, so I “should” stay up to date at that point. Is this an oversimplification or is there an obviously better / well known approach to handling this? Thanks, Rion On May 14, 2021, at 9:51 AM, Rion Williams wrote: Hi all, I've encountered a challenge within a Flink job that I'm currently working on. The gist of it is that I have a job that listens to a series of events from a Kafka topic and eventually sinks those down into Postgres via the JDBCSink. A requirement recently came up for the need to filter these events based on some configurations that are currently being stored within another Kafka topic. I'm wondering what the best approach might be to handle this type of problem. My initial naive approach was: When Flink starts up, use a regular Kafka Consumer and read all of the configuration data from that topic in its entirety. Store the messages from that topic in some type of thread-safe collection statically accessible by the operators downstream. Expose the thread-safe collection within the operators to actually perform the filtering. This doesn't seem right though. I was reading about BroadcastState which seems like it might fit the bill (e.g. keep those mappings in Broadcast state so that all of the downstream operations would have access to them, which I'd imagine would handle keeping things up to date). Does Flink have a good pattern / construct to handle this? Basically, I have a series of mappings that I want to keep relatively up to date in a Kafka topic, and I'm consuming from another Kafka topic that will need those mappings to filter against. I'd be happy to share some of the approaches I currently have or elaborate a bit more if that isn't super clear. Thanks much, Rion
Re: Re: Handling "Global" Updating State
Hi Rion, Sorry for the late reply. Another, simpler method might indeed be to have the operator read the data from Kafka directly in initializeState to initialize its state. Best, Yun --Original Mail -- Sender:Rion Williams Send Date:Mon May 17 19:53:35 2021 Recipients:Yun Gao CC:user Subject:Re: Handling "Global" Updating State Hi Yun, That’s very helpful and good to know that the problem/use-case has been thought about. Since my need is probably shorter-term than later, I’ll likely need to explore a workaround. Do you know of an approach that might not require the use of checkpointing and restarting? I was looking into exploring initializeState within my broadcast-side stream to get it current and then simply listening to the Kafka topic as records come in. I’d imagine this would work, but that may be a bit of a naive approach. Thanks! Rion On May 17, 2021, at 1:36 AM, Yun Gao wrote: Hi Rion, I think FLIP-150[1] should be able to solve this scenario. Since FLIP-150 is still under discussion, for now a temporary method come to me might be 1. Write a first job to read the kafka and update the broadcast state of some operator. The job would keep the source alive after all the data are emit (like sleep forever), and when all the data are processed, then stop the job with savepoint. 2. Use the savepoint to start the original job. For the operator required the broadcast state, it could set the same uid and same state name with the corresponding operator in the first job, so it could acqure the state content on startup. Yun, Best [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source --Original Mail -- Sender:Rion Williams Send Date:Mon May 17 07:00:03 2021 Recipients:user Subject:Re: Handling "Global" Updating State Hey folks, After digging into this a bit it does seem like Broadcast State would fit the bill for this scenario and keeping the downstream operators up-to-date as messages arrived in my Kafka topic. My question is - is there a pattern for pre-populating the state initially? In my case, I need to have loaded all of my “lookup” topic into state before processing any records in the other stream. My thought initially is to do something like this, if it’s possible: - Create a KafkaConsumer on startup to read the lookup topic in its entirety into some collection like a hashmap (prior to executing the Flink pipeline to ensure synchronicity) - Use this to initialize the state of my broadcast stream (if possible) - At this point that stream would be broadcasting any new records coming in, so I “should” stay up to date at that point. Is this an oversimplification or is there an obviously better / well known approach to handling this? Thanks, Rion On May 14, 2021, at 9:51 AM, Rion Williams wrote: Hi all, I've encountered a challenge within a Flink job that I'm currently working on. The gist of it is that I have a job that listens to a series of events from a Kafka topic and eventually sinks those down into Postgres via the JDBCSink. A requirement recently came up for the need to filter these events based on some configurations that are currently being stored within another Kafka topic. I'm wondering what the best approach might be to handle this type of problem. My initial naive approach was: When Flink starts up, use a regular Kafka Consumer and read all of the configuration data from that topic in its entirety. Store the messages from that topic in some type of thread-safe collection statically accessible by the operators downstream.
Expose the thread-safe collection within the operators to actually perform the filtering. This doesn't seem right though. I was reading about BroadcastState which seems like it might fit the bill (e.g. keep those mappings in Broadcast state so that all of the downstream operations would have access to them, which I'd imagine would handle keeping things up to date). Does Flink have a good pattern / construct to handle this? Basically, I have a series of mappings that I want to keep relatively up to date in a Kafka topic, and I'm consuming from another Kafka topic that will need those mappings to filter against. I'd be happy to share some of the approaches I currently have or elaborate a bit more if that isn't super clear. Thanks much, Rion
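For illustration of the initializeState idea above, a hedged sketch of a helper that pre-loads a compacted lookup topic with a plain KafkaConsumer (topic name, bootstrap servers, and String keys/values are assumptions); an operator's initializeState or open method could call it once before processing starts:

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class LookupTopicLoader {

    /** Reads the compacted lookup topic from the beginning up to its current end offsets. */
    public static Map<String, String> load(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        Map<String, String> mappings = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
            consumer.assign(partitions);            // manual assignment, no consumer group needed
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            // Keep polling until every partition has been read up to its end offset.
            while (partitions.stream().anyMatch(tp -> consumer.position(tp) < endOffsets.get(tp))) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    mappings.put(record.key(), record.value());  // later values overwrite earlier ones
                }
            }
        }
        return mappings;
    }
}

The resulting map could then seed the broadcast (or keyed) state before the operator starts consuming live records.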