Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits
Hi Becket, I still stand by what I wrote before: > I think I would still vote soft -1 on this option, but I wouldn't block it in case I am out-voted. > I think it might be helpful to agree on the definition of optional in our case. For me it doesn't matter whether we call a default method throwing an exception optional or non-optional. As long as we keep it this way, the effect is the same. It's effectively a method that a user doesn't have to implement. If the interface/system allows some methods to be left unimplemented, some users will do just that, regardless of whether we call and document them as non-optional. And at the same time it's clogging the base interface. By the way, just the need for a java-doc/documentation explaining the existence of some construct is a bad smell (code should be self-documenting, and a default method throwing an UnsupportedOperationException is not). > Please note that so far we do not assume whether the feature is in > the original API or it is added later. A newly added feature can also be > non-optional, although it might take some time for all the pluggable > developers to catch up, and they should still work if the new feature is > not used until they catch up. In contrast, we may never expect an optional > feature to catch up, because leaving it unimplemented is also blessed. > > Let's take the checkpointing as an example. Imagine Flink did not support > checkpointing before release 1.16. And now we are trying to add > checkpointing to Flink. So we exposed the checkpoint configuration to the > end users. In the meantime, will we tell the pluggable (e.g. operators, > connectors) developers that methods like "snapshotState()" is optional? If > we do that, the availability of checkpointing in Flink would be severely > weakened. But apparently we should still allow the existing implementations > to work without checkpointing. It looks to me that adding the method to the > pluggable interfaces with a default implementation throwing > "UnsupportedOperationException" would be the solution here. Please note > that in this case, having the default implementation does not mean this is > optional. It is just the technique to support backwards compatibility in > the feature evolution. The fact that this method is in the base interface > suggests it is not optional, so the developers SHOULD implement it. I would vote a soft -1 for having the default method throwing UnsupportedOperationException as a one-off for these (FLIP-217) special circumstances. At the moment, without thinking this over too much, I would vote a harder -1 for having this as a general rule when adding new features. If we ever end up with an API that is littered with default methods throwing UnsupportedOperationException that are documented as "non optional", it would IMO be a big design failure. I would be ok-ish with that only if it was a temporary thing and we had an aggressive plan to release new major Flink versions (2.x.y, 3.x.y, ...) more often, breaking API compatibility, which would get rid of those default methods. Adding checkpointing and methods like "snapshotState()" would IMO easily justify a new major Flink release. In that case we could add those methods with default implementations for a transition period of one or two minor releases, followed by a cleanup in a major release. However, I would still argue that it would be cleaner/better to add a decorative interface like `CheckpointedOperator` instead of adding those default methods to the base `Operator` interface. 
I think I can sum up our disagreement as follows: I would like to keep the interfaces simpler, with only obligatory methods/features on one side and clearly optional features on the other, while you would like to add an extra third state in between those two? Best, Piotrek czw., 12 maj 2022 o 04:25 Becket Qin napisał(a): > Thanks for the clarification, Piotr and Sebastian. > > It looks like the key problem is still whether the implementation of > pausable splits in the Sources should be optional or not. > > I think it might be helpful to agree on the definition of optional in our > case. To me: > Optional = "You CAN leave the method unimplemented, and that is fine." > Non-Optional = "You CAN leave the method unimplemented, but you SHOULD NOT, > because people assume this works." > > I think one sufficient condition of a Non-Optional feature is that if the > feature is exposed through the framework API, Flink should expect the > pluggables to support this feature by default. Otherwise the availability > of that feature becomes undefined. > > Please note that so far we do not assume whether the feature is in > the original API or it is added later. A newly added feature can also be > non-optional, although it might take some time for all the pluggable > developers to catch up, and they should still work if the new feature is > not used until they catch up. In contrast, we may never expect an optional > feature to catch up, because
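To make the two designs debated above concrete (a default method throwing UnsupportedOperationException on the base interface versus a decorative interface), here is a minimal Java sketch; `Operator` and `CheckpointedOperator` are illustrative stand-ins used in the discussion, not actual Flink interfaces.

```
// Approach discussed (and argued against) above: a "non-optional" default
// method on the base interface, kept only for backwards compatibility.
interface Operator {
    void processElement(Object element);

    default void snapshotState() {
        throw new UnsupportedOperationException("snapshotState() is not implemented");
    }
}

// Alternative preferred above: a decorative interface that implementations
// opt into explicitly, keeping the base interface clean.
interface CheckpointedOperator {
    void snapshotState();
}

class Framework {
    // The framework discovers the capability with an instanceof check
    // instead of relying on a default method that may throw at runtime.
    static void checkpoint(Operator operator) {
        if (operator instanceof CheckpointedOperator) {
            ((CheckpointedOperator) operator).snapshotState();
        }
    }
}
```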
[jira] [Created] (FLINK-27797) PythonTableUtils.getCollectionInputFormat cannot correctly handle None values
Yunfeng Zhou created FLINK-27797: Summary: PythonTableUtils.getCollectionInputFormat cannot correctly handle None values Key: FLINK-27797 URL: https://issues.apache.org/jira/browse/FLINK-27797 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.0 Reporter: Yunfeng Zhou In `PythonTableUtils.getCollectionInputFormat` there are implementations like the following. This code can be found at [https://github.com/apache/flink/blob/8488368b86a99a064446ca74e775b670b94a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java#L515] ``` c -> { if (c.getClass() != byte[].class || dataType instanceof PickledByteArrayTypeInfo) { return c; } ``` Here, the generated function does not check `c != null` before calling `c.getClass()`, which can cause tables created through PyFlink to fail when values are `None`. -- This message was sent by Atlassian Jira (v8.20.7#820007)
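A minimal sketch of the missing guard, mirroring the fragment above (paraphrased, not the actual Flink source):

```
c -> {
    // Pass through nulls (Python None) before touching c.getClass(),
    // along with non-byte[] values and pickled byte arrays.
    if (c == null
            || c.getClass() != byte[].class
            || dataType instanceof PickledByteArrayTypeInfo) {
        return c;
    }
    // ... byte[]-specific conversion continues here ...
}
```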
[jira] [Created] (FLINK-27798) Migrate Flink ML to Flink 1.15.0
Yunfeng Zhou created FLINK-27798: Summary: Migrate Flink ML to Flink 1.15.0 Key: FLINK-27798 URL: https://issues.apache.org/jira/browse/FLINK-27798 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou Update Flink ML's Flink dependency to 1.15.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[VOTE] FLIP-235: Hybrid Shuffle Mode
Hi everyone, Thanks for the feedback for FLIP-235: Hybrid Shuffle Mode[1] on the discussion thread [2]. I'd like to start a vote for it. The vote will last for at least 72 hours unless there is an objection or insufficient votes. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode [2] https://lists.apache.org/thread/hfwpcs54sm5gp3mhv7s3lr79jywo3kv4 Best regards, Weijie
[VOTE] Apache Flink Kubernetes Operator Release 1.0.0, release candidate #2
Hi everyone, Please review and vote on the release candidate #2 for the version 1.0.0 of Apache Flink Kubernetes Operator, as follows: [ ] +1, Approve the release [ ] -1, Do not approve the release (please provide specific comments) **Release Overview** As an overview, the release consists of the following: a) Kubernetes Operator canonical source distribution (including the Dockerfile), to be deployed to the release repository at dist.apache.org b) Kubernetes Operator Helm Chart to be deployed to the release repository at dist.apache.org c) Maven artifacts to be deployed to the Maven Central Repository d) Docker image to be pushed to dockerhub **Staging Areas to Review** The staging areas containing the above mentioned artifacts are as follows, for your review: * All artifacts for a,b) can be found in the corresponding dev repository at dist.apache.org [1] * All artifacts for c) can be found at the Apache Nexus Repository [2] * The docker image for d) is staged on github [7] All artifacts are signed with the key 2FF2977BBBFFDF283C6FE7C6A301006F3591EE2C [3] Other links for your review: * JIRA release notes [4] * source code tag "release-1.0.0-rc2" [5] * PR to update the website Downloads page to include Kubernetes Operator links [6] **Vote Duration** The voting time will run for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. **Note on Verification** You can follow the basic verification guide here[8]. Note that you don't need to verify everything yourself, but please make note of what you have tested together with your +- vote. Thanks, Yang [1] https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.0.0-rc2/ [2] https://repository.apache.org/content/repositories/orgapacheflink-1504/ [3] https://dist.apache.org/repos/dist/release/flink/KEYS [4] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351500 [5] https://github.com/apache/flink-kubernetes-operator/tree/release-1.0.0-rc2 [6] https://github.com/apache/flink-web/pull/542 [7] ghcr.io/apache/flink-kubernetes-operator:6e2b896 [8] https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Kubernetes+Operator+Release
Re: Re: [DISCUSS] FLIP-168: Speculative execution for Batch Job
Hi everyone, Thank you for all the feedback on this FLIP! I will open a vote for it since there are no more concerns. Thanks, Zhu Zhu Zhu 于2022年5月11日周三 12:29写道: > > Hi everyone, > > According to the discussion and updates of the blocklist > mechanism[1] (FLIP-224), I have updated FLIP-168 to make > the decision itself to block identified slow nodes. A new > configuration is also added to control how long a slow > node should be blocked. > > [1] https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h > > Thanks, > Zhu > > Zhu Zhu 于2022年4月29日周五 14:36写道: > > > > Thank you for all the feedback! > > > > @Guowei Ma > > Here are my thoughts for your questions: > > >> 1. How to judge whether the Execution Vertex belongs to a slow task. > > If a slow task fails and gets restarted, it may not be a slow task > > anymore. Especially given that the nodes of the slow task may have been > > blacklisted and the new task will be deployed to a new node. I think we > > should again go through the slow task detection process to determine > > whether it is a slow task. I agree that it is not ideal to take another > > 59 mins to identify a slow task. To solve this problem, one idea is to > > introduce a slow task detection strategy which identifies slow tasks > > according to the throughput. This approach needs more thoughts and > > experiments so we now target it to a future time. > > > > >> 2. The fault tolerance strategy and the Slow task detection strategy are > > >> coupled > > I don't think the fault tolerance and slow task detecting are coupled. > > If a task fails while the ExecutionVertex still has a task in progress, > > there is no need to start new executions for the vertex in the perspective > > of fault tolerance. If the remaining task is slow, in the next slow task > > detecting, a speculative execution will be created and deployed for it. > > This, however, is a normal speculative execution process rather than a > > failure recovery process. In this way, the fault tolerance and slow task > > detecting work without knowing each other and the job can still recover > > from failures and guarantee there are speculative executions for slow tasks. > > > > >> 3. Default value of > > >> `slow-task-detector.execution-time.baseline-lower-bound` is too small > > From what I see in production and knowing from users, there are many > > batch jobs of a relatively small scale (a few terabytes, hundreds of > > gigabytes). Tasks of these jobs can finish in minutes, so that a > > `1 min` lower bound is large enough. Besides that, I think the out-of-box > > experience is more important for users running small scale jobs. > > > > Thanks, > > Zhu > > > > Guowei Ma 于2022年4月28日周四 17:55写道: > >> > >> Hi, zhu > >> > >> Many thanks to zhuzhu for initiating the FLIP discussion. Overall I think > >> it's ok, I just have 3 small questions > >> > >> 1. How to judge whether the Execution Vertex belongs to a slow task. > >> The current calculation method is: the current timestamp minus the > >> timestamp of the execution deployment. If the execution time of this > >> execution exceeds the baseline, then it is judged as a slow task. Normally > >> this is no problem. But if an execution fails, the time may not be > >> accurate. For example, the baseline is 59 minutes, and a task fails after > >> 56 minutes of execution. In the worst case, it may take an additional 59 > >> minutes to discover that the task is a slow task. > >> > >> 2. Speculative Scheduler's fault tolerance strategy. 
> >> The strategy in FLIP is: if the Execution Vertex can be executed, even if > >> the execution fails, the fault tolerance strategy will not be adopted. > >> Although currently `ExecutionTimeBasedSlowTaskDetector` can restart an > >> execution. But isn't this dependency a bit too strong? To some extent, the > >> fault tolerance strategy and the Slow task detection strategy are coupled > >> together. > >> > >> > >> 3. The value of the default configuration > >> IMHO, prediction execution should only be required for relatively > >> large-scale, very time-consuming and long-term jobs. > >> If `slow-task-detector.execution-time.baseline-lower-bound` is too small, > >> is it possible for the system to always start some additional tasks that > >> have little effect? In the end, the user needs to reset this default > >> configuration. Is it possible to consider a larger configuration. Of > >> course, this part is best to listen to the suggestions of other community > >> users. > >> > >> Best, > >> Guowei > >> > >> > >> On Thu, Apr 28, 2022 at 3:54 PM Jiangang Liu > >> wrote: > >> > >> > +1 for the feature. > >> > > >> > Mang Zhang 于2022年4月28日周四 11:36写道: > >> > > >> > > Hi zhu: > >> > > > >> > > > >> > > This sounds like a great job! Thanks for your great job. > >> > > In our company, there are already some jobs using Flink Batch, > >> > > but everyone knows that the offline cluster has a lot more load > >> > > than > >> > > the online cluster, and t
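For illustration, the execution-time based detection discussed in this thread boils down to a predicate like the following sketch (names are assumptions, not the actual FLIP-168 classes):

```
// A task is flagged as slow once its time since deployment exceeds the baseline.
// Note the worst case from the thread: a task failing at 56 min with a 59 min
// baseline restarts this clock, so detection can take another 59 min.
static boolean isSlowTask(long nowMillis, long deployTimestampMillis, long baselineMillis) {
    return nowMillis - deployTimestampMillis > baselineMillis;
}
```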
Re: [VOTE] FLIP-224: Blocklist Mechanism
Thanks Lijie and Zhu for driving the FLIP! The blocklist functionality helps reduce the complexity in maintenance and the current design looks good to me, thus +1 from my side (binding). Best, Yun -- From:Xintong Song Send Time:2022 May 26 (Thu.) 12:51 To:dev Subject:Re: [VOTE] FLIP-224: Blocklist Mechanism Thanks for driving this effort, Lijie. I think a nice addition would be to make this feature accessible directly from webui. However, there's no reason to block this FLIP on it. So +1 (binding) from my side. Best, Xintong On Fri, May 20, 2022 at 12:57 PM Lijie Wang wrote: > Hi everyone, > > Thanks for the feedback for FLIP-224: Blocklist Mechanism [1] on the > discussion thread [2] > > I'd like to start a vote for it. The vote will last for at least 72 hours > unless there is an objection or insufficient votes. > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism > [2] https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h > > Best, > Lijie >
[VOTE] FLIP-168: Speculative Execution for Batch Job
Hi everyone, Thanks for the feedback for FLIP-168: Speculative Execution for Batch Job [1] on the discussion thread [2]. I'd like to start a vote for it. The vote will last for at least 72 hours unless there is an objection or insufficient votes. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job [2] https://lists.apache.org/thread/ot352tp8t7mclzx9zfv704gcm0fwrq58
Re: [DISCUSS] Releasing Flink 1.14.5
Hi Xingbo, Thanks for driving this release. +1 for 1.14.5 as there are already nearly 100 commits [1] since 1.14.4. Regards, Dian [1] https://github.com/apache/flink/compare/release-1.14.4...release-1.14 On Tue, May 24, 2022 at 2:23 PM Xingbo Huang wrote: > Hi all, > > I would like to start discussing releasing Flink 1.14.5. > > It has already been more than two months since we released 1.14.4. There > are currently 62 tickets[1] already resolved for 1.14.5, some of them quite > severe. > > Currently, there are no issues marked as critical or blocker for 1.14.5. > Please let me know if there are any issues you'd like to be included in > this release but still not merged. > > I would like to volunteer as a release manager for 1.14.5, and start the > release process once all the issues are merged. > > Best, > Xingbo > > [1] > > https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.14.5&startIndex=50 >
Re: [VOTE] FLIP-227: Support overdraft buffer
Hi. Thanks Fanrui for this FLIP. I think it will be a useful thing for us. +1(non-binding) -- Best regards, Anton Kalashnikov 26.05.2022 06:00, rui fan пишет: Hi, everyone, Thanks for your feedback for FLIP-227: Support overdraft buffer[1] on the discussion thread[2]. I'd like to start a vote for it. The vote will be open for at least 72 hours unless there is an objection or not enough votes. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer [2] https://lists.apache.org/thread/4p3xcf0gg4py61hsnydvwpns07d1nog7 Best wishes fanrui
Re: [VOTE] FLIP-235: Hybrid Shuffle Mode
+1 Best, Xintong On Thu, May 26, 2022 at 3:47 PM weijie guo wrote: > Hi everyone, > > Thanks for the feedback for FLIP-235: Hybrid Shuffle Mode[1] on the > discussion thread [2] > > I'd like to start a vote for it. The vote will last for at least 72 hours > unless there is an objection or insufficient votes. > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode > [2] https://lists.apache.org/thread/hfwpcs54sm5gp3mhv7s3lr79jywo3kv4 > > Best regards, > > Weijie >
Re: [VOTE] FLIP-227: Support overdraft buffer
Yes, it will be a good improvement :) +1 (binding) Piotrek czw., 26 maj 2022 o 10:26 Anton Kalashnikov napisał(a): > Hi. > > Thanks Fanrui for this FLIP. I think it will be a useful thing for us. > > +1(non-binding) > > -- > > Best regards, > Anton Kalashnikov > > 26.05.2022 06:00, rui fan пишет: > > Hi, everyone, > > > > Thanks for your feedback for FLIP-227: Support overdraft buffer[1] on > > the discussion thread[2]. > > > > I'd like to start a vote for it. The vote will be open for at least 72 > > hours unless there is an objection or not enough votes. > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > > [2] https://lists.apache.org/thread/4p3xcf0gg4py61hsnydvwpns07d1nog7 > > > > Best wishes > > fanrui > >
[jira] [Created] (FLINK-27799) Version 1.13.5 is not compatible with version 1.10 UDF
Yichao Yang created FLINK-27799: --- Summary: Version 1.13.5 is not compatible with version 1.10 UDF Key: FLINK-27799 URL: https://issues.apache.org/jira/browse/FLINK-27799 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.13.5 Reporter: Yichao Yang In Flink version 1.10, the following code works: {code:java} // UDF public class SetStringUDF extends ScalarFunction { //@DataTypeHint("RAW") public Set<String> eval(String input) { return Sets.newHashSet(input, input + "_1", input + "_2"); } @Override public TypeInformation<?> getResultType(Class<?>[] signature) { return TypeInformation.of(new TypeHint<Set<String>>() { }); } } public class GetSetValue extends ScalarFunction { public String eval(Set<String> set) { return set.iterator().next(); } } StreamTableEnvironment.createFunction("set_string", SetStringUDF.class); StreamTableEnvironment.createFunction("get_set_value", GetSetValue.class); CREATE TABLE Orders ( order_id BIGINT NOT NULL, name STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.name.length' = '1', 'fields.order_id.min' = '1', 'fields.order_id.max' = '10' ); CREATE TABLE target_table ( order_id BIGINT NOT NULL, name STRING, row_time timestamp(3), i STRING ) WITH ( 'connector' = 'print' ); INSERT INTO target_table SELECT *, cast(get_set_value(set_string(name)) as string) as i FROM Orders{code} but in Flink 1.13.5 it throws an exception like: {code:java} Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'java.util.Set'. Interpreting it as a structured type was also not successful. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233) ... 36 more Caused by: org.apache.flink.table.api.ValidationException: Class 'java.util.Set' must not be abstract. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356) at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:164) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:479) at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289) ... 37 more {code} I have to change my UDFs to fix this problem. {code:java} public class GetSetValue extends ScalarFunction { public String eval(@DataTypeHint("RAW") Object set) { Set<String> s = (Set<String>) set; return s.iterator().next(); } } public class SetStringUDF extends ScalarFunction { @DataTypeHint("RAW") public Object eval(String input) { return Sets.newHashSet(input, input + "_1", input + "_2"); } } {code} I have two questions: 1. At present, is there a way to be compatible with this without changing the code? 2. If not, we need to fix all of the UDFs, which is a lot of work to do. Could there be a plan to restore full compatibility in the future? -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [VOTE] FLIP-227: Support overdraft buffer
+1 (binding) On Thu, 26 May 2022, 11:21 Piotr Nowojski, wrote: > Yes, it will be a good improvement :) > > +1 (binding) > > Piotrek > > czw., 26 maj 2022 o 10:26 Anton Kalashnikov > napisał(a): > > > Hi. > > > > Thanks Fanrui for this FLIP. I think it will be a useful thing for us. > > > > +1(non-binding) > > > > -- > > > > Best regards, > > Anton Kalashnikov > > > > 26.05.2022 06:00, rui fan пишет: > > > Hi, everyone, > > > > > > Thanks for your feedback for FLIP-227: Support overdraft buffer[1] on > > > the discussion thread[2]. > > > > > > I'd like to start a vote for it. The vote will be open for at least 72 > > > hours unless there is an objection or not enough votes. > > > > > > [1] > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > > > [2] https://lists.apache.org/thread/4p3xcf0gg4py61hsnydvwpns07d1nog7 > > > > > > Best wishes > > > fanrui > > > > >
[jira] [Created] (FLINK-27800) addInEdge check state error
Licho sun created FLINK-27800: - Summary: addInEdge check state error Key: FLINK-27800 URL: https://issues.apache.org/jira/browse/FLINK-27800 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.15.0 Reporter: Licho sun Fix For: 1.15.0 When adding an in-edge, the checkState function checks whether the edge is already in the outEdges list, but it should check whether it is in the inEdges list instead. ``` public void addInEdge(StreamEdge inEdge) { checkState( outEdges.stream().noneMatch(inEdge::equals), "Adding not unique edge = %s to existing outEdges = %s", inEdge, inEdges); if (inEdge.getTargetId() != getId()) { throw new IllegalArgumentException("Destination id doesn't match the StreamNode id"); } else { inEdges.add(inEdge); } } ``` -- This message was sent by Atlassian Jira (v8.20.7#820007)
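A sketch of the corrected check, based on the snippet above (not a verified patch against the Flink codebase):

```
public void addInEdge(StreamEdge inEdge) {
    // Check the new edge against inEdges, not outEdges, and report inEdges in the message.
    checkState(
            inEdges.stream().noneMatch(inEdge::equals),
            "Adding not unique edge = %s to existing inEdges = %s",
            inEdge,
            inEdges);
    if (inEdge.getTargetId() != getId()) {
        throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
    } else {
        inEdges.add(inEdge);
    }
}
```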
[jira] [Created] (FLINK-27801) Disable flushing stream when call Jackson's writeValue() method for CsvBulkWriter
luoyuxia created FLINK-27801: Summary: Disable flushing stream when call Jackson's writeValue() method for CsvBulkWriter Key: FLINK-27801 URL: https://issues.apache.org/jira/browse/FLINK-27801 Project: Flink Issue Type: Improvement Reporter: luoyuxia Currently, in `CsvBulkWriter`, for every single element, it calls Jackson's writeValue to write the element and then flushes. It is time-consuming to flush for every single element, so we should also disable flushing when calling Jackson's writeValue method. -- This message was sent by Atlassian Jira (v8.20.7#820007)
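A minimal sketch of how the flush could be disabled on the Jackson side (how the mapper is actually wired up inside `CsvBulkWriter` may differ):

```
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;

CsvMapper mapper = new CsvMapper();
// Stop Jackson from flushing the target stream at the end of every
// writeValue() call; flushing is then left to the writer's own
// flush()/finish() lifecycle.
mapper.getFactory().disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM);
```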
[jira] [Created] (FLINK-27802) Job submission errors are swallowed for Flink 1.15 + HA
Gyula Fora created FLINK-27802: -- Summary: Job submission errors are swallowed for Flink 1.15 + HA Key: FLINK-27802 URL: https://issues.apache.org/jira/browse/FLINK-27802 Project: Flink Issue Type: Improvement Reporter: Gyula Fora We are currently setting both a result store and the "execution.submit-failed-job-on-application-error" config for HA jobs. This leads to swallowed job submission errors that only show up in the result store, but the Flink job is not actually displayed in the failed state: 2022-05-26 12:34:43,497 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'State machine job' () because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution. 2022-05-26 12:34:43,552 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY The easiest way to reproduce this is to create a new deployment and set initialSavepointPath to a random missing path. I consider this a bug in Flink, but we should simply disable the execution.submit-failed-job-on-application-error config. -- This message was sent by Atlassian Jira (v8.20.7#820007)
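For reference, the workaround described above amounts to a one-line configuration change (shown here as a flink-conf.yaml entry; the key exists as of Flink 1.15):

```
# Disabling this avoids the swallowed-error behavior described above.
execution.submit-failed-job-on-application-error: false
```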
Re: [VOTE] FLIP-227: Support overdraft buffer
+1 (binding) On Thu, May 26, 2022 at 11:55 AM Dawid Wysakowicz wrote: > +1 (binding) > > On Thu, 26 May 2022, 11:21 Piotr Nowojski, wrote: > > > Yes, it will be a good improvement :) > > > > +1 (binding) > > > > Piotrek > > > > czw., 26 maj 2022 o 10:26 Anton Kalashnikov > > napisał(a): > > > > > Hi. > > > > > > Thanks Fanrui for this FLIP. I think it will be a useful thing for us. > > > > > > +1(non-binding) > > > > > > -- > > > > > > Best regards, > > > Anton Kalashnikov > > > > > > 26.05.2022 06:00, rui fan пишет: > > > > Hi, everyone, > > > > > > > > Thanks for your feedback for FLIP-227: Support overdraft buffer[1] on > > > > the discussion thread[2]. > > > > > > > > I'd like to start a vote for it. The vote will be open for at least > 72 > > > > hours unless there is an objection or not enough votes. > > > > > > > > [1] > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > > > > [2] https://lists.apache.org/thread/4p3xcf0gg4py61hsnydvwpns07d1nog7 > > > > > > > > Best wishes > > > > fanrui > > > > > > > > >
[jira] [Created] (FLINK-27803) Unable to write to s3 with Hudi format via Flink - Scala
sathyan sethumadhavan created FLINK-27803: - Summary: Unable to write to s3 with Hudi format via Flink - Scala Key: FLINK-27803 URL: https://issues.apache.org/jira/browse/FLINK-27803 Project: Flink Issue Type: Bug Components: Connectors / Hadoop Compatibility Affects Versions: 1.14.4 Environment: SBT: === libraryDependencies ++= Seq( "org.apache.flink" %% "flink-scala" % "1.10.0" % "provided", "org.apache.flink" %% "flink-streaming-scala" % "1.10.0" % "provided", "com.amazonaws" % "aws-java-sdk-s3" % "1.11.563" % "provided", "org.apache.flink" %% "flink-parquet" % "1.14.4" % "provided", "org.apache.hadoop" % "hadoop-aws" % "3.2.2" % "provided", "org.apache.parquet" % "parquet-avro" % "1.10.1", "com.typesafe.scala-logging" %% "scala-logging" % "3.8.0", "org.apache.flink" %% "flink-table-api-scala" % "1.14.4", // "org.apache.flink" % "flink-table-api-java" % "1.14.4", "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.14.4" % "provided", "org.apache.flink" %% "flink-table-planner-blink" % "1.13.5" % "provided", "org.apache.flink" %% "flink-table-planner" % "1.14.4" % "provided", "org.apache.flink" % "flink-table-common" % "1.14.4" % "provided", "org.apache.flink" %% "flink-clients" % "1.14.4" % "provided", "org.apache.hudi" % "hudi-flink-bundle_2.11" % "0.10.1" % "provided", "org.apache.hudi" % "hudi-common" % "0.11.0" % "provided", "org.apache.hudi" % "hudi-client-common" % "0.11.0" % "provided", "org.apache.hudi" % "hudi-flink-client" % "0.11.0" % "provided", "org.apache.hudi" % "hudi-sync-common" % "0.11.0" % "provided", // "com.amazonaws" % "aws-java-sdk-bundle" % "1.11.563" % "provided", "org.apache.hadoop" % "hadoop-common" % "3.2.2" % "provided", "org.apache.hadoop" % "hadoop-client" % "3.2.2" % "provided", "org.apache.parquet" % "parquet-hadoop" % "1.12.2" % "provided", "org.apache.parquet" % "parquet-hadoop-bundle" % "1.12.2" % "provided", "org.apache.flink" % "flink-s3-fs-hadoop" % "1.14.4" % "provided" ) assemblyMergeStrategy in assembly := { case PathList("META-INF", xs @ _*) => MergeStrategy.discard case x => MergeStrategy.first } Docker file: = FROM flink:1.14.4-scala_2.12 ENV FLINK_HADOOP_CONF /hadoop/conf RUN curl https://archive.apache.org/dist/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz -o /opt/flink-1.14.4-bin-scala_2.12.tgz RUN tar -xzf /opt/flink-1.14.4-bin-scala_2.12.tgz RUN mv flink-1.14.4 /opt RUN rm -rf /opt/flink/* RUN rm -rf /opt/flink/plugins/* RUN rm -rf /opt/flink/lib/* RUN cp -R /opt/flink-1.14.4/* /opt/flink RUN ls -ltra /opt/flink RUN mv -v /opt/flink/opt/flink-python*.jar /opt/flink/lib/ RUN mkdir -p /opt/flink/plugins/s3-fs-presto /opt/flink/plugins/s3-fs-hadoop RUN mv -v /opt/flink/opt/flink-s3-fs-presto-*.jar /opt/flink/plugins/s3-fs-presto RUN mv -v /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/s3-fs-hadoop RUN mv -v /opt/flink/opt/flink-*.jar /opt/flink/plugins/ RUN wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -P /opt/flink/lib/ RUN wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/hudi-flink-bundle_2.12-0.10.1.jar -P /opt/flink/lib/ RUN wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.563/aws-java-sdk-s3-1.11.563.jar -P /opt/flink/lib/ RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.2/hadoop-aws-3.2.2.jar -P /opt/flink/lib/ # RUN wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.563/aws-java-sdk-bundle-1.11.563.jar -P /opt/flink/lib 
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/3.2.2/hadoop-client-3.2.2.jar -P /opt/flink/lib RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.2.2/hadoop-common-3.2.2.jar -P /opt/flink/lib RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_2.12/1.14.4/flink-table-api-scala_2.12-1.14.4.jar -P /opt/flink/lib RUN wget https://repo1.maven.org/maven2/com/google/guava/guava/31.0-jre/guava-31.0-jre.jar -P /opt/flink/lib RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.14.4/flink-s3-fs-hadoop-1.14.4.jar -P /opt/flink/lib/ RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/3.2.2/hadoop-hdfs-client-3.2.2.jar -P /opt/flink/lib RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/3.3.2/hadoop-mapreduce-client-core-3.3.2.jar -P /opt/flink/lib # For Parquet RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-parquet_2.12/1.14.4/flink-parquet_2.12-1.14.4.jar -P /opt/flink/lib RUN wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop/1.12.2/parquet-hadoop-1.12.2.jar -P /opt/flink/lib RUN wget https://repo1.maven.org/maven2/org/apache/parquet/
Re: [DISCUSS] FLIP-232: Add Retry Support For Async I/O In DataStream API
Hi Lincoln, What do you think about "timeout for the asynchronous operation from the first invoke to finally complete, which may across multiple retry attempts". Best, Jark On Wed, 25 May 2022 at 20:29, Lincoln Lee wrote: > Hi Jark, > > Thanks for your feedback! > > for 2) good advice for the generic type naming, use OUT instead of T for > the async scenario can be better. > > for 3) your concern makes sense to me, we should make the change more > explicitly to users, especially the api itself (although the documentation > is necessary, it is not sufficient). And I didn't paste the complete method > signature into the FLIP. > Now review the comments of the new method again, obviously it can not > eliminate your confusion by just saying: > '@param timeout for the asynchronous operation to complete include all > reattempts.' > > The 'timeout' we want to clarify is that the user function finally reaches > the complete state, including all of the reattempts' time, and there is no > separate timeout for each attempt. > > In a worst case, if the first async request is stuck until the timeout, > then enable retry will not improve (we discussed this scenario, in the case > of such a stuck, very probability the retry still stucks, and more > importantly, there's no contract on freeing the resource for the stucked > request for the user function, so we prefer to keep the behavior as it is > now) > > Do you think it would be easier to understand if changes to: '@param > timeout for the asynchronous operation that finally complete, including all > reattempts and there is no separate timeout for each attempt.' ? > > Best, > Lincoln Lee > > > Jark Wu 于2022年5月25日周三 17:45写道: > > > Hi Lincoln, > > > > Thanks for proposing this retry feature for the async operator, this > would > > be very helpful for FLIP-234. > > It's glad to see the vivid discussion, and the following are my thoughts: > > > > 1) +1 w/o retry state. > > It's very tricky and hard to implement a semantic exact state for retry > > (currentAttemps and firstExecTime/costTime > > may not be enough). I think this might be overdesigned because most > users > > are fine with more retries when > > failover happens. Flink also doesn't provide the exact retry semantic in > > other places, e.g. "restart-strategy". > > > > 2) It confuses me what's the meaning of generic type > > of AsyncRetryStrategy and AsyncRetryPredicate. > > It would be better to add an annotation description for it. In addition, > > maybe would be better to keep > > aligned with other async interfaces (e.g. AsyncFunction). > > > > 3) timeout parameter: total timeout vs. timeout per async operation > > According to the Javadoc `AsyncDataStream#orderedWait/unorderedWait`, the > > "timeout" parameter is for > > the asynchronous operation to complete, i.e. every call of > > `AsyncFunction#asyncInvoke`. When we add a new > > `orderedWaitWithRetry` method, I think we should keep the meaning of > > "timeout" unchanged, otherwise, > > we need a different parameter name and description. > > > > Best, > > Jark > > > > On Wed, 25 May 2022 at 15:00, Lincoln Lee > wrote: > > > > > Hi everyone, > > > > > > Gen Luo, Yun Gao and I had a long offline discussion about the > > > implementation of the recovery part. The key point was should we store > > the > > > retry state and do the recovery after the job restart? 
> > > > > > We reached a consensus not to store the retry state for now, which is > the > > > clearest for users and does not require any new changes to the current > > > recovery behavior. We have discussed three possible options, the > > behavior > > > of these three options is identical in normal processing, the only > > > difference lies in what retry state is recorded when do checkpointing, > > and > > > what is the strategy when recovering. > > > > > > More details are updated into the FLIP[1], and the PoC[2] is also > > updated. > > > > > > [1] > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963 > > > [2] https://github.com/lincoln-lil/flink/tree/async-retry-poc > > > > > > Best, > > > Lincoln Lee > > > > > > > > > Lincoln Lee 于2022年5月24日周二 12:23写道: > > > > > > > Hi Gen Luo, > > > > > > > > You're right, the total cost time include the failover-restart time. > So > > > > when the failover time exceeds the retry timeout set by the user, in > > > fact, > > > > all the data to be retry after recovery will have no additional retry > > > > opportunities, which is equivalent to normal data. In such > > circumstances, > > > > the retry state takes no effect. But not all jobs' restart is slow > and > > in > > > > flink it is becoming more and more fast due the continuously > > > improvements. > > > > Hope this can help explaining your question. > > > > > > > > Best, > > > > Lincoln Lee > > > > > > > > > > > > Gen Luo 于2022年5月24日周二 11:50写道: > > > > > > > >> Hi Lincoln, > > > >> > > > >> Thanks for the explanation. I understand your thought, but I'm a > > li
Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric
After looking at the newly introduced ReloadTime and Becket's comment, I agree with Becket we should have a pluggable reloading strategy. We can provide some common implementations, e.g., periodic reloading, and daily reloading. But there will definitely be some connector- or business-specific reloading strategies, e.g. notify by a zookeeper watcher, reload once a new Hive partition is complete. Best, Jark On Thu, 26 May 2022 at 11:52, Becket Qin wrote: > Hi Qingsheng, > > Thanks for updating the FLIP. A few comments / questions below: > > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider". > What is the difference between them? If they are the same, can we just use > XXXFactory everywhere? > > 2. Regarding the FullCachingLookupProvider, should the reloading policy > also be pluggable? Periodical reloading could sometimes be tricky in > practice. For example, if user uses 24 hours as the cache refresh interval > and some nightly batch job delayed, the cache update may still see the > stale data. > > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be > removed. > > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a little > confusing to me. If Optional getCacheFactory() returns > a non-empty factory, doesn't that already indicates the framework to cache > the missing keys? Also, why is this method returning an Optional > instead of boolean? > > Thanks, > > Jiangjie (Becket) Qin > > > > On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren wrote: > >> Hi Lincoln and Jark, >> >> Thanks for the comments! If the community reaches a consensus that we use >> SQL hint instead of table options to decide whether to use sync or async >> mode, it’s indeed not necessary to introduce the “lookup.async” option. >> >> I think it’s a good idea to let the decision of async made on query >> level, which could make better optimization with more information gathered >> by planner. Is there any FLIP describing the issue in FLINK-27625? I >> thought FLIP-234 is only proposing adding SQL hint for retry on missing >> instead of the entire async mode to be controlled by hint. >> >> Best regards, >> >> Qingsheng >> >> > On May 25, 2022, at 15:13, Lincoln Lee wrote: >> > >> > Hi Jark, >> > >> > Thanks for your reply! >> > >> > Currently 'lookup.async' just lies in HBase connector, I have no idea >> > whether or when to remove it (we can discuss it in another issue for the >> > HBase connector after FLINK-27625 is done), just not add it into a >> common >> > option now. >> > >> > Best, >> > Lincoln Lee >> > >> > >> > Jark Wu 于2022年5月24日周二 20:14写道: >> > >> >> Hi Lincoln, >> >> >> >> I have taken a look at FLIP-234, and I agree with you that the >> connectors >> >> can >> >> provide both async and sync runtime providers simultaneously instead >> of one >> >> of them. >> >> At that point, "lookup.async" looks redundant. If this option is >> planned to >> >> be removed >> >> in the long term, I think it makes sense not to introduce it in this >> FLIP. >> >> >> >> Best, >> >> Jark >> >> >> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee >> wrote: >> >> >> >>> Hi Qingsheng, >> >>> >> >>> Sorry for jumping into the discussion so late. It's a good idea that >> we >> >> can >> >>> have a common table option. 
I have a minor comments on 'lookup.async' >> >> that >> >>> not make it a common option: >> >>> >> >>> The table layer abstracts both sync and async lookup capabilities, >> >>> connectors implementers can choose one or both, in the case of >> >> implementing >> >>> only one capability(status of the most of existing builtin connectors) >> >>> 'lookup.async' will not be used. And when a connector has both >> >>> capabilities, I think this choice is more suitable for making >> decisions >> >> at >> >>> the query level, for example, table planner can choose the physical >> >>> implementation of async lookup or sync lookup based on its cost >> model, or >> >>> users can give query hint based on their own better understanding. If >> >>> there is another common table option 'lookup.async', it may confuse >> the >> >>> users in the long run. >> >>> >> >>> So, I prefer to leave the 'lookup.async' option in private place (for >> the >> >>> current hbase connector) and not turn it into a common option. >> >>> >> >>> WDYT? >> >>> >> >>> Best, >> >>> Lincoln Lee >> >>> >> >>> >> >>> Qingsheng Ren 于2022年5月23日周一 14:54写道: >> >>> >> Hi Alexander, >> >> Thanks for the review! We recently updated the FLIP and you can find >> >>> those >> changes from my latest email. Since some terminologies has changed so >> >>> I’ll >> use the new concept for replying your comments. >> >> 1. Builder vs ‘of’ >> I’m OK to use builder pattern if we have additional optional >> parameters >> for full caching mode (“rescan” previously). The schedule-with-delay >> >> idea >> looks reasonable to me, but I think we need to redesign the builder >> API >> >>> of >> full caching
[jira] [Created] (FLINK-27804) Do not observe cluster/job mid upgrade
Gyula Fora created FLINK-27804: -- Summary: Do not observe cluster/job mid upgrade Key: FLINK-27804 URL: https://issues.apache.org/jira/browse/FLINK-27804 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.0.0 Reporter: Gyula Fora Assignee: Gyula Fora It seems like in some weird corner cases, when we observe the FINISHED job (stopped with savepoint) during an upgrade, the recorded last snapshot is incorrect (we still need to investigate whether this is due to a Flink problem or something else). This can lead to upgrade errors. This can be avoided by simply skipping the observe step when the reconciliation status is UPGRADING, because at that point we actually know that the job was already shut down and the state was recorded correctly in the savepoint info. -- This message was sent by Atlassian Jira (v8.20.7#820007)
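A sketch of the guard described above (names are modeled on the operator codebase but not verified against it):

```
// Skip the observe step while an upgrade is in flight: the job is known to be
// shut down and the last savepoint info has already been recorded.
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
    return;
}
```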
Re: [VOTE] FLIP-168: Speculative Execution for Batch Job
Hi +1(non-binding), it’s very useful for batch job stability. Best wishes fanrui On Thu, May 26, 2022 at 15:56 Zhu Zhu wrote: > Hi everyone, > > Thanks for the feedback for FLIP-168: Speculative Execution for Batch Job [1] on the > discussion thread [2]. > > I'd like to start a vote for it. The vote will last for at least 72 hours > unless there is an objection or insufficient votes. > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job > [2] https://lists.apache.org/thread/ot352tp8t7mclzx9zfv704gcm0fwrq58 >
Re: [VOTE] FLIP-168: Speculative Execution for Batch Job
+1 (non-binding) Best Regards Shqiprim On Thu, May 26, 2022 at 6:22 PM rui fan <1996fan...@gmail.com> wrote: > Hi > > +1(non-binding), it’s very useful for batch job stability. > > Best wishes > fanrui > > On Thu, May 26, 2022 at 15:56 Zhu Zhu wrote: > > > Hi everyone, > > > > Thanks for the feedback for FLIP-168: Speculative Execution for Batch Job [1] on the > > discussion thread [2]. > > > > I'd like to start a vote for it. The vote will last for at least 72 hours > > unless there is an objection or insufficient votes. > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job > > [2] https://lists.apache.org/thread/ot352tp8t7mclzx9zfv704gcm0fwrq58 > > >
Re: [VOTE] Apache Flink Kubernetes Operator Release 1.0.0, release candidate #2
Hi Yang, I still see the git issue when trying to build from source: [1/2] STEP 16/19: COPY *.git ./.git Error: error building at STEP "COPY *.git ./.git": checking on sources under "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't make relative to /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: ["/*.git"]: no such file or directory If I remove that COPY git line, or I first make an empty .git filesystem, then the build proceeds ok. Not sure what all we lose in the image if the underlying .git items are missing. Other testing: * The helm install looks good * Twistlock vulnerability scan of the Debian image looks good. * Basic example deployed ok Thanks, Jim
Re: [VOTE] Apache Flink Kubernetes Operator Release 1.0.0, release candidate #2
Thanks Jim for the testing. Could you please share the docker version you are using to build the image? It works well for "20.10.8". *COPY *.git ./.git* The above copy command should ignore the .git directory if it does not exist. Best, Yang Jim Busche 于2022年5月27日周五 02:57写道: > Hi Yang, > > > I still see the git issue when trying to build from source: > > [1/2] STEP 16/19: COPY *.git ./.git > > Error: error building at STEP "COPY *.git ./.git": checking on sources > under "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: > can't make relative to > /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: > ["/*.git"]: no such file or directory > > > > If I remove that COPY git line, or I first make an empty .git filesystem, > then the build proceeds ok. Not sure what all we lose in the image if the > underlying .git items are missing. > > > > Other testing: > > * The helm install looks good > * Twistlock vulnerability scan of the Debian image looks good. > * Basic example deployed ok > > Thanks, Jim > > >
[jira] [Created] (FLINK-27805) Bump ORC version to 1.7.2
jia liu created FLINK-27805: --- Summary: Bump ORC version to 1.7.2 Key: FLINK-27805 URL: https://issues.apache.org/jira/browse/FLINK-27805 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: jia liu The current ORC dependency version of Flink is 1.5.6, but the latest ORC version 1.7.x has been released for a long time. In order to use these new features (zstd compression, column encryption etc.), we should upgrade the ORC version. -- This message was sent by Atlassian Jira (v8.20.7#820007)
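For reference, the bump amounts to updating the orc-core coordinates in the format module's pom, e.g. (standard Maven coordinates; the exact module and any needed exclusions are assumptions):

```
<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc-core</artifactId>
  <version>1.7.2</version>
</dependency>
```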
[jira] [Created] (FLINK-27806) Datagen add binary & varbinary type support
Ran Tao created FLINK-27806: --- Summary: Datagen add binary & varbinary type support Key: FLINK-27806 URL: https://issues.apache.org/jira/browse/FLINK-27806 Project: Flink Issue Type: Improvement Affects Versions: 1.14.4, 1.15.0 Reporter: Ran Tao The datagen connector currently does not support the BYTES type, e.g. BINARY & VARBINARY. It causes an exception when using CREATE TABLE t3 ( f0 BIGINT, f1 VARBINARY ) WITH ( 'connector' = 'datagen', ); StackTrace: Caused by: org.apache.flink.table.api.ValidationException: Unsupported type: BYTES at org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.defaultMethod(RandomGeneratorVisitor.java:317) at org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.defaultMethod(RandomGeneratorVisitor.java:60) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:82) at org.apache.flink.table.types.logical.VarBinaryType.accept(VarBinaryType.java:151) at org.apache.flink.connector.datagen.table.DataGenTableSourceFactory.createContainer(DataGenTableSourceFactory.java:128) at org.apache.flink.connector.datagen.table.DataGenTableSourceFactory.createDynamicTableSource(DataGenTableSourceFactory.java:98) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:147) -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27807) The improvement of addBatch is empty when jdbc batch submit
shmily created FLINK-27807: -- Summary: The improvement of addBatch is empty when jdbc batch submit Key: FLINK-27807 URL: https://issues.apache.org/jira/browse/FLINK-27807 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: 1.14.4, 1.15.0 Reporter: shmily When extending the DM database dialect through JDBC and executing upsert semantics, a "parameter not bound" exception is thrown, and it turns out the following code can be improved: {code:java} for (Map.Entry> entry : reduceBuffer.entrySet()) { if (entry.getValue().f0) { upsertExecutor.addToBatch(entry.getValue().f1); } else { // delete by key deleteExecutor.addToBatch(entry.getKey()); } } upsertExecutor.executeBatch(); deleteExecutor.executeBatch(); reduceBuffer.clear();{code} That is to say, when the size of reduceBuffer is 1, only one of the if-else branches is executed, which causes {code:java} upsertExecutor.executeBatch() {code} or {code:java} deleteExecutor.executeBatch(){code} to execute an empty batch. However, some JDBC driver implementations throw exceptions on an empty batch, as described above. -- This message was sent by Atlassian Jira (v8.20.7#820007)
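A minimal sketch of the improvement, assuming the executor shape in the snippet above (the generic parameters, likely `RowData` keys and `Tuple2<Boolean, RowData>` values, are reconstructed from context; this is not the actual Flink patch): track whether each batch received any statements and skip empty executeBatch() calls.

```
boolean hasUpserts = false;
boolean hasDeletes = false;
for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
    if (entry.getValue().f0) {
        upsertExecutor.addToBatch(entry.getValue().f1);
        hasUpserts = true;
    } else {
        // delete by key
        deleteExecutor.addToBatch(entry.getKey());
        hasDeletes = true;
    }
}
// Only submit non-empty batches; some JDBC drivers throw on an empty executeBatch().
if (hasUpserts) {
    upsertExecutor.executeBatch();
}
if (hasDeletes) {
    deleteExecutor.executeBatch();
}
reduceBuffer.clear();
```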
Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric
Thanks Qingsheng and all for your discussion. Very sorry to jump in so late. Maybe I missed something? My first impression when I saw the cache interface was: why don't we provide an interface similar to the Guava cache [1]? On top of the Guava cache, Caffeine adds extensions for asynchronous calls [2], and Caffeine supports bulk loading as well. I am also confused about why we first go through LookupCacheFactory.Builder and then to the Factory to create the Cache. [1] https://github.com/google/guava [2] https://github.com/ben-manes/caffeine/wiki/Population Best, Jingsong On Thu, May 26, 2022 at 11:17 PM Jark Wu wrote: > After looking at the newly introduced ReloadTime and Becket's comment, > I agree with Becket we should have a pluggable reloading strategy. > We can provide some common implementations, e.g., periodic reloading, and > daily reloading. > But there will definitely be some connector- or business-specific reloading > strategies, e.g. > notify by a zookeeper watcher, reload once a new Hive partition is > complete. > > Best, > Jark > > On Thu, 26 May 2022 at 11:52, Becket Qin wrote: > > > Hi Qingsheng, > > > > Thanks for updating the FLIP. A few comments / questions below: > > > > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider". > > What is the difference between them? If they are the same, can we just > use > > XXXFactory everywhere? > > > > 2. Regarding the FullCachingLookupProvider, should the reloading policy > > also be pluggable? Periodical reloading could sometimes be tricky in > > practice. For example, if user uses 24 hours as the cache refresh > interval > > and some nightly batch job delayed, the cache update may still see the > > stale data. > > > > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be > > removed. > > > > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a little > > confusing to me. If Optional getCacheFactory() > returns > > a non-empty factory, doesn't that already indicates the framework to > cache > > the missing keys? Also, why is this method returning an Optional > > instead of boolean? > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > > > On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren > wrote: > > > >> Hi Lincoln and Jark, > >> > >> Thanks for the comments! If the community reaches a consensus that we > use > >> SQL hint instead of table options to decide whether to use sync or async > >> mode, it's indeed not necessary to introduce the "lookup.async" option. > >> > >> I think it's a good idea to let the decision of async made on query > >> level, which could make better optimization with more information > gathered > >> by planner. Is there any FLIP describing the issue in FLINK-27625? I > >> thought FLIP-234 is only proposing adding SQL hint for retry on missing > >> instead of the entire async mode to be controlled by hint. > >> > >> Best regards, > >> > >> Qingsheng > >> > >> > On May 25, 2022, at 15:13, Lincoln Lee > wrote: > >> > > >> > Hi Jark, > >> > > >> > Thanks for your reply! > >> > > >> > Currently 'lookup.async' just lies in HBase connector, I have no idea > >> > whether or when to remove it (we can discuss it in another issue for > the > >> > HBase connector after FLINK-27625 is done), just not add it into a > >> common > >> > option now. 
> >> > > >> > Best, > >> > Lincoln Lee > >> > > >> > > >> > Jark Wu 于2022年5月24日周二 20:14写道: > >> > > >> >> Hi Lincoln, > >> >> > >> >> I have taken a look at FLIP-234, and I agree with you that the > >> connectors > >> >> can > >> >> provide both async and sync runtime providers simultaneously instead > >> of one > >> >> of them. > >> >> At that point, "lookup.async" looks redundant. If this option is > >> planned to > >> >> be removed > >> >> in the long term, I think it makes sense not to introduce it in this > >> FLIP. > >> >> > >> >> Best, > >> >> Jark > >> >> > >> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee > >> wrote: > >> >> > >> >>> Hi Qingsheng, > >> >>> > >> >>> Sorry for jumping into the discussion so late. It's a good idea that > >> we > >> >> can > >> >>> have a common table option. I have a minor comments on > 'lookup.async' > >> >> that > >> >>> not make it a common option: > >> >>> > >> >>> The table layer abstracts both sync and async lookup capabilities, > >> >>> connectors implementers can choose one or both, in the case of > >> >> implementing > >> >>> only one capability(status of the most of existing builtin > connectors) > >> >>> 'lookup.async' will not be used. And when a connector has both > >> >>> capabilities, I think this choice is more suitable for making > >> decisions > >> >> at > >> >>> the query level, for example, table planner can choose the physical > >> >>> implementation of async lookup or sync lookup based on its cost > >> model, or > >> >>> users can give query hint based on their own better understanding. > If > >> >>> there is another common table option 'lookup.async', it may confuse > >> the > >> >>> users in the l
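To make the pluggable-reloading idea discussed in this thread concrete, the hook could look roughly like the following sketch (names here are illustrative, not taken from the FLIP):

```
import java.util.concurrent.CompletableFuture;

/** Hypothetical hook that decides when a full cache reload should run. */
interface CacheReloadTrigger extends AutoCloseable {

    /** Context through which implementations request reloads from the framework. */
    interface Context {
        /** Ask the framework to run a full reload; completes when the reload finishes. */
        CompletableFuture<Void> triggerReload();
    }

    /**
     * Start the trigger, e.g. a periodic timer, a ZooKeeper watcher,
     * or a check for a completed Hive partition.
     */
    void open(Context context) throws Exception;
}
```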
Re: [VOTE] FLIP-168: Speculative Execution for Batch Job
+1 (binding) Best, Guowei On Fri, May 27, 2022 at 12:41 AM Shqiprim Bunjaku wrote: > +1 (non-binding) > > Best Regards > Shqiprim > > On Thu, May 26, 2022 at 6:22 PM rui fan <1996fan...@gmail.com> wrote: > > > Hi > > > > +1(non-binding), it’s very useful for batch job stability. > > > > Best wishes > > fanrui > > > > On Thu, May 26, 2022 at 15:56 Zhu Zhu wrote: > > > > > Hi everyone, > > > > > > Thanks for the feedback for FLIP-168: Speculative Execution for Batch Job [1] on the > > > discussion thread [2]. > > > > > > I'd like to start a vote for it. The vote will last for at least 72 > hours > > > unless there is an objection or insufficient votes. > > > > > > [1] > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job > > > [2] https://lists.apache.org/thread/ot352tp8t7mclzx9zfv704gcm0fwrq58 > > > > > >
Re: [VOTE] FLIP-235: Hybrid Shuffle Mode
+1 Best, Yangze Guo On Thu, May 26, 2022 at 4:42 PM Xintong Song wrote: > > +1 > > Best, > > Xintong > > > > On Thu, May 26, 2022 at 3:47 PM weijie guo > wrote: > > > Hi everyone, > > > > Thanks for the feedback for FLIP-235: Hybrid Shuffle Mode[1] on the > > discussion thread [2] > > > > I'd like to start a vote for it. The vote will last for at least 72 hours > > unless there is an objection or insufficient votes. > > > > [1] > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode > > [2] https://lists.apache.org/thread/hfwpcs54sm5gp3mhv7s3lr79jywo3kv4 > > > > Best regards, > > > > Weijie > >
Re: [VOTE] FLIP-224: Blocklist Mechanism
+1 (binding) Best, Yangze Guo On Thu, May 26, 2022 at 3:54 PM Yun Gao wrote: > > Thanks Lijie and Zhu for driving the FLIP! > > The blocked list functionality helps reduce the complexity in maintenance > and the currently design looks good to me, thus +1 from my side (binding). > > > Best, > Yun > > > > > -- > From:Xintong Song > Send Time:2022 May 26 (Thu.) 12:51 > To:dev > Subject:Re: [VOTE] FLIP-224: Blocklist Mechanism > > Thanks for driving this effort, Lijie. > > I think a nice addition would be to make this feature accessible directly > from webui. However, there's no reason to block this FLIP on it. > > So +1 (binding) from my side. > > Best, > > Xintong > > > > On Fri, May 20, 2022 at 12:57 PM Lijie Wang > wrote: > > > Hi everyone, > > > > Thanks for the feedback for FLIP-224: Blocklist Mechanism [1] on the > > discussion thread [2] > > > > I'd like to start a vote for it. The vote will last for at least 72 hours > > unless there is an objection or insufficient votes. > > > > [1] > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism > > [2] https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h > > > > Best, > > Lijie > > >
Re: [DISCUSS] FLIP-234: Support Retryable Lookup Join To Solve Delayed Updates Issue In External Systems
Thanks Lincoln for your proposal. Looking at `strategy: fixed-delay, delay: duration (e.g., 10s), max-attempts: integer (e.g., 3)`: are these options only for async? It looks like they would apply to normal (sync) lookups too. One more thing: most of the lookup functions seem to be synchronous at the moment; there are not so many asynchronous ones. Best, Jingsong On Tue, May 24, 2022 at 11:48 AM Lincoln Lee wrote: > Hi all, > > Considering the new common table option 'lookup.max-retries' proposed in > FLIP-221 [1], which is commonly used for exception handling in connector > implementations, we should clearly distinguish ASYNC_LOOKUP_RETRY from it to > avoid confusing users. > > To do so, the name ASYNC_LOOKUP_RETRY can be changed to > ASYNC_LOOKUP_MISS_RETRY, and, as the name implies, we restrict it to support > retries only for lookup misses and no longer include exceptions (for SQL > connectors, let the connector implementer decide how to handle exceptions, > since there are various kinds of exceptions, retryable and non-retryable ones). > > The FLIP [2] has been updated. > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric > [2] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems > > Best, > Lincoln Lee > > Lincoln Lee wrote on Thu, May 19, 2022 at 18:24: > > Dear Flink developers, > > > > I would like to open a discussion on FLIP-234 [1] to support retryable > > lookup joins to solve the delayed-updates issue. As pre-work for this > > solution, we proposed FLIP-232 [2], which adds generic retry support for > > Async I/O. > > We prefer to offer this retry capability via query hints, similar to the new > > join hints proposed in FLINK-27625 [3] & FLIP-204 [4]. > > > > This feature is backwards compatible and transparent to connectors. Existing > > connectors which implement AsyncTableFunction can easily enable > > async retry via the new join hint. > > > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems > > [2] > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963 > > [3] https://lists.apache.org/thread/jm9kg33wk9z2bvo2b0g5bp3n5kfj6qv8 > > [4] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204:+Introduce+Hash+Lookup+Join > > > > Best, > > Lincoln Lee
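As an illustration of the "retry only on lookup miss, never on exception" contract discussed above, here is a small sketch. The AsyncRetryPredicate shape is borrowed from the FLIP-232 proposal and inlined as a hypothetical interface, so exact names may differ:

import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

public class LookupMissRetrySketch {

    // Hypothetical interface; shape borrowed from the FLIP-232 proposal.
    interface AsyncRetryPredicate<OUT> {
        Optional<Predicate<Collection<OUT>>> resultPredicate();
        Optional<Predicate<Throwable>> exceptionPredicate();
    }

    // A retry is triggered ONLY by an empty lookup result (a "miss");
    // exceptions are never retried here -- handling them is left to the
    // connector implementer, as argued above.
    static class LookupMissOnlyPredicate<OUT> implements AsyncRetryPredicate<OUT> {
        @Override
        public Optional<Predicate<Collection<OUT>>> resultPredicate() {
            return Optional.of(Collection::isEmpty);
        }

        @Override
        public Optional<Predicate<Throwable>> exceptionPredicate() {
            return Optional.empty(); // no exception-triggered retries
        }
    }
}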
Re: [DISCUSS] FLIP-232: Add Retry Support For Async I/O In DataStream API
Hi Jark & developers, I'm fine with this, with minor changes: "timeout from first invoke to final completion of asynchronous operation, may include multiple retries, and will be reset in case of failover" The FLIP [1] was updated, including two changes: 1. generic type naming: use OUT instead of T 2. the new API's comments *If there is no more feedback, we will start a VOTE next Monday.* [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963 Best, Lincoln Lee Jark Wu wrote on Thu, May 26, 2022 at 23:10: > Hi Lincoln, > > What do you think about > "timeout for the asynchronous operation from the first invoke to finally > complete, which may span multiple retry attempts"? > > Best, > Jark > > On Wed, 25 May 2022 at 20:29, Lincoln Lee wrote: > > Hi Jark, > > > > Thanks for your feedback! > > > > for 2) good advice on the generic type naming; using OUT instead of T for > > the async scenario is better. > > > > for 3) your concern makes sense to me; we should make the change more > > explicit to users, especially in the API itself (although the documentation > > is necessary, it is not sufficient). And I didn't paste the complete method > > signature into the FLIP. > > Reviewing the comments of the new method again, obviously it cannot > > eliminate your confusion by just saying: > > '@param timeout for the asynchronous operation to complete, including all > > reattempts.' > > > > The 'timeout' we want to clarify covers the time until the user function > > finally reaches the complete state, including the time of all retry attempts; > > there is no separate timeout for each attempt. > > > > In the worst case, if the first async request is stuck until the timeout, > > then enabling retry will not improve things (we discussed this scenario: in > > such a stuck case the retry will very probably be stuck as well, and more > > importantly, there's no contract on freeing the resources of the stuck > > request for the user function, so we prefer to keep the behavior as it is > > now). > > > > Do you think it would be easier to understand if changed to: '@param > > timeout for the asynchronous operation to finally complete, including all > > retry attempts; there is no separate timeout for each attempt.'? > > > > Best, > > Lincoln Lee > > > > Jark Wu wrote on Wed, May 25, 2022 at 17:45: > > > Hi Lincoln, > > > > > > Thanks for proposing this retry feature for the async operator; this > > > would be very helpful for FLIP-234. > > > It's great to see the vivid discussion, and the following are my thoughts: > > > > > > 1) +1 w/o retry state. > > > It's very tricky and hard to implement exact retry-state semantics > > > (currentAttempts and firstExecTime/costTime > > > may not be enough). I think this might be over-designed, because most > > > users are fine with more retries when > > > failover happens. Flink also doesn't provide exact retry semantics in > > > other places, e.g. "restart-strategy". > > > > > > 2) It is unclear to me what the meaning of the generic type > > > of AsyncRetryStrategy and AsyncRetryPredicate is. > > > It would be better to add a description for it. In addition, > > > it may be better to keep it > > > aligned with other async interfaces (e.g. AsyncFunction). > > > > > > 3) timeout parameter: total timeout vs. timeout per async operation > > > According to the Javadoc of `AsyncDataStream#orderedWait/unorderedWait`, > > > the "timeout" parameter is for > > > the asynchronous operation to complete, i.e. every call of > > > `AsyncFunction#asyncInvoke`.
When we add a new > > > `orderedWaitWithRetry` method, I think we should keep the meaning of > > > "timeout" unchanged; otherwise, > > > we need a different parameter name and description. > > > > > > Best, > > > Jark > > > > > > On Wed, 25 May 2022 at 15:00, Lincoln Lee wrote: > > > > Hi everyone, > > > > > > > > Gen Luo, Yun Gao and I had a long offline discussion about the > > > > implementation of the recovery part. The key point was whether we should > > > > store the retry state and do the recovery after the job restarts. > > > > > > > > We reached a consensus not to store the retry state for now, which is the > > > > clearest for users and does not require any new changes to the current > > > > recovery behavior. We have discussed three possible options; the behavior > > > > of these three options is identical in normal processing, and the only > > > > difference lies in what retry state is recorded when checkpointing and > > > > what the strategy is when recovering. > > > > > > > > More details have been added to the FLIP [1], and the PoC [2] is also > > > > updated. > > > > > > > > [1] > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963 > > > > [2] https://github.com/lincoln-lil/flink/tree/async-retry-poc > > > > > > > > Best, > > > > Lincoln Lee > > > > > > > > Lincoln Lee wrote on Tue, May 24, 2022 at 12:23: > > > > > Hi Gen Luo, > >
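To pin down the agreed "total timeout" semantics in code, here is a sketch of how the new entry point might be called. Method and builder names (orderedWaitWithRetry, AsyncRetryStrategy, AsyncRetryStrategies) follow the FLIP-232 proposal/PoC and may differ in the released API; `input` and `MyAsyncFunction` are hypothetical:

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

public class RetryTimeoutSketch {

    static DataStream<String> withRetry(DataStream<Integer> input) {
        // Fixed-delay strategy: up to 3 attempts, 1s apart, retrying only
        // when an attempt completes with an empty result (a lookup miss).
        // AsyncRetryStrategy/AsyncRetryStrategies are the proposed names.
        AsyncRetryStrategy<String> strategy =
                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(3, 1000L)
                        .ifResult(Collection::isEmpty)
                        .build();

        // The 10s timeout bounds the WHOLE operation, from the first
        // asyncInvoke to final completion, across ALL retry attempts --
        // there is no per-attempt timeout, and it is reset on failover.
        return AsyncDataStream.orderedWaitWithRetry(
                input,
                new MyAsyncFunction(), // hypothetical AsyncFunction<Integer, String>
                10_000L,               // total timeout (ms), including all retries
                TimeUnit.MILLISECONDS,
                100,                   // in-flight queue capacity
                strategy);
    }
}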
Re: [DISCUSS] FLIP-232: Add Retry Support For Async I/O In DataStream API
Thanks, Lincoln, the updated comments look good to me. Best, Jark On Fri, 27 May 2022 at 14:21, Lincoln Lee wrote: > Hi Jark & developers, > > I'm fine with this, with minor changes: > > "timeout from first invoke to final completion of asynchronous operation, > may include multiple retries, and will be reset in case of failover" > > The FLIP [1] was updated, including two changes: > 1. generic type naming: use OUT instead of T > 2. the new API's comments > > *If there is no more feedback, we will start a VOTE next Monday.* > > [1] > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963 > > Best, > Lincoln Lee > > Jark Wu wrote on Thu, May 26, 2022 at 23:10: > > Hi Lincoln, > > > > What do you think about > > "timeout for the asynchronous operation from the first invoke to finally > > complete, which may span multiple retry attempts"? > > > > Best, > > Jark > > > > On Wed, 25 May 2022 at 20:29, Lincoln Lee wrote: > > > Hi Jark, > > > > > > Thanks for your feedback! > > > > > > for 2) good advice on the generic type naming; using OUT instead of T for > > > the async scenario is better. > > > > > > for 3) your concern makes sense to me; we should make the change more > > > explicit to users, especially in the API itself (although the > > > documentation is necessary, it is not sufficient). And I didn't paste the > > > complete method signature into the FLIP. > > > Reviewing the comments of the new method again, obviously it cannot > > > eliminate your confusion by just saying: > > > '@param timeout for the asynchronous operation to complete, including all > > > reattempts.' > > > > > > The 'timeout' we want to clarify covers the time until the user function > > > finally reaches the complete state, including the time of all retry > > > attempts; there is no separate timeout for each attempt. > > > > > > In the worst case, if the first async request is stuck until the timeout, > > > then enabling retry will not improve things (we discussed this scenario: > > > in such a stuck case the retry will very probably be stuck as well, and > > > more importantly, there's no contract on freeing the resources of the > > > stuck request for the user function, so we prefer to keep the behavior as > > > it is now). > > > > > > Do you think it would be easier to understand if changed to: '@param > > > timeout for the asynchronous operation to finally complete, including all > > > retry attempts; there is no separate timeout for each attempt.'? > > > > > > Best, > > > Lincoln Lee > > > > > > Jark Wu wrote on Wed, May 25, 2022 at 17:45: > > > > Hi Lincoln, > > > > > > > > Thanks for proposing this retry feature for the async operator; this > > > > would be very helpful for FLIP-234. > > > > It's great to see the vivid discussion, and the following are my > > > > thoughts: > > > > > > > > 1) +1 w/o retry state. > > > > It's very tricky and hard to implement exact retry-state semantics > > > > (currentAttempts and firstExecTime/costTime > > > > may not be enough). I think this might be over-designed, because most > > > > users are fine with more retries when > > > > failover happens. Flink also doesn't provide exact retry semantics > > > > in other places, e.g. "restart-strategy". > > > > > > > > 2) It is unclear to me what the meaning of the generic type > > > > of AsyncRetryStrategy and AsyncRetryPredicate is. > > > > It would be better to add a description for it.
In addition, > > > > it may be better to keep it > > > > aligned with other async interfaces (e.g. AsyncFunction). > > > > > > > > 3) timeout parameter: total timeout vs. timeout per async operation > > > > According to the Javadoc of `AsyncDataStream#orderedWait/unorderedWait`, > > > > the "timeout" parameter is for > > > > the asynchronous operation to complete, i.e. every call of > > > > `AsyncFunction#asyncInvoke`. When we add a new > > > > `orderedWaitWithRetry` method, I think we should keep the meaning of > > > > "timeout" unchanged; otherwise, > > > > we need a different parameter name and description. > > > > > > > > Best, > > > > Jark > > > > > > > > On Wed, 25 May 2022 at 15:00, Lincoln Lee wrote: > > > > > Hi everyone, > > > > > > > > > > Gen Luo, Yun Gao and I had a long offline discussion about the > > > > > implementation of the recovery part. The key point was whether we > > > > > should store the retry state and do the recovery after the job > > > > > restarts. > > > > > > > > > > We reached a consensus not to store the retry state for now, which is > > > > > the clearest for users and does not require any new changes to the > > > > > current recovery behavior. We have discussed three possible options; > > > > > the behavior of these three options is identical in normal processing, > > > > > and the only difference lies in what retry state is recorded when > > > > > checkpointing and what the strategy is when recovering. > > > > > > > > > > Mo
Re: [VOTE] FLIP-224: Blocklist Mechanism
-1 to put a lid on things for now, because I'm not quite done yet with the discussion. On 27/05/2022 05:25, Yangze Guo wrote: > +1 (binding) > Best, > Yangze Guo > On Thu, May 26, 2022 at 3:54 PM Yun Gao wrote: >> Thanks Lijie and Zhu for driving the FLIP! >> The blocklist functionality helps reduce the complexity in maintenance, and the current design looks good to me, thus +1 from my side (binding). >> Best, >> Yun >> -- >> From: Xintong Song >> Send Time: 2022 May 26 (Thu.) 12:51 >> To: dev >> Subject: Re: [VOTE] FLIP-224: Blocklist Mechanism >> Thanks for driving this effort, Lijie. >> I think a nice addition would be to make this feature accessible directly from the web UI. However, there's no reason to block this FLIP on it. >> So +1 (binding) from my side. >> Best, >> Xintong >> On Fri, May 20, 2022 at 12:57 PM Lijie Wang wrote: >>> Hi everyone, >>> Thanks for the feedback for FLIP-224: Blocklist Mechanism [1] on the discussion thread [2]. >>> I'd like to start a vote for it. The vote will last for at least 72 hours unless there is an objection or insufficient votes. >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism >>> [2] https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h >>> Best, >>> Lijie