[jira] [Created] (FLINK-10288) Failover Strategy improvement

2018-09-06 Thread JIN SUN (JIRA)
JIN SUN created FLINK-10288:
---

 Summary: Failover Strategy improvement
 Key: FLINK-10288
 URL: https://issues.apache.org/jira/browse/FLINK-10288
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: JIN SUN
Assignee: JIN SUN


Flink makes significant efforts to make streaming jobs fault tolerant. The 
checkpoint mechanism and exactly-once semantics set Flink apart from other 
systems. However, there are still some cases that are not handled well. These 
cases apply to both streaming and batch scenarios, and they are orthogonal to the 
current fault tolerance mechanism. Here is a summary of those cases:
 # Some failures are non-recoverable, such as a user error like a 
DivideByZeroException. We shouldn't restart the task, as it will never 
succeed. DivideByZeroException is just a simple case; such errors are sometimes 
hard to reproduce or predict, as they might only be triggered by specific 
input data, so we shouldn't retry for any user code exception.
 # There is currently no limit on task retries: unless a SuppressRestartException 
is encountered, a task keeps retrying until it succeeds. As mentioned 
above, some failures should not be retried at all; and for the exceptions we can 
retry, such as a network exception, should there be a retry limit? We need to retry 
on transient issues, but we also need to set a limit to avoid infinite 
retries and wasted resources. Batch and streaming workloads might need 
different strategies.
 # Some exceptions are due to hardware issues, such as disk or network 
malfunction. When a task or TaskManager fails because of these, we should detect 
the bad machine and avoid scheduling on it next time.
 # If a task reads from a blocking result partition and its input is not 
available, we can 'revoke' the producer task: mark it failed and rerun the 
upstream task to regenerate the data. The revoke can propagate up through the 
chain. In Spark, revoking is naturally supported by lineage.
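The revoke-and-propagate idea in the last point can be sketched as a walk up the producer chain. This is a purely illustrative sketch, not Flink's scheduler API; the map-of-producers representation and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: when a task finds its blocking input partition missing,
// it and every upstream producer in the chain must be re-executed.
final class RevokeSketch {
    /** producerOf maps a task to the task that produced its input (null/absent = source). */
    static List<String> tasksToRerun(Map<String, String> producerOf, String failedTask) {
        List<String> rerun = new ArrayList<>();
        String current = failedTask;
        while (current != null) {
            rerun.add(current);                 // re-execute this task...
            current = producerOf.get(current);  // ...and propagate the revoke upstream
        }
        return rerun;
    }
}
```

With a chain source -> map -> sink, revoking at the sink would rerun all three tasks in order.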

To make fault tolerance easier, we need to keep behavior as deterministic as 
possible. For user code this is not easy to control; for system-related code, 
however, we can fix it. For example, we should at least make sure that the 
different attempts of the same task get the same inputs (there is a bug in the 
current codebase (DataSourceTask) that cannot guarantee this). Note that this 
is tracked by [FLINK-10205].

For details, see this proposal:

[https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-06 Thread JIN SUN (JIRA)
JIN SUN created FLINK-10289:
---

 Summary: Classify Exceptions to different category for apply 
different failover strategy
 Key: FLINK-10289
 URL: https://issues.apache.org/jira/browse/FLINK-10289
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: JIN SUN
Assignee: JIN SUN


We need to classify exceptions and treat them with different strategies. To do 
this, we propose to introduce the following Throwable Types, and the 
corresponding exceptions:
 * NonRecoverable
 ** We shouldn't retry if an exception is classified as NonRecoverable.
 ** For example, NoResourceAvailableException is a non-recoverable exception.
 ** Introduce a new exception, UserCodeException, to wrap all exceptions 
thrown from user code.


 * PartitionDataMissingError
 ** In certain scenarios, producer data is transferred in blocking mode or 
saved in a persistent store. If the partition is missing, we need to 
revoke/rerun the producer task to regenerate the data.
 ** Introduce a new exception, PartitionDataMissingException, to wrap all those 
kinds of issues.


 * EnvironmentError
 ** This happens due to hardware or software issues tied to a specific 
environment. The assumption is that the task will succeed if we run it 
in a different environment, while other tasks run in the bad environment will 
very likely fail. If multiple tasks fail on the same machine due to an 
EnvironmentError, we should consider adding the machine to a blacklist and 
avoid scheduling tasks on it.
 ** Introduce a new exception, EnvironmentException, to wrap all those kinds of 
issues.


 * Recoverable
 ** We assume all other issues are recoverable.
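The categories above could be sketched as a small classifier. This is an illustrative sketch only; the enum constants, exception classes, and `ThrowableClassifier` name are hypothetical stand-ins for whatever the actual implementation ends up using:

```java
// Hypothetical classification of failures into the proposed throwable types.
enum ThrowableType {
    NON_RECOVERABLE,          // never restart the task
    PARTITION_DATA_MISSING,   // revoke/rerun the producer task
    ENVIRONMENT_ERROR,        // consider blacklisting the machine
    RECOVERABLE               // default: restart within the retry limit
}

// Proposed wrapper exceptions (names from the proposal; sketch definitions).
class UserCodeException extends RuntimeException {}
class PartitionDataMissingException extends RuntimeException {}
class EnvironmentException extends RuntimeException {}

final class ThrowableClassifier {
    static ThrowableType classify(Throwable t) {
        if (t instanceof UserCodeException) {
            return ThrowableType.NON_RECOVERABLE;
        }
        if (t instanceof PartitionDataMissingException) {
            return ThrowableType.PARTITION_DATA_MISSING;
        }
        if (t instanceof EnvironmentException) {
            return ThrowableType.ENVIRONMENT_ERROR;
        }
        return ThrowableType.RECOVERABLE;
    }
}
```

The failover strategy would then branch on the returned type instead of on individual exception classes.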





[jira] [Created] (FLINK-10290) Conversion error in StreamScan and BatchScan

2018-09-06 Thread wangsan (JIRA)
wangsan created FLINK-10290:
---

 Summary: Conversion error in StreamScan and BatchScan
 Key: FLINK-10290
 URL: https://issues.apache.org/jira/browse/FLINK-10290
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0, 1.5.3
Reporter: wangsan


`RowTypeInfo#equals()` only compares field types; field names are not 
considered. When checking the equality of `inputType` and `internalType`, we 
should compare both field types and field names.

Behavior of this bug:

Given a table T with schema (a: Long, b: Long, c: Long):
SELECT b, c, a FROM T
expected: b, c, a
actual: a, b, c
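The failure mode can be illustrated with a minimal stand-in for the row type info (the `RowSchema` class below is hypothetical, not Flink's `RowTypeInfo`): when all field types are identical, a types-only equality check cannot distinguish two different field orders, so the projection is silently skipped.

```java
import java.util.Arrays;

// Minimal stand-in illustrating the bug and the proposed fix.
final class RowSchema {
    final Class<?>[] fieldTypes;
    final String[] fieldNames;

    RowSchema(Class<?>[] fieldTypes, String[] fieldNames) {
        this.fieldTypes = fieldTypes;
        this.fieldNames = fieldNames;
    }

    /** Buggy behavior: types only — (a,b,c) and (b,c,a) with identical types compare equal. */
    boolean typesOnlyEquals(RowSchema other) {
        return Arrays.equals(fieldTypes, other.fieldTypes);
    }

    /** Proposed fix: compare both field types and field names. */
    boolean strictEquals(RowSchema other) {
        return Arrays.equals(fieldTypes, other.fieldTypes)
            && Arrays.equals(fieldNames, other.fieldNames);
    }
}
```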







[jira] [Created] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint

2018-09-06 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10291:
-

 Summary: Generate JobGraph with fixed/configurable JobID in 
StandaloneJobClusterEntrypoint
 Key: FLINK-10291
 URL: https://issues.apache.org/jira/browse/FLINK-10291
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.6.0, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.6.1, 1.7.0


The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}} 
from the user code when being started. Due to the way the {{JobGraph}} is 
generated, it will get a random {{JobID}} assigned. This is problematic in 
case of a failover, because the {{JobMaster}} then won't be able to find the 
previously taken checkpoints. In order to solve this problem, we need to 
either fix the {{JobID}} assignment or make it configurable.
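One way to fix the assignment would be to derive the id deterministically from something stable, such as a configured job name. The sketch below is only an illustration of that idea in plain Java (the `FixedJobIds` class and the choice of a name-based UUID are assumptions, not Flink's API):

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical sketch: derive a stable, JobID-like identifier from a configured
// job name, so every restart of the entrypoint yields the same id and previous
// checkpoints can be found again.
final class FixedJobIds {
    static UUID forJobName(String jobName) {
        // nameUUIDFromBytes is deterministic (RFC 4122 name-based UUID).
        return UUID.nameUUIDFromBytes(jobName.getBytes(StandardCharsets.UTF_8));
    }
}
```

Alternatively, the id could simply be read from configuration; either way the key property is that restarts reproduce the same id.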





[jira] [Created] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-06 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10292:
-

 Summary: Generate JobGraph in StandaloneJobClusterEntrypoint only 
once
 Key: FLINK-10292
 URL: https://issues.apache.org/jira/browse/FLINK-10292
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.6.0, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.6.1, 1.7.0


Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
from the given user code every time it is started or restarted. This can be 
problematic if the {{JobGraph}} generation has side effects. Therefore, it 
would be better to generate the {{JobGraph}} only once, store it in HA 
storage, and retrieve it from there afterwards.





[jira] [Created] (FLINK-10293) RemoteStreamEnvironment does not forward port to RestClusterClient

2018-09-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10293:


 Summary: RemoteStreamEnvironment does not forward port to 
RestClusterClient
 Key: FLINK-10293
 URL: https://issues.apache.org/jira/browse/FLINK-10293
 Project: Flink
  Issue Type: Bug
  Components: Client, Streaming
Affects Versions: 1.6.0, 1.5.1, 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.6.1, 1.7.0, 1.5.4


A user reported on the ML that the port given to the RemoteStreamEnvironment is 
not forwarded to the RestClusterClient.





[jira] [Created] (FLINK-10294) Split MiniClusterResource to be usable in runtime/streaming-java/clients

2018-09-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10294:


 Summary: Split MiniClusterResource to be usable in 
runtime/streaming-java/clients
 Key: FLINK-10294
 URL: https://issues.apache.org/jira/browse/FLINK-10294
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.0


h5. Problem

The {{MiniClusterResource}} is a utility class to create and manage Flink 
clusters for testing purposes. It is incredibly convenient, but unfortunately 
resides in {{flink-test-utils}}, which depends on flink-runtime, 
flink-streaming-java and flink-clients, making the class unusable in these 
modules.

The current version does require these dependencies, but only for specific, 
*optional*, parts. {{streaming-java}} is only required for accessing 
{{TestStreamEnvironment}} and {{flink-clients}} only for tests that want to 
work against a {{ClusterClient}}.

h5. Proposal

Split the {{MiniClusterResource}} as follows:

h5. 1)
Remove client/streaming-java dependent parts and move the class to 
flink-runtime.

h5. 2)
Add a new class {{StreamingMiniClusterResourceExtension}} that accepts a 
{{MiniClusterResource}} as an argument and contains the streaming parts.
Usage would look like this:
{code}
private final MiniClusterResource cluster = ...
private final StreamingMiniClusterResourceExtension ext =
    new StreamingMiniClusterResourceExtension(cluster);

@Rule
public RuleChain chain = RuleChain
    .outerRule(cluster)
    .around(ext);
{code}

h5. 3)

Add a new class {{ClientMiniClusterResourceExtension}} that accepts a 
{{MiniClusterResource}} as an argument and contains the client parts.
Usage would look like this:
{code}
private final MiniClusterResource cluster = ...
private final ClientMiniClusterResourceExtension ext =
    new ClientMiniClusterResourceExtension(cluster);

@Rule
public RuleChain chain = RuleChain
    .outerRule(cluster)
    .around(ext);
{code}

[~till.rohrmann] WDYT?





[jira] [Created] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-09-06 Thread Gaurav Singhania (JIRA)
Gaurav Singhania created FLINK-10295:


 Summary: Tokenisation of Program Args resulting in unexpected 
results
 Key: FLINK-10295
 URL: https://issues.apache.org/jira/browse/FLINK-10295
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.6.0, 1.5.0
Reporter: Gaurav Singhania
 Attachments: sample_request.txt

We were upgrading from Flink 1.4 to 1.6. At present we have a jar that takes 
all the details needed to run the job as program args against a jar id, 
including the SQL query and Kafka details. In version 1.5 the program args are 
tokenised, and as a result single quotes (') and double quotes (") are 
stripped from the arguments. This results in malformed args.

Attached a sample request for reference.
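A quote-preserving tokenizer would avoid mangling such arguments. The sketch below is purely illustrative (it is not Flink's actual tokenizer, and the `ArgTokenizer` name is made up): it splits on whitespace but treats a quoted span as a single token and keeps the quote characters, so an embedded SQL string survives intact.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: whitespace tokenization that preserves quoted spans.
final class ArgTokenizer {
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0; // 0 = currently outside any quoted span
        for (char c : line.toCharArray()) {
            if (quote == 0 && (c == '\'' || c == '"')) {
                quote = c;
                current.append(c); // keep the opening quote character
            } else if (quote != 0 && c == quote) {
                quote = 0;
                current.append(c); // keep the closing quote character
            } else if (quote == 0 && Character.isWhitespace(c)) {
                if (current.length() > 0) {
                    tokens.add(current.toString());
                    current.setLength(0);
                }
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            tokens.add(current.toString());
        }
        return tokens;
    }
}
```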





[jira] [Created] (FLINK-10296) TypeExtractor only validates/matches exact POJO classes

2018-09-06 Thread James Morgan (JIRA)
James Morgan created FLINK-10296:


 Summary: TypeExtractor only validates/matches exact POJO classes
 Key: FLINK-10296
 URL: https://issues.apache.org/jira/browse/FLINK-10296
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.6.0, 1.5.1
Reporter: James Morgan
 Attachments: PojoTest.java

I have a workflow that processes a stream of objects that implement an 
interface. The primary implementation was not a POJO and worked fine. When I 
added an implementation that was a POJO, the workflow failed with an input 
mismatch error; namely, our POJO does not match the interface.

The exception is thrown in TypeExtractor#validateInfo when the type it checks 
is an instanceof PojoTypeInfo (line 1456). When the object is _not_ a POJO, it 
is a GenericTypeInfo (line 1476) and passes this validation. The difference 
between these two handling blocks is that the POJO branch tests that the 
typeInfo's type class is _the same as_ the class of the type desired by the 
next step in the workflow, whereas the generic branch tests that the class of 
the type _is assignable from_ the typeInfo's type class.

Attached is a JUnit 5 test that illustrates the issue. The testPojo() test 
will fail as written and shows the mismatch of FooPojo and Foo.

I believe that changing the block for PojoTypeInfo to act like the 
GenericTypeInfo block would fix this, but I don't know if there is a specific 
reason to treat POJOs differently here. (For example, if the Foo array in the 
test is changed to FooPojo[], then there is a compile-time argument mismatch: 
TestMapFunction cannot be converted to MapFunction.)
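The difference between the two validation branches boils down to exact class equality versus assignability. This sketch restates it with hypothetical stand-in types (not the TypeExtractor code itself):

```java
// Stand-in types mirroring the attached test: a POJO implementing an interface.
interface Foo {}
class FooPojo implements Foo {}

final class TypeCheck {
    // POJO branch today: exact class match — FooPojo fails against Foo.
    static boolean exactMatch(Class<?> expected, Class<?> actual) {
        return expected == actual;
    }

    // Generic branch: assignability — FooPojo passes against Foo.
    static boolean assignableMatch(Class<?> expected, Class<?> actual) {
        return expected.isAssignableFrom(actual);
    }
}
```

The proposed fix amounts to using the assignability check in the POJO branch as well, unless there is a reason (such as serializer compatibility) that requires the exact match.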

Workarounds:
 1. Avoid POJOs when using interfaces as part of the steps of the workflow.
 2. Modify the map function to be a generic class with T extends Foo: 
TestMapFunction implements MapFunction


