[jira] [Created] (FLINK-10288) Failover Strategy improvement
JIN SUN created FLINK-10288:
---
Summary: Failover Strategy improvement
Key: FLINK-10288
URL: https://issues.apache.org/jira/browse/FLINK-10288
Project: Flink
Issue Type: Improvement
Components: JobManager
Reporter: JIN SUN
Assignee: JIN SUN

Flink puts significant effort into making streaming jobs fault tolerant. The checkpoint mechanism and exactly-once semantics set Flink apart from other systems. However, some cases are still not handled well. These cases apply to both streaming and batch scenarios, and they are orthogonal to the current fault-tolerance mechanism. Here is a summary of those cases:
# Some failures are non-recoverable, such as a user error like a DivideByZeroException. We shouldn't restart the task, as it will never succeed. DivideByZeroException is just a simple case; such errors are sometimes hard to reproduce or predict, since they may be triggered only by specific input data, so we shouldn't retry for all user-code exceptions.
# There is no limit on task retries today: unless a SuppressRestartException is encountered, a task will keep retrying until it succeeds. As mentioned above, some cases should not be retried at all; and for the exceptions we can retry, such as a network exception, should there be a retry limit? We need to retry on any transient issue, but we also need a limit to avoid infinite retries and wasted resources. Batch and streaming workloads might need different strategies.
# Some exceptions are due to hardware issues, such as disk or network malfunction. When a task or TaskManager fails because of these, we should detect the bad machine and avoid scheduling onto it next time.
# If a task reads from a blocking result partition and its input is not available, we can 'revoke' the producer task: mark it failed and rerun the upstream task to regenerate the data. The revocation can propagate up the chain. In Spark, revocation is naturally supported by lineage.
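The bounded-retry idea in point 2 combined with the classification in point 1 could be sketched roughly as follows. This is an illustrative sketch, not Flink API; the class and method names (FailoverDecider, shouldRestart) are hypothetical:

```java
// Hypothetical sketch: a failover decision that combines exception
// classification with a bounded retry count, as proposed above.
public class FailoverDecider {
    private final int maxAttempts;
    private int attempts;

    public FailoverDecider(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the failed task should be restarted. */
    public boolean shouldRestart(Throwable cause) {
        attempts++;
        if (isNonRecoverable(cause)) {
            return false;               // e.g. DivideByZeroException in user code
        }
        return attempts < maxAttempts;  // bounded retry for transient failures
    }

    private static boolean isNonRecoverable(Throwable cause) {
        // A user-code error such as division by zero fails on every attempt,
        // so retrying only wastes resources.
        return cause instanceof ArithmeticException;
    }
}
```

A real implementation would differ per workload (batch vs. streaming), as the proposal notes; the point here is only that the decision takes both the exception type and the attempt count into account.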
To make fault tolerance easier, we need to keep behavior as deterministic as possible. User code is not easy to control; system-related code, however, we can fix. For example, we should at least make sure that different attempts of the same task receive the same inputs (there is a bug in the current codebase (DataSourceTask) that cannot guarantee this). Note that this is tracked by FLINK-10205.

Details are in this proposal: [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-10289) Classify exceptions into categories to apply different failover strategies
JIN SUN created FLINK-10289:
---
Summary: Classify exceptions into categories to apply different failover strategies
Key: FLINK-10289
URL: https://issues.apache.org/jira/browse/FLINK-10289
Project: Flink
Issue Type: Sub-task
Components: JobManager
Reporter: JIN SUN
Assignee: JIN SUN

We need to classify exceptions and treat them with different strategies. To do this, we propose to introduce the following Throwable types and the corresponding exceptions:
* NonRecoverable
** We shouldn't retry if an exception is classified as NonRecoverable.
** For example, NoResourceAvailableException is a NonRecoverable exception.
** Introduce a new exception, UserCodeException, to wrap all exceptions thrown from user code.
* PartitionDataMissingError
** In certain scenarios producer data is transferred in blocking mode or saved in a persistent store. If the partition is missing, we need to revoke/rerun the producer task to regenerate the data.
** Introduce a new exception, PartitionDataMissingException, to wrap all those kinds of issues.
* EnvironmentError
** This happens due to hardware or software issues tied to a specific environment. The assumption is that the task will succeed if run in a different environment, while other tasks run in the bad environment will very likely fail. If multiple tasks fail on the same machine due to EnvironmentError, we should consider adding the bad machine to a blacklist and avoid scheduling tasks on it.
** Introduce a new exception, EnvironmentException, to wrap all those kinds of issues.
* Recoverable
** We assume all other issues are recoverable.
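The proposed classification could be sketched as a simple dispatch on the exception type. This is illustrative only, not the actual Flink implementation; the enum constants mirror the proposal, and the two wrapper exceptions are stand-ins for the ones it names:

```java
// Sketch of the proposed throwable classification (illustrative, not
// Flink's real code). Unknown throwables default to RECOVERABLE, matching
// the proposal's "we assume other issues are recoverable".
public class ThrowableClassifier {

    public enum ThrowableType {
        NON_RECOVERABLE,
        PARTITION_DATA_MISSING_ERROR,
        ENVIRONMENT_ERROR,
        RECOVERABLE
    }

    // Stand-ins for the wrapper exceptions named in the proposal.
    public static class PartitionDataMissingException extends RuntimeException {}
    public static class EnvironmentException extends RuntimeException {}

    public static ThrowableType classify(Throwable t) {
        if (t instanceof PartitionDataMissingException) {
            return ThrowableType.PARTITION_DATA_MISSING_ERROR;  // rerun producer
        }
        if (t instanceof EnvironmentException) {
            return ThrowableType.ENVIRONMENT_ERROR;             // consider blacklisting
        }
        if (t instanceof Error) {                               // e.g. NoClassDefFoundError
            return ThrowableType.NON_RECOVERABLE;
        }
        return ThrowableType.RECOVERABLE;                       // assume transient
    }
}
```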
[jira] [Created] (FLINK-10290) Conversion error in StreamScan and BatchScan
wangsan created FLINK-10290:
---
Summary: Conversion error in StreamScan and BatchScan
Key: FLINK-10290
URL: https://issues.apache.org/jira/browse/FLINK-10290
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Affects Versions: 1.6.0, 1.5.3
Reporter: wangsan

`RowTypeInfo#equals()` only compares field types; field names are not considered. When checking the equality of `inputType` and `internalType`, we should compare both field types and field names.

Behavior of this bug: given a table T with schema (a: Long, b: Long, c: Long),
SELECT b, c, a FROM T
expected: b, c, a
actual: a, b, c
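The missing check can be illustrated with a minimal sketch (plain Java, not the actual RowTypeInfo code): two row schemas with the same field types but different field names must not compare equal, otherwise a projection such as SELECT b, c, a can silently fall back to the original field order.

```java
import java.util.Arrays;

// Minimal sketch of the proposed equality check. RowSchema is a
// hypothetical stand-in for RowTypeInfo.
public class RowSchema {
    final String[] fieldNames;
    final Class<?>[] fieldTypes;

    RowSchema(String[] names, Class<?>[] types) {
        this.fieldNames = names;
        this.fieldTypes = types;
    }

    boolean sameSchemaAs(RowSchema other) {
        return Arrays.equals(fieldTypes, other.fieldTypes)   // what equals() checks today
            && Arrays.equals(fieldNames, other.fieldNames);  // the missing name check
    }
}
```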
[jira] [Created] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint
Till Rohrmann created FLINK-10291:
-
Summary: Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint
Key: FLINK-10291
URL: https://issues.apache.org/jira/browse/FLINK-10291
Project: Flink
Issue Type: Improvement
Components: Distributed Coordination
Affects Versions: 1.6.0, 1.7.0
Reporter: Till Rohrmann
Fix For: 1.6.1, 1.7.0

The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}} from the user code when it is started. Due to how the {{JobGraph}} is generated, it gets a random {{JobID}} assigned. This is problematic in case of a failover, because the {{JobMaster}} then won't be able to detect the checkpoints. To solve this problem, we need to either fix the {{JobID}} assignment or make it configurable.
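One possible direction (illustrative only, not what the ticket prescribes): derive the 16 ID bytes from a stable configuration value such as the job name instead of random bytes, so a restarted entrypoint reproduces the same ID and can find its earlier checkpoints. The class and method names below are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: a deterministic 16-byte ID derived from a stable
// string, the same size as a Flink JobID.
public class DeterministicJobId {

    public static byte[] fromJobName(String jobName) {
        try {
            // MD5 yields exactly 16 bytes, and the same input always
            // produces the same digest, so restarts get the same ID.
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return md5.digest(jobName.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required to be available", e);
        }
    }
}
```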
[jira] [Created] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once
Till Rohrmann created FLINK-10292:
-
Summary: Generate JobGraph in StandaloneJobClusterEntrypoint only once
Key: FLINK-10292
URL: https://issues.apache.org/jira/browse/FLINK-10292
Project: Flink
Issue Type: Improvement
Components: Distributed Coordination
Affects Versions: 1.6.0, 1.7.0
Reporter: Till Rohrmann
Fix For: 1.6.1, 1.7.0

Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} from the given user code every time it starts or is restarted. This can be problematic if the {{JobGraph}} generation has side effects. Therefore, it would be better to generate the {{JobGraph}} only once, store it in HA storage, and retrieve it from there afterwards.
[jira] [Created] (FLINK-10293) RemoteStreamEnvironment does not forward port to RestClusterClient
Chesnay Schepler created FLINK-10293:
Summary: RemoteStreamEnvironment does not forward port to RestClusterClient
Key: FLINK-10293
URL: https://issues.apache.org/jira/browse/FLINK-10293
Project: Flink
Issue Type: Bug
Components: Client, Streaming
Affects Versions: 1.6.0, 1.5.1, 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Fix For: 1.6.1, 1.7.0, 1.5.4

A user reported on the mailing list that the port given to the RemoteStreamEnvironment is not forwarded to the RestClusterClient.
[jira] [Created] (FLINK-10294) Split MiniClusterResource to be usable in runtime/streaming-java/clients
Chesnay Schepler created FLINK-10294:
Summary: Split MiniClusterResource to be usable in runtime/streaming-java/clients
Key: FLINK-10294
URL: https://issues.apache.org/jira/browse/FLINK-10294
Project: Flink
Issue Type: Improvement
Components: Tests
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Fix For: 1.7.0

h5. Problem

The {{MiniClusterResource}} is a utility class to create and manage Flink clusters for testing purposes. It is incredibly convenient, but unfortunately resides in {{flink-test-utils}}, which depends on flink-runtime, flink-streaming-java and flink-clients, making the class unusable in those modules. The current version does require these dependencies, but only for specific, *optional*, parts: {{streaming-java}} is only required for accessing {{TestStreamEnvironment}}, and {{flink-clients}} only for tests that want to work against a {{ClusterClient}}.

h5. Proposal

Split the {{MiniClusterResource}} as follows:

h5. 1) Remove the client/streaming-java dependent parts and move the class to flink-runtime.

h5. 2) Add a new class {{StreamingMiniClusterResourceExtension}} that accepts a {{MiniClusterResource}} as an argument and contains the streaming parts.

Usage would look like this:
{code}
private final MiniClusterResource cluster = ...

private final StreamingMiniClusterResourceExtension ext =
    new StreamingMiniClusterResourceExtension(cluster);

@Rule
public RuleChain chain = RuleChain
    .outerRule(cluster)
    .around(ext);
{code}

h5. 3) Add a new class {{ClientMiniClusterResourceExtension}} that accepts a {{MiniClusterResource}} as an argument and contains the client parts.

Usage would look like this:
{code}
private final MiniClusterResource cluster = ...

private final ClientMiniClusterResourceExtension ext =
    new ClientMiniClusterResourceExtension(cluster);

@Rule
public RuleChain chain = RuleChain
    .outerRule(cluster)
    .around(ext);
{code}

[~till.rohrmann] WDYT?
[jira] [Created] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
Gaurav Singhania created FLINK-10295:
Summary: Tokenisation of Program Args resulting in unexpected results
Key: FLINK-10295
URL: https://issues.apache.org/jira/browse/FLINK-10295
Project: Flink
Issue Type: Bug
Components: Webfrontend
Affects Versions: 1.6.0, 1.5.0
Reporter: Gaurav Singhania
Attachments: sample_request.txt

We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes all the details needed to run the job as program args against a jar id, including the SQL query and Kafka details. In version 1.5 the program args are tokenised, and as a result single quotes (') and double quotes (") are stripped from the arguments. This results in malformed args. A sample request is attached for reference.
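A quote-aware tokeniser that avoids the reported problem could look like the sketch below. This is illustrative only, not the web frontend's actual code: it splits on whitespace but keeps quoted regions together and preserves the quote characters, so an argument such as a SQL query survives intact.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative quote-aware tokenisation of a program-args string.
public class ArgsTokenizer {

    public static List<String> tokenize(String programArgs) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0; // 0 means we are not inside a quoted region
        for (char c : programArgs.toCharArray()) {
            if (quote == 0 && (c == '\'' || c == '"')) {
                quote = c;                  // entering a quoted region
                current.append(c);          // keep the quote character
            } else if (c == quote) {
                quote = 0;                  // leaving the quoted region
                current.append(c);
            } else if (quote == 0 && Character.isWhitespace(c)) {
                if (current.length() > 0) { // whitespace ends a token
                    tokens.add(current.toString());
                    current.setLength(0);
                }
            } else {
                current.append(c);          // ordinary character (or quoted space)
            }
        }
        if (current.length() > 0) {
            tokens.add(current.toString());
        }
        return tokens;
    }
}
```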
[jira] [Created] (FLINK-10296) TypeExtractor only validates/matches exact POJO classes
James Morgan created FLINK-10296:
Summary: TypeExtractor only validates/matches exact POJO classes
Key: FLINK-10296
URL: https://issues.apache.org/jira/browse/FLINK-10296
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.6.0, 1.5.1
Reporter: James Morgan
Attachments: PojoTest.java

I have a workflow that processes a stream of objects implementing an interface. The primary implementation is not a POJO and works fine. When I added an implementation that is a POJO, the workflow fails with an "Input mismatch" error; namely, our POJO does not match the interface.

The exception is thrown in TypeExtractor#validateInfo when the type it checks is an instanceof PojoTypeInfo (line 1456). When the object is _not_ a POJO, it is a GenericTypeInfo (line 1476) and passes this validation. The difference between the two handling blocks is that the POJO block tests that the typeInfo's type class is _the same as_ the class of the type desired by the next step in the workflow, while the Generic block tests that the class of the desired type _is assignable from_ the typeInfo's type class.

Attached is a JUnit 5 test that illustrates the issue. The testPojo() test fails as written and indicates the mismatch of FooPojo and Foo. I believe that changing the block for the PojoTypeInfo to act like the GenericTypeInfo block would fix this, but I don't know whether there is a specific reason to treat POJOs differently here. (For example, if the Foo array in the test is changed to FooPojo[], then there is a compile-time argument mismatch: TestMapFunction cannot be converted to MapFunction.)

Workarounds:
1. Avoid POJOs when using interfaces as part of the steps of the workflow.
2. Modify the map function to be a generic class with T extends Foo: TestMapFunction implements MapFunction
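The difference between the two validation blocks boils down to exact class equality versus assignability, which can be shown in plain Java (this is a standalone illustration, not Flink's TypeExtractor code; Foo and FooPojo mirror the names in the attached test):

```java
// Demonstrates why exact-class comparison rejects a POJO implementation
// of an interface while an isAssignableFrom check accepts it.
public class AssignabilityDemo {
    interface Foo {}
    static class FooPojo implements Foo {}

    // Exact match, like the PojoTypeInfo branch of validateInfo:
    static boolean exactMatch(Class<?> expected, Class<?> actual) {
        return expected == actual;
    }

    // Assignability, like the GenericTypeInfo branch:
    static boolean assignableMatch(Class<?> expected, Class<?> actual) {
        return expected.isAssignableFrom(actual);
    }
}
```

Under the exact-match rule a FooPojo never satisfies an expected type of Foo, which is exactly the mismatch the ticket reports; the assignability rule accepts any implementation of the interface.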