[jira] [Created] (FLINK-4406) Implement job master registration at resource manager

2016-08-16 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-4406:
--

 Summary: Implement job master registration at resource manager
 Key: FLINK-4406
 URL: https://issues.apache.org/jira/browse/FLINK-4406
 Project: Flink
  Issue Type: Sub-task
  Components: Cluster Management
Reporter: Wenlong Lyu


The Job Master needs to register with the Resource Manager when starting, then 
watch for leadership changes of the RM and trigger re-registration.
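For illustration, a rough sketch of the intended flow using Flink's 
LeaderRetrievalListener; the surrounding class and helper names are assumptions, 
not the actual implementation:
{code:java}
import java.util.UUID;

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;

// Hypothetical JobMaster-side listener: every RM leadership change triggers
// a fresh registration at the new leader.
public class ResourceManagerLeaderListener implements LeaderRetrievalListener {

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        // drop any in-flight registration to the previous leader,
        // then (re-)register at the new resource manager address
        reconnectToResourceManager(leaderAddress, leaderSessionID);
    }

    @Override
    public void handleError(Exception exception) {
        // leader retrieval failures are fatal for the registration loop
    }

    private void reconnectToResourceManager(String address, UUID leaderSessionID) {
        // assumed helper: resolve the RM gateway at 'address' and send a
        // registration message carrying the job master's own address and job id
    }
}
{code}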



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4414) Remove restriction on RpcService.getAddress

2016-08-17 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-4414:
--

 Summary: Remove restriction on RpcService.getAddress
 Key: FLINK-4414
 URL: https://issues.apache.org/jira/browse/FLINK-4414
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


currently {{RpcService}} provides the address only for the endpoint. An rpc 
service serves both the endpoints created on it and the remote gateways created 
on it, so it is reasonable to offer {{getAddress}} on every {{RpcGateway}} 
created on the rpc service, covering both the server and the client side.
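For reference, a minimal sketch of the suggested shape (whether the final 
interface looks exactly like this is an assumption):
{code:java}
// Hypothetical: expose the address on the gateway itself, so both the local
// endpoint's self-gateway and a remote client proxy can report their address.
public interface RpcGateway {
    String getAddress();
}
{code}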



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4406) Implement job master registration at resource manager

2016-08-17 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu reassigned FLINK-4406:
--

Assignee: Wenlong Lyu

> Implement job master registration at resource manager
> -
>
> Key: FLINK-4406
> URL: https://issues.apache.org/jira/browse/FLINK-4406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> The Job Master needs to register with the Resource Manager when starting, 
> then watch for leadership changes of the RM and trigger re-registration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4443) Add support in RpcCompletenessTest for inheritance of RpcGateway and RpcEndpoint

2016-08-21 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-4443:
--

 Summary: Add support in RpcCompletenessTest for inheritance of 
RpcGateway and RpcEndpoint
 Key: FLINK-4443
 URL: https://issues.apache.org/jira/browse/FLINK-4443
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


RpcCompletenessTest needs to support an RpcGateway that is composed of several 
basic interfaces, like the following example:
{code:java}
public interface ExecutionStateListener extends RpcGateway {
    public void notifyExecutionStateChanges();
}

public interface JobStateListener extends RpcGateway {
    public void notifyJobStateChanges();
}

public interface JobWatcher extends ExecutionStateListener, JobStateListener, RpcGateway {
}

public class JobWatcherEndpoint extends RpcEndpoint {
    protected JobWatcherEndpoint(RpcService rpcService) {
        super(rpcService);
    }

    @RpcMethod
    public void notifyExecutionStateChanges() {
    }

    @RpcMethod
    public void notifyJobStateChanges() {
    }
}

public class AttachedJobClient extends JobWatcherEndpoint {
    protected AttachedJobClient(RpcService rpcService) {
        super(rpcService);
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4375) Introduce rpc protocols implemented by job manager

2016-08-29 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu closed FLINK-4375.
--
Resolution: Invalid

> Introduce rpc protocols implemented by job manager
> --
>
> Key: FLINK-4375
> URL: https://issues.apache.org/jira/browse/FLINK-4375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> The job manager RPC server needs to implement a job control protocol, a 
> resource user protocol, and a task control protocol:
>   1. job controller: cancelJob, suspendJob, etc.
>   2. resource user: slotFailed (notify slot failure), slotAvailable (offer slot), etc.
>   3. task controller: updateTaskState, updateResultPartitionInfo, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4375) Introduce rpc protocols implemented by job manager

2016-08-29 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15447770#comment-15447770
 ] 

Wenlong Lyu commented on FLINK-4375:


the protocol definitions will be split across the individual interaction 
development tasks, so this sub-task is not needed any more.

> Introduce rpc protocols implemented by job manager
> --
>
> Key: FLINK-4375
> URL: https://issues.apache.org/jira/browse/FLINK-4375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> The job manager RPC server needs to implement a job control protocol, a 
> resource user protocol, and a task control protocol:
>   1. job controller: cancelJob, suspendJob, etc.
>   2. resource user: slotFailed (notify slot failure), slotAvailable (offer slot), etc.
>   3. task controller: updateTaskState, updateResultPartitionInfo, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4406) Implement job master registration at resource manager

2016-09-04 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-4406:
---
Assignee: Kurt Young  (was: Wenlong Lyu)

> Implement job master registration at resource manager
> -
>
> Key: FLINK-4406
> URL: https://issues.apache.org/jira/browse/FLINK-4406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Wenlong Lyu
>Assignee: Kurt Young
>
> The Job Master needs to register with the Resource Manager when starting, 
> then watch for leadership changes of the RM and trigger re-registration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-6018) Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method

2017-03-12 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906815#comment-15906815
 ] 

Wenlong Lyu commented on FLINK-6018:


I think we should just throw an exception, since all the initialization has 
already been done in KeyedStateStore.

> Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` 
> method
> ---
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The code snippet currently in the `AbstractKeyedStateBackend#getPartitionedState` 
> method is as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:     stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> line 355: }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>     return serializer != null;
> }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>     if (serializer == null) {
>         if (typeInfo != null) {
>             serializer = typeInfo.createSerializer(executionConfig);
>         } else {
>             throw new IllegalStateException(
>                 "Cannot initialize serializer after TypeInformation was dropped during serialization");
>         }
>     }
> }
> {code}
> that is, in the `initializeSerializerUnlessSet` method, the `serializer` is 
> already checked via `serializer == null`. So I hope this code can be improved 
> in one of the following ways:
> approach 1:
> According to the `TODO` information, we throw an exception:
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>     throw new IllegalStateException("The serializer of the descriptor has not been initialized!");
> }
> {code}
> approach 2:
> Always try to initialize, and remove the `if (!stateDescriptor.isSerializerInitialized()) {` check:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> Meanwhile, if we use approach 2, I suggest that `AbstractKeyedStateBackend` 
> add a `private final ExecutionConfig executionConfig` field; then we can 
> change the code like this:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Are the above suggestions reasonable to you?
> Welcome anybody's feedback and corrections.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-14 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925592#comment-15925592
 ] 

Wenlong Lyu commented on FLINK-5756:


In RocksDB, the merge operands are actually combined during compaction and get, 
not at merge() time. When merging two slices with a StringAppendOperator, a new 
string has to be created, which can be time-costly when there are thousands of 
slices to merge.

I think that is why it is slow to get the value after you have added five 
thousand items to the list. If you call {{rocksdb.compactRange()}} before the 
get, it will be quite quick.

In real application scenarios, compaction happens more often than in this test, 
so the performance will be much better in a real environment than in this 
extreme test scenario.
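To illustrate the point, a minimal standalone sketch (a recent RocksJava API is 
assumed; the path and counts are placeholders):
{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.StringAppendOperator;

public class MergeThenCompactDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                .setCreateIfMissing(true)
                .setMergeOperator(new StringAppendOperator());
             RocksDB db = RocksDB.open(options, "/tmp/merge-demo")) {

            byte[] key = "key".getBytes();
            for (int i = 0; i < 5000; i++) {
                db.merge(key, ("value-" + i).getBytes());
            }

            // Without this, get() must combine thousands of merge operands on
            // the fly; after a compaction they are already collapsed into one.
            db.compactRange();

            long start = System.currentTimeMillis();
            db.get(key);
            System.out.println("get cost: " + (System.currentTimeMillis() - start) + " ms");
        }
    }
}
{code}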

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the 
> same key in ListState, the windowState.get() operator performs very poorly. 
> I also tried RocksDB version 4.11.2, and the performance is also very poor. 
> The problem is likely related to RocksDB's own get() operator after using 
> merge(). The problem may influence the window operation's performance when 
> the size is very large using ListState. I tried to merge 5 
> values under the same key in RocksDB; it costs 120 seconds to execute the 
> get() operation.
> ///
> The flink's code is as follows:
> {code}
> class SEventSource extends RichSourceFunction[SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> 
> The RocksDB Test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6162) Fix bug in ByteArrayOutputStreamWithPos#setPosition

2017-03-22 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-6162:
--

 Summary: Fix bug in ByteArrayOutputStreamWithPos#setPosition
 Key: FLINK-6162
 URL: https://issues.apache.org/jira/browse/FLINK-6162
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


Currently the precondition check in setPosition will fail when the buffer is 
full: 
{{Preconditions.checkArgument(position < getEndPosition(), "Position out of 
bounds.");}}

We should allow the expected position to be the end position.
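A self-contained sketch of the relaxed check (the fields here are assumptions; 
the change is only '<' to '<='):
{code:java}
import org.apache.flink.util.Preconditions;

public class ByteArrayOutputStreamWithPosSketch {
    private byte[] buffer = new byte[64];
    private int position;

    private int getEndPosition() {
        return buffer.length;
    }

    public void setPosition(int newPosition) {
        // '<=' instead of '<': the end position itself is a valid target,
        // e.g. right after the buffer has been filled completely.
        Preconditions.checkArgument(newPosition <= getEndPosition(), "Position out of bounds.");
        position = newPosition;
    }
}
{code}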



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5989) Protobuf in akka needs to be shaded

2017-04-01 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15952094#comment-15952094
 ] 

Wenlong Lyu edited comment on FLINK-5989 at 4/1/17 7:10 AM:


hi, [~rmetzger], have you ever tried adding a module like 
flink-shaded-akka-remote to solve the problem?
{code}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parent</artifactId>
        <version>1.3-blink-1.1-SNAPSHOT</version>
        <relativePath>..</relativePath>
    </parent>

    <artifactId>flink-shaded-akka-remote</artifactId>
    <name>flink-shaded-akka-remote</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.data-artisans</groupId>
            <artifactId>flakka-remote_${scala.binary.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>shade-akka</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadeTestJar>false</shadeTestJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <artifactSet>
                                <includes>
                                    <include>com.google.protobuf:*</include>
                                </includes>
                            </artifactSet>
                            <relocations>
                                <relocation>
                                    <pattern>com.google</pattern>
                                    <shadedPattern>org.apache.flink.akka.shaded.com.google</shadedPattern>
                                    <excludes>
                                        <exclude>com.google.inject.**</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
{code}


was (Author: wenlong.lwl):
hi, [~rmetzger], have you ever tried adding a module like 
flink-shaded-akka-remote to solve the problem?
{code}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parent</artifactId>
        <version>1.3-blink-1.1-SNAPSHOT</version>
        <relativePath>..</relativePath>
    </parent>

    <artifactId>flink-shaded-akka-remote</artifactId>
    <name>flink-shaded-akka-remote</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.data-artisans</groupId>
            <artifactId>flakka-remote_${scala.binary.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>none</phase>
                    </execution>

[jira] [Commented] (FLINK-5989) Protobuf in akka needs to be shaded

2017-04-01 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15952094#comment-15952094
 ] 

Wenlong Lyu commented on FLINK-5989:


hi, [~rmetzger], have you ever tried adding a module like 
flink-shaded-akka-remote to solve the problem?
{code}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parent</artifactId>
        <version>1.3-blink-1.1-SNAPSHOT</version>
        <relativePath>..</relativePath>
    </parent>

    <artifactId>flink-shaded-akka-remote</artifactId>
    <name>flink-shaded-akka-remote</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.data-artisans</groupId>
            <artifactId>flakka-remote_${scala.binary.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>shade-akka</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadeTestJar>false</shadeTestJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <artifactSet>
                                <includes>
                                    <include>com.google.protobuf:*</include>
                                </includes>
                            </artifactSet>
                            <relocations>
                                <relocation>
                                    <pattern>com.google</pattern>
                                    <shadedPattern>org.apache.flink.akka.shaded.com.google</shadedPattern>
                                    <excludes>
                                        <exclude>com.google.inject.**</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
{code}

> Protobuf in akka needs to be shaded
> ---
>
> Key: FLINK-5989
> URL: https://issues.apache.org/jira/browse/FLINK-5989
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Wenlong Lyu
>
> Currently akka introduces a dependency on protobuf, which is a common jar used 
> in many systems. I think we need to use a shaded akka, like what we do for the 
> hadoop dependency, to avoid version conflicts with user code.
> {code}
> [INFO] +- com.data-artisans:flakka-actor_2.10:jar:2.3-custom:compile
> [INFO] |  \- com.typesafe:config:jar:1.2.1:compile
> [INFO] +- com.data-artisans:flakka-remote_2.10:jar:2.3-custom:compile
> [INFO] |  +- io.netty:netty:jar:3.8.0.Final:compile
> [INFO] |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
> [INFO] |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5284) Make output of bucketing sink compatible with other processing framework like mapreduce

2016-12-08 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-5284:
--

 Summary: Make output of bucketing sink compatible with other 
processing framework like mapreduce
 Key: FLINK-5284
 URL: https://issues.apache.org/jira/browse/FLINK-5284
 Project: Flink
  Issue Type: Improvement
  Components: filesystem-connector
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


Currently the bucketing sink cannot move the in-progress and pending files to 
the final output when the stream finishes, and when recovering, the current 
output file will contain some invalid content, which can only be identified by 
the file-length meta file. This makes the final output of the job incompatible 
with other processing frameworks like MapReduce. There are two things to do to 
solve the problem:
1. Add a direct-output option to the bucketing sink, which writes output to the 
final file and deletes/truncates the file on failover. Direct output will be 
quite useful, especially for finite streaming jobs, enabling users to migrate 
their batch jobs to streaming while taking advantage of features such as 
checkpointing.
2. Add a truncate-by-copy option to enable the bucketing sink to resize an 
output file by copying the valid content of the current file instead of 
creating a length meta file. Truncate-by-copy costs some extra IO operations, 
but makes the output cleaner.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2144) Incremental count, average, and variance for windows

2016-06-26 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350396#comment-15350396
 ] 

Wenlong Lyu commented on FLINK-2144:


we are also considering incremental aggregation for sliding windows currently; 
an evicting window must re-compute all the data of the window, as no reducing 
state mechanism is available.

> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-2144) Incremental count, average, and variance for windows

2016-06-27 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350396#comment-15350396
 ] 

Wenlong Lyu edited comment on FLINK-2144 at 6/27/16 7:57 AM:
-

we are also considering an incremental aggregation solution for evicting 
windows currently; an evicting window must re-compute all the data of the 
window, as no reducing state mechanism is available. It seems impossible to use 
a fold function to solve the problem. [~aljoscha]


was (Author: dragon.l):
we are also considering incremental aggregation for sliding windows currently; 
an evicting window must re-compute all the data of the window, as no reducing 
state mechanism is available.

> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2144) Incremental count, average, and variance for windows

2016-06-29 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354715#comment-15354715
 ] 

Wenlong Lyu commented on FLINK-2144:


our solution is mostly based on this paper: 
http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf

> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4256) Fine-grained recovery

2016-07-25 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391849#comment-15391849
 ] 

Wenlong Lyu commented on FLINK-4256:


hi, Stephan, we have implemented a similar solution before, but simple back 
tracking and forwarding cannot work well in the following situation:

Assume the job graph has A/B/C job vertices, A is connected to C with a forward 
strategy, and B is connected to C with an all-to-all strategy; when a task of A 
fails, only one C task will be added to the restart node set.

I suggest dividing the job graph into maximal connected sub-graphs, treating 
the job graph as an undirected graph, when a job graph is submitted. Besides, 
when the job graph is large, extracting the related nodes for a given node can 
be time-costly and will be done repeatedly over a long-running job; using 
pre-computed sub-graphs avoids that problem.
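A minimal sketch of the suggested pre-computation, as a union-find over vertex 
IDs with edges treated as undirected (the JobGraph API details are not part of 
this comment):
{code:java}
import java.util.ArrayList;
import java.util.List;

/** Groups vertices into maximal connected sub-graphs, ignoring edge direction. */
public class FailoverRegions {
    private final int[] parent;

    public FailoverRegions(int numVertices, int[][] edges) {
        parent = new int[numVertices];
        for (int v = 0; v < numVertices; v++) {
            parent[v] = v;
        }
        for (int[] edge : edges) {
            union(edge[0], edge[1]); // direction is ignored
        }
    }

    private int find(int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]]; // path halving
            v = parent[v];
        }
        return v;
    }

    private void union(int a, int b) {
        parent[find(a)] = find(b);
    }

    /** All vertices that must be restarted together with the failed one. */
    public List<Integer> regionOf(int failedVertex) {
        int root = find(failedVertex);
        List<Integer> region = new ArrayList<>();
        for (int v = 0; v < parent.length; v++) {
            if (find(v) == root) {
                region.add(v);
            }
        }
        return region;
    }
}
{code}
In the A/B/C example above, the forward edge A->C and the all-to-all edges B->C 
put all three vertices into one component, so a failure of any A task restarts 
the whole sub-graph rather than a single C task.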

> Fine-grained recovery
> -
>
> Key: FLINK-4256
> URL: https://issues.apache.org/jira/browse/FLINK-4256
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> When a task fails during execution, Flink currently resets the entire 
> execution graph and triggers complete re-execution from the last completed 
> checkpoint. This is more expensive than just re-executing the failed tasks.
> In many cases, more fine-grained recovery is possible.
> The full description and design is in the corresponding FLIP.
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4256) Fine-grained recovery

2016-07-25 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391849#comment-15391849
 ] 

Wenlong Lyu edited comment on FLINK-4256 at 7/25/16 1:14 PM:
-

hi, Stephan, we have implemented a similar solution before, but simple back 
tracking and forwarding cannot work well in the following situation:

Assume the job graph has A/B/C job vertices, A is connected to C with a forward 
strategy, and B is connected to C with an all-to-all strategy; when a task of A 
fails, only one C task will be added to the restart node set.

I suggest treating the job graph as an undirected graph and dividing it into 
maximal connected sub-graphs when the job graph is submitted; when a task fails 
over, restart the whole related sub-graph.

Besides, when the job graph is large, extracting the related nodes for a given 
node can be time-costly and will be done repeatedly over a long-running job; 
using pre-computed sub-graphs avoids that problem.


was (Author: dragon.l):
hi, Stephan, we have implemented a similar solution before, but simple back 
tracking and forwarding cannot work well in the following situation:

Assume the job graph has A/B/C job vertices, A is connected to C with a forward 
strategy, and B is connected to C with an all-to-all strategy; when a task of A 
fails, only one C task will be added to the restart node set.

I suggest dividing the job graph into maximal connected sub-graphs, treating 
the job graph as an undirected graph, when a job graph is submitted. Besides, 
when the job graph is large, extracting the related nodes for a given node can 
be time-costly and will be done repeatedly over a long-running job; using 
pre-computed sub-graphs avoids that problem.

> Fine-grained recovery
> -
>
> Key: FLINK-4256
> URL: https://issues.apache.org/jira/browse/FLINK-4256
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> When a task fails during execution, Flink currently resets the entire 
> execution graph and triggers complete re-execution from the last completed 
> checkpoint. This is more expensive than just re-executing the failed tasks.
> In many cases, more fine-grained recovery is possible.
> The full description and design is in the corresponding FLIP.
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4271) There is no way to set parallelism of operators such as map and window produced by

2016-07-27 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-4271:
--

 Summary: There is no way to set parallelism of operators such as 
map and window produced by 
 Key: FLINK-4271
 URL: https://issues.apache.org/jira/browse/FLINK-4271
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Reporter: Wenlong Lyu


Currently, CoGroupedStreams packages the map/keyBy/window operators behind a 
human-friendly interface, like 
dataStreamA.coGroup(streamB).where(...).equalTo(...).window(...).apply(...); 
both the intermediate operators and the final window operator cannot be 
accessed by users, and we cannot set attributes of the operators, which makes 
co-group hard to use in a production environment.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams

2016-07-27 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-4271:
---
Summary: There is no way to set parallelism of operators produced by 
CoGroupedStreams  (was: There is no way to set parallelism of operators such as 
map and window produced by )

> There is no way to set parallelism of operators produced by CoGroupedStreams
> 
>
> Key: FLINK-4271
> URL: https://issues.apache.org/jira/browse/FLINK-4271
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Wenlong Lyu
>
> Currently, CoGroupedStreams packages the map/keyBy/window operators behind a 
> human-friendly interface, like 
> dataStreamA.coGroup(streamB).where(...).equalTo(...).window(...).apply(...); 
> both the intermediate operators and the final window operator cannot be 
> accessed by users, and we cannot set attributes of the operators, which makes 
> co-group hard to use in a production environment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams

2016-07-28 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397140#comment-15397140
 ] 

Wenlong Lyu commented on FLINK-4271:


I think the solution can be better. Not only the parallelism but also other 
attributes of the operator, such as uid, name, etc., should be configurable.
Maybe the ```Where``` and ```EqualsTo``` in the ```CoGroupedStreams``` can 
provide some interface wrapping the same functions provided in the 
```SingleOutputStreamOperator```, I am not sure yet~
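For illustration, a rough sketch of the wrapper idea (the builder and setter 
names are assumptions; apply() would transfer the attributes onto the resulting 
SingleOutputStreamOperator):
{code:java}
// Hypothetical fluent fragment of the co-group builder: collect operator
// attributes and apply them to the operator once apply() creates it.
public class CoGroupWithWindow<T> {
    private int parallelism = -1;
    private String name;
    private String uid;

    public CoGroupWithWindow<T> setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public CoGroupWithWindow<T> name(String name) {
        this.name = name;
        return this;
    }

    public CoGroupWithWindow<T> uid(String uid) {
        this.uid = uid;
        return this;
    }
}
{code}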

> There is no way to set parallelism of operators produced by CoGroupedStreams
> 
>
> Key: FLINK-4271
> URL: https://issues.apache.org/jira/browse/FLINK-4271
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Wenlong Lyu
>Assignee: Jark Wu
>
> Currently, CoGroupedStreams packages the map/keyBy/window operators behind a 
> human-friendly interface, like 
> dataStreamA.coGroup(streamB).where(...).equalTo(...).window(...).apply(...); 
> both the intermediate operators and the final window operator cannot be 
> accessed by users, and we cannot set attributes of the operators, which makes 
> co-group hard to use in a production environment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams

2016-07-28 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397140#comment-15397140
 ] 

Wenlong Lyu edited comment on FLINK-4271 at 7/28/16 7:05 AM:
-

I think the solution can be better. Not only the parallelism but also other 
attributes of the operator, such as uid, name, etc., should be configurable.
Maybe the _Where_ and _EqualsTo_ in the _CoGroupedStreams_ can provide some 
interface wrapping the same functions provided in the 
_SingleOutputStreamOperator_, I am not sure yet~


was (Author: dragon.l):
I think the solution can be better. Not only the parallelism but also other 
attributes of the operator, such as uid, name, etc., should be configurable.
Maybe the ```Where``` and ```EqualsTo``` in the ```CoGroupedStreams``` can 
provide some interface wrapping the same functions provided in the 
```SingleOutputStreamOperator```, I am not sure yet~

> There is no way to set parallelism of operators produced by CoGroupedStreams
> 
>
> Key: FLINK-4271
> URL: https://issues.apache.org/jira/browse/FLINK-4271
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Wenlong Lyu
>Assignee: Jark Wu
>
> Currently, CoGroupedStreams packages the map/keyBy/window operators behind a 
> human-friendly interface, like 
> dataStreamA.coGroup(streamB).where(...).equalTo(...).window(...).apply(...); 
> both the intermediate operators and the final window operator cannot be 
> accessed by users, and we cannot set attributes of the operators, which makes 
> co-group hard to use in a production environment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4344) Implement new JobManager

2016-08-11 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-4344:
---
Description: 
This is the parent issue for the efforts to implement the {{JobManager}} 
changes based on FLIP-6 
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)

Because of the breadth of changes, we should implement a new version of the 
{{JobManager}} (let's call it {{JobMaster}}) rather than updating the current 
{{JobManager}}. That will allow us to keep a working master branch.
At the point when the new cluster management is on par with the current 
implementation, we will drop the old {{JobManager}} and rename the 
{{JobMaster}} to {{JobManager}}.

  was:
This is the parent issue for the efforts to implement the {{JobManager}} 
changes based on FLIP-6 
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)

Because of the breadth of changes, we should implement a new version of the 
{{JobManager}} (let's call it {{JobMaster}}) rather than updating the current 
{{JobManager}}. That will allow us to keep a working master branch.
At the point when the new cluster management is on par with the current 
implementation, we will drop the old {{JobManager}}and rename the {{JobMaster}} 
to {{JobManager}}.


> Implement new JobManager
> 
>
> Key: FLINK-4344
> URL: https://issues.apache.org/jira/browse/FLINK-4344
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Stephan Ewen
>
> This is the parent issue for the efforts to implement the {{JobManager}} 
> changes based on FLIP-6 
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)
> Because of the breadth of changes, we should implement a new version of the 
> {{JobManager}} (let's call it {{JobMaster}}) rather than updating the current 
> {{JobManager}}. That will allow us to keep a working master branch.
> At the point when the new cluster management is on par with the current 
> implementation, we will drop the old {{JobManager}} and rename the 
> {{JobMaster}} to {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4375) defined rpc protocols provided by job manager

2016-08-11 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-4375:
--

 Summary: defined rpc protocols provided by job manager
 Key: FLINK-4375
 URL: https://issues.apache.org/jira/browse/FLINK-4375
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


The job manager RPC server needs to implement a job control protocol, a 
resource user protocol, and a task control protocol:
1. job controller: cancelJob, suspendJob, etc.
2. resource user: slotFailed (notify slot failure), slotAvailable (offer slot), etc.
3. task controller: updateTaskState, updateResultPartitionInfo, etc.






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4375) define rpc protocols provided by job manager

2016-08-11 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-4375:
---
Summary: define rpc protocols provided by job manager  (was: defined rpc 
protocols provided by job manager)

> define rpc protocols provided by job manager
> 
>
> Key: FLINK-4375
> URL: https://issues.apache.org/jira/browse/FLINK-4375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> The job manager RPC server needs to implement a job control protocol, a 
> resource user protocol, and a task control protocol:
>   1. job controller: cancelJob, suspendJob, etc.
>   2. resource user: slotFailed (notify slot failure), slotAvailable (offer slot), etc.
>   3. task controller: updateTaskState, updateResultPartitionInfo, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4376) implement job manager init procedure

2016-08-11 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-4376:
--

 Summary: implement job manager init procedure
 Key: FLINK-4376
 URL: https://issues.apache.org/jira/browse/FLINK-4376
 Project: Flink
  Issue Type: Sub-task
  Components: Cluster Management
Reporter: Wenlong Lyu


The JobManager should be created with a JobGraph and a resource manager 
gateway; the init procedure may be as follows:
1. init the job manager rpc service and related rpc endpoints
2. try to be elected as the leader job manager for the given job graph
3. create the slot pool manager with the given resource manager gateway
4. create the scheduler with the slot pool manager
5. create/restore the execution graph from the job graph
6. start execution graph scheduling and run the job.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4378) Enable RollingSink to custom HDFS client configuration

2016-08-11 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-4378:
--

 Summary: Enable RollingSink to custom HDFS client configuration
 Key: FLINK-4378
 URL: https://issues.apache.org/jira/browse/FLINK-4378
 Project: Flink
  Issue Type: Improvement
  Components: filesystem-connector
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


Tuning the configuration of the HDFS client for different situations, such as 
{{io.file.buffer.size}}, can make the rolling sink perform better.
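A hedged usage sketch from the user side ({{setFSConfig}} as the entry point is 
an assumption about the eventual API):
{code:java}
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.hadoop.conf.Configuration;

public class RollingSinkConfigExample {
    public static RollingSink<String> buildSink() {
        // Hypothetical: hand the sink a custom HDFS client configuration.
        Configuration fsConfig = new Configuration();
        fsConfig.setInt("io.file.buffer.size", 1 << 20); // 1 MiB client buffer

        RollingSink<String> sink = new RollingSink<>("hdfs:///path/to/output");
        sink.setFSConfig(fsConfig); // assumed API added by this issue
        return sink;
    }
}
{code}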



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4375) Introduce rpc protocols implemented by job manager

2016-08-11 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-4375:
---
Summary: Introduce rpc protocols implemented by job manager  (was: 
Introduce rpc protocols provided by job manager)

> Introduce rpc protocols implemented by job manager
> --
>
> Key: FLINK-4375
> URL: https://issues.apache.org/jira/browse/FLINK-4375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> The job manager RPC server needs to implement a job control protocol, a 
> resource user protocol, and a task control protocol:
>   1. job controller: cancelJob, suspendJob, etc.
>   2. resource user: slotFailed (notify slot failure), slotAvailable (offer slot), etc.
>   3. task controller: updateTaskState, updateResultPartitionInfo, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-6298) Local execution is not setting RuntimeContext for RichOutputFormat

2017-04-13 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu reassigned FLINK-6298:
--

Assignee: Wenlong Lyu

> Local execution is not setting RuntimeContext for RichOutputFormat
> --
>
> Key: FLINK-6298
> URL: https://issues.apache.org/jira/browse/FLINK-6298
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Mateusz Zakarczemny
>Assignee: Wenlong Lyu
>
> RuntimeContext is never set in RichOutputFormat. I tested it in local 
> execution. RichMapFunction is setup correctly. 
> Following code will never print "//Context set in RichOutputFormat"
> {code}
> import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
> import org.apache.flink.api.common.io.RichOutputFormat
> import org.apache.flink.api.scala._
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> object Startup {
>   def main(args: Array[String]): Unit = {
> val mapFunction = new RichMapFunction[String, String] {
>   def open(taskNumber: Int, numTasks: Int) { getRuntimeContext }
>   def map(event: String) = { event }
>   override def setRuntimeContext(t: RuntimeContext) = {
> println("//Context set in RichMapFunction")
> super.setRuntimeContext(t)
>   }
> }
> val outputFormat = new RichOutputFormat[String] {
>   override def setRuntimeContext(t: RuntimeContext) = {
> println("//Context set in RichOutputFormat")
> super.setRuntimeContext(t)
>   }
>   def open(taskNumber: Int, numTasks: Int) {}
>   def writeRecord(event: String) {
> println(event)
>   }
>   def configure(parameters: Configuration): Unit = {}
>   def close(): Unit = {}
> }
> val see = StreamExecutionEnvironment.getExecutionEnvironment
> val eventsStream = see.fromElements[String]("A", "B", 
> "C").map(mapFunction)
> eventsStream.writeUsingOutputFormat(outputFormat)
> see.execute("test-job")
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6298) Local execution is not setting RuntimeContext for RichOutputFormat

2017-04-13 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15967343#comment-15967343
 ] 

Wenlong Lyu commented on FLINK-6298:


It is a bug in all execution environments; I think we should set the runtime 
context when the output format is a RichOutputFormat.
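A minimal sketch of the proposed fix (the helper around it is an assumption; 
the instanceof check is the point):
{code:java}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;

public class OutputFormatSetup {
    // Wherever the sink opens its OutputFormat, hand rich formats the
    // runtime context first, mirroring what is done for rich functions.
    public static void configureAndOpen(OutputFormat<?> format,
                                        RuntimeContext context,
                                        int taskNumber,
                                        int numTasks) throws Exception {
        if (format instanceof RichOutputFormat) {
            ((RichOutputFormat<?>) format).setRuntimeContext(context);
        }
        format.open(taskNumber, numTasks);
    }
}
{code}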

> Local execution is not setting RuntimeContext for RichOutputFormat
> --
>
> Key: FLINK-6298
> URL: https://issues.apache.org/jira/browse/FLINK-6298
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Mateusz Zakarczemny
>Assignee: Wenlong Lyu
>
> RuntimeContext is never set in RichOutputFormat. I tested it in local 
> execution. RichMapFunction is setup correctly. 
> Following code will never print "//Context set in RichOutputFormat"
> {code}
> import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
> import org.apache.flink.api.common.io.RichOutputFormat
> import org.apache.flink.api.scala._
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> object Startup {
>   def main(args: Array[String]): Unit = {
> val mapFunction = new RichMapFunction[String, String] {
>   def open(taskNumber: Int, numTasks: Int) { getRuntimeContext }
>   def map(event: String) = { event }
>   override def setRuntimeContext(t: RuntimeContext) = {
> println("//Context set in RichMapFunction")
> super.setRuntimeContext(t)
>   }
> }
> val outputFormat = new RichOutputFormat[String] {
>   override def setRuntimeContext(t: RuntimeContext) = {
> println("//Context set in RichOutputFormat")
> super.setRuntimeContext(t)
>   }
>   def open(taskNumber: Int, numTasks: Int) {}
>   def writeRecord(event: String) {
> println(event)
>   }
>   def configure(parameters: Configuration): Unit = {}
>   def close(): Unit = {}
> }
> val see = StreamExecutionEnvironment.getExecutionEnvironment
> val eventsStream = see.fromElements[String]("A", "B", 
> "C").map(mapFunction)
> eventsStream.writeUsingOutputFormat(outputFormat)
> see.execute("test-job")
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5815) Add resource files configuration for Yarn Mode

2017-02-16 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-5815:
--

 Summary: Add resource files configuration for Yarn Mode
 Key: FLINK-5815
 URL: https://issues.apache.org/jira/browse/FLINK-5815
 Project: Flink
  Issue Type: Improvement
  Components: Client, Distributed Coordination
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


Currently in Flink, when we want to ship a resource file via the distributed 
cache, we need to make the file remotely accessible by a URL, and it is often 
difficult to maintain a service like that. What's more, when we want to add 
some extra jar files to the job classpath, we need to copy the jar files to the 
blob server when submitting the JobGraph; on YARN, and especially in FLIP-6, 
the blob server is not yet running when we try to start a Flink job.
YARN has an efficient distributed cache implementation for applications running 
on it; what's more, files stored in HDFS can easily be shared between different 
applications via the distributed cache without extra IO operations.
I suggest introducing -yfiles, -ylibjars and -yarchives options to FlinkYarnCLI 
to enable YARN users to set up their job resource files via the YARN 
distributed cache. The options are compatible with those used in MapReduce, 
which makes them easy to use for YARN users, who generally have experience with 
MapReduce.
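For illustration, a hypothetical invocation under this proposal could look like 
{{flink run -m yarn-cluster -yfiles hdfs:///conf/app.conf -ylibjars 
hdfs:///libs/udf.jar -yarchives hdfs:///archives/dict.zip job.jar}} (option 
names as proposed above, mirroring MapReduce's -files/-libjars/-archives; the 
paths are placeholders).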



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5817) Fix test concurrent execution failure by test dir conflicts.

2017-02-16 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-5817:
--

 Summary: Fix test concurrent execution failure by test dir 
conflicts.
 Key: FLINK-5817
 URL: https://issues.apache.org/jira/browse/FLINK-5817
 Project: Flink
  Issue Type: Bug
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


Currently, when different users build Flink on the same machine, failures may 
happen because some test utilities create test files using fixed names, which 
causes file access to fail when different users process the same file at the 
same time.
We have found errors from AbstractTestBase, IOManagerTest, and FileCacheTest.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5817) Fix test concurrent execution failure by test dir conflicts.

2017-02-16 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869704#comment-15869704
 ] 

Wenlong Lyu commented on FLINK-5817:


ok, it seems good; I will try it instead of adding a random suffix.

> Fix test concurrent execution failure by test dir conflicts.
> 
>
> Key: FLINK-5817
> URL: https://issues.apache.org/jira/browse/FLINK-5817
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> Currently, when different users build Flink on the same machine, failures may 
> happen because some test utilities create test files using fixed names, which 
> causes file access to fail when different users process the same file at the 
> same time.
> We have found errors from AbstractTestBase, IOManagerTest, and FileCacheTest.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5817) Fix test concurrent execution failure by test dir conflicts.

2017-02-17 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871461#comment-15871461
 ] 

Wenlong Lyu commented on FLINK-5817:


[~shijinkui] I think the reason you want to change the java.io.tmpdir property 
is to solve the UT problem. But I think the best way is to avoid having UTs 
depend on the system property, which will make the build cleaner. And once we 
achieve that, there is no need to set java.io.tmpdir via the pom any more.
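A minimal sketch of a test that avoids fixed paths and the system property 
entirely (JUnit's TemporaryFolder here is an assumption; the thread does not 
name the exact utility):
{code:java}
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class IsolatedTempDirTest {

    // Each test run gets its own unique directory, so concurrent builds by
    // different users cannot collide on a fixed file name.
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testWritesToPrivateDir() throws Exception {
        File testFile = tempFolder.newFile("test-data");
        // ... write and read test data under testFile ...
    }
}
{code}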

> Fix test concurrent execution failure by test dir conflicts.
> 
>
> Key: FLINK-5817
> URL: https://issues.apache.org/jira/browse/FLINK-5817
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> Currently, when different users build Flink on the same machine, failures may 
> happen because some test utilities create test files using fixed names, which 
> causes file access to fail when different users process the same file at the 
> same time.
> We have found errors from AbstractTestBase, IOManagerTest, and FileCacheTest.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5815) Add resource files configuration for Yarn Mode

2017-02-19 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874104#comment-15874104
 ] 

Wenlong Lyu commented on FLINK-5815:


Hi Till, we have finished an implementation. I was just blocked by some other 
work last week; I think I can submit the pull request this week.

> Add resource files configuration for Yarn Mode
> --
>
> Key: FLINK-5815
> URL: https://issues.apache.org/jira/browse/FLINK-5815
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, YARN
>Affects Versions: 1.3.0
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> Currently in Flink, when we want to ship a resource file via the distributed 
> cache, we need to make the file remotely accessible by a URL, and it is often 
> difficult to maintain a service like that. What's more, when we want to add 
> some extra jar files to the job classpath, we need to copy the jar files to 
> the blob server when submitting the JobGraph; on YARN, and especially in 
> FLIP-6, the blob server is not yet running when we try to start a Flink job. 
> YARN has an efficient distributed cache implementation for applications 
> running on it; what's more, files stored in HDFS can easily be shared between 
> different applications via the distributed cache without extra IO operations. 
> I suggest introducing -yfiles, -ylibjars and -yarchives options to 
> FlinkYarnCLI to enable YARN users to set up their job resource files via the 
> YARN distributed cache. The options are compatible with those used in 
> MapReduce, which makes them easy to use for YARN users, who generally have 
> experience with MapReduce.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-4378) Enable RollingSink to custom HDFS client configuration

2017-02-21 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu closed FLINK-4378.
--

fixed

> Enable RollingSink to custom HDFS client configuration
> --
>
> Key: FLINK-4378
> URL: https://issues.apache.org/jira/browse/FLINK-4378
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
> Fix For: 1.2.0
>
>
> Tuning the configuration of the HDFS client for different situations, such as 
> {{io.file.buffer.size}}, can make the rolling sink perform better.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-4443) Add support in RpcCompletenessTest for inheritance of RpcGateway and RpcEndpoint

2017-02-21 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu closed FLINK-4443.
--

fixed

> Add support in RpcCompletenessTest for inheritance of RpcGateway and 
> RpcEndpoint
> 
>
> Key: FLINK-4443
> URL: https://issues.apache.org/jira/browse/FLINK-4443
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> RpcCompletenessTest needs to support an RpcGateway that is composed of several 
> basic interfaces, like the following example:
> {code:java}
> public interface ExecutionStateListener extends RpcGateway {
>     public void notifyExecutionStateChanges();
> }
> public interface JobStateListener extends RpcGateway {
>     public void notifyJobStateChanges();
> }
> public interface JobWatcher extends ExecutionStateListener, JobStateListener, 
> RpcGateway {
> }
> public class JobWatcherEndpoint extends RpcEndpoint {
>     protected JobWatcherEndpoint(RpcService rpcService) {
>         super(rpcService);
>     }
>     @RpcMethod
>     public void notifyExecutionStateChanges() {
>     }
>     @RpcMethod
>     public void notifyJobStateChanges() {
>     }
> }
> public class AttachedJobClient extends JobWatcherEndpoint {
>     protected AttachedJobClient(RpcService rpcService) {
>         super(rpcService);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5989) Protobuf in akka needs to be shaded

2017-03-07 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-5989:
--

 Summary: Protobuf in akka needs to be shaded
 Key: FLINK-5989
 URL: https://issues.apache.org/jira/browse/FLINK-5989
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Wenlong Lyu


Currently akka introduces a dependency on protobuf, which is a common jar used 
in many systems. I think we need to use a shaded akka, like what we do for the 
hadoop dependency, to avoid version conflicts with user code.
{code}
[INFO] +- com.data-artisans:flakka-actor_2.10:jar:2.3-custom:compile
[INFO] |  \- com.typesafe:config:jar:1.2.1:compile
[INFO] +- com.data-artisans:flakka-remote_2.10:jar:2.3-custom:compile
[INFO] |  +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
{code}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5989) Protobuf in akka needs to be shaded

2017-03-07 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15900651#comment-15900651
 ] 

Wenlong Lyu commented on FLINK-5989:


[~mxm] I found that you changed the dependency on akka to a data-artisans 
custom version. Can you add shading settings in the custom akka project? Or do 
we need to add a flink-shaded-akka module?

> Protobuf in akka needs to be shaded
> ---
>
> Key: FLINK-5989
> URL: https://issues.apache.org/jira/browse/FLINK-5989
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Wenlong Lyu
>
> Currently akka introduces a dependency on protobuf, which is a common jar used 
> in many systems. I think we need to use a shaded akka, like what we do for the 
> hadoop dependency, to avoid version conflicts with user code.
> {code}
> [INFO] +- com.data-artisans:flakka-actor_2.10:jar:2.3-custom:compile
> [INFO] |  \- com.typesafe:config:jar:1.2.1:compile
> [INFO] +- com.data-artisans:flakka-remote_2.10:jar:2.3-custom:compile
> [INFO] |  +- io.netty:netty:jar:3.8.0.Final:compile
> [INFO] |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
> [INFO] |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5749) Unset HADOOP_HOME and HADOOP_CONF_DIR from system environment when building flink

2017-02-08 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-5749:
--

 Summary: Unset HADOOP_HOME and HADOOP_CONF_DIR from system 
environment when building flink
 Key: FLINK-5749
 URL: https://issues.apache.org/jira/browse/FLINK-5749
 Project: Flink
  Issue Type: Bug
Reporter: Wenlong Lyu
Assignee: Wenlong Lyu


Currently, when we try to build Flink on a machine with the HADOOP_HOME 
environment variable set, test data will be written to HDFS instead of the 
local tmp dir as expected. This causes tests to fail. 
I suggest unsetting the HADOOP_HOME and HADOOP_CONF_DIR environment variables 
in the pom, to make sure Maven runs the test cases in exactly the way we want.
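A hedged sketch of one way to do this with the maven-surefire-plugin; Surefire 
cannot literally unset a variable, so this overrides the two variables with 
empty values (whether this matches the eventual fix is an assumption):
{code}
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <configuration>
        <environmentVariables>
            <!-- shadow any Hadoop settings from the build machine so tests
                 use the local tmp dir instead of a real HDFS cluster -->
            <HADOOP_HOME></HADOOP_HOME>
            <HADOOP_CONF_DIR></HADOOP_CONF_DIR>
        </environmentVariables>
    </configuration>
</plugin>
{code}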



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5749) unset HADOOP_HOME and HADOOP_CONF_DIR to avoid env in build machine failing the UT and IT

2017-02-08 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-5749:
---
Summary: unset HADOOP_HOME and HADOOP_CONF_DIR to avoid env in build 
machine failing the UT and IT  (was: Unset HADOOP_HOME and HADOOP_CONF_DIR from 
system environment when building flink)

> unset HADOOP_HOME and HADOOP_CONF_DIR to avoid env in build machine 
> failing the UT and IT
> -
>
> Key: FLINK-5749
> URL: https://issues.apache.org/jira/browse/FLINK-5749
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> Currently, when we try to build Flink on a machine with the HADOOP_HOME 
> environment variable set, test data will be written to HDFS instead of the 
> local tmp dir as expected. This causes tests to fail. 
> I suggest unsetting the HADOOP_HOME and HADOOP_CONF_DIR environment variables 
> in the pom, to make sure Maven runs the test cases in exactly the way we want.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-16864) Add idle metrics for Task

2020-03-30 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-16864:
---

 Summary: Add idle metrics for Task
 Key: FLINK-16864
 URL: https://issues.apache.org/jira/browse/FLINK-16864
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Wenlong Lyu


Currently there is no metric that lets users measure concretely how busy a task 
is, which is important when deciding how to tune a job.

We would like to propose adding an IdleTime metric which measures the idle time 
of a task, including the time the mailbox processor spends waiting for new mail 
and the time the record writer spends waiting for a new buffer. 

With the idle time:
1. when a job cannot catch up with the rate of data generation, the vertex whose 
idle time is near zero is the bottleneck of the job.
2. when a job is not busy, the idle time can be used to guide users on how much 
they can scale down the job.

In addition, measuring idle time has little impact on the performance of the 
job, because when a task is busy we never touch the code that measures wait time 
in the mailbox.
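
As a rough sketch of the implementation idea (all names below are invented for 
illustration, not Flink's actual API), only the blocking waits would be 
instrumented, so a busy task never executes the timing code on its hot path:

{code:java}
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative helper: accumulates only the time spent blocked waiting.
public final class IdleTimer {
    private final AtomicLong idleNanos = new AtomicLong();

    // Wrap a blocking wait, e.g. the mailbox take() or requesting an output buffer.
    public <T> T measure(Callable<T> blockingWait) throws Exception {
        long start = System.nanoTime();
        try {
            return blockingWait.call();
        } finally {
            idleNanos.addAndGet(System.nanoTime() - start);
        }
    }

    // Exposed through a task metric, e.g. a Meter reporting idle time per second.
    public long getIdleNanos() {
        return idleNanos.get();
    }
}
{code}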



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17012) Expose cost of task initialization

2020-04-06 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-17012:
---

 Summary: Expose cost of task initialization
 Key: FLINK-17012
 URL: https://issues.apache.org/jira/browse/FLINK-17012
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Wenlong Lyu


Currently a task switches to RUNNING before it is fully initialized: the 
RUNNING state does not take state initialization and operator initialization 
(#open) into account, which may take a long time to finish. As a result, there 
can be a weird phenomenon where all tasks are running but the throughput is 0. 

I think it would be good if we could expose the initialization stage of tasks. 
What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17012) Expose stage of task initialization

2020-04-06 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-17012:

Summary: Expose stage of task initialization  (was: Expose cost of task 
initialization)

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to RUNNING before it is fully initialized: the 
> RUNNING state does not take state initialization and operator initialization 
> (#open) into account, which may take a long time to finish. As a result, 
> there can be a weird phenomenon where all tasks are running but the 
> throughput is 0. 
> I think it would be good if we could expose the initialization stage of 
> tasks. What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17012) Expose stage of task initialization

2020-04-07 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077193#comment-17077193
 ] 

Wenlong Lyu commented on FLINK-17012:
-

[~chesnay] for debugging, logging may be enough, but it would be hard to 
integrate with other systems.

[~pnowojski] IdleTime is currently designed as a Meter; it is not good to set 
a meter to -1 or MAX_VALUE. 

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to RUNNING before it is fully initialized: the 
> RUNNING state does not take state initialization and operator initialization 
> (#open) into account, which may take a long time to finish. As a result, 
> there can be a weird phenomenon where all tasks are running but the 
> throughput is 0. 
> I think it would be good if we could expose the initialization stage of 
> tasks. What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17040) SavepointWriterITCase broken

2020-04-07 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-17040:
---

 Summary: SavepointWriterITCase broken
 Key: FLINK-17040
 URL: https://issues.apache.org/jira/browse/FLINK-17040
 Project: Flink
  Issue Type: Bug
Reporter: Wenlong Lyu


I think it is because of the change in FLINK-16537, which creates the partition 
writer in beforeInvoke. [~zhijiang]

Caused by: java.lang.UnsupportedOperationException: This method should never be 
called
at 
org.apache.flink.state.api.runtime.SavepointEnvironment.getAllWriters(SavepointEnvironment.java:242)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:439)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:454)
at 
org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21010) Separate the implementation of StreamExecJoin

2021-01-18 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-21010:
---

 Summary: Separate the implementation of StreamExecJoin
 Key: FLINK-21010
 URL: https://issues.apache.org/jira/browse/FLINK-21010
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Wenlong Lyu
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21011) Separete the implementation of StreamExecJoin

2021-01-18 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-21011:
---

 Summary: Separete the implementation of StreamExecJoin
 Key: FLINK-21011
 URL: https://issues.apache.org/jira/browse/FLINK-21011
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Wenlong Lyu
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-21011) Separete the implementation of StreamExecJoin

2021-01-18 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu closed FLINK-21011.
---
Resolution: Duplicate

> Separete the implementation of StreamExecJoin
> -
>
> Key: FLINK-21011
> URL: https://issues.apache.org/jira/browse/FLINK-21011
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Priority: Major
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21011) Separate the implementation of StreamExecIntervalJoin

2021-01-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21011:

Summary: Separate the implementation of StreamExecIntervalJoin  (was: 
Separete the implementation of StreamExecJoin)

> Separate the implementation of StreamExecIntervalJoin
> -
>
> Key: FLINK-21011
> URL: https://issues.apache.org/jira/browse/FLINK-21011
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Priority: Major
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-21011) Separate the implementation of StreamExecIntervalJoin

2021-01-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu reopened FLINK-21011:
-

> Separate the implementation of StreamExecIntervalJoin
> -
>
> Key: FLINK-21011
> URL: https://issues.apache.org/jira/browse/FLINK-21011
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Priority: Major
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-5815) Add resource files configuration for Yarn Mode

2020-10-10 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu closed FLINK-5815.
--
Resolution: Abandoned

> Add resource files configuration for Yarn Mode
> --
>
> Key: FLINK-5815
> URL: https://issues.apache.org/jira/browse/FLINK-5815
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Deployment / YARN
>Affects Versions: 1.3.0
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently in flink, when we want to set up a resource file in the distributed 
> cache, we need to make the file remotely accessible via a url, and it is 
> often difficult to maintain a service like that. What's more, when we want to 
> add some extra jar files to the job classpath, we need to copy the jar files 
> to the blob server when submitting the jobgraph. In yarn, especially in 
> flip-6, the blob server is not yet running when we try to start a flink job. 
> Yarn has an efficient distributed cache implementation for applications 
> running on it; what's more, we can easily share files stored in hdfs between 
> different applications through the distributed cache without extra IO 
> operations. 
> I suggest introducing -yfiles, -ylibjars and -yarchives options in 
> FlinkYarnCLI to enable yarn users to set up their job resource files via the 
> yarn distributed cache. The options are compatible with those used in 
> mapreduce, which makes them easy to use for yarn users, who generally have 
> experience with mapreduce.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-8200) RocksDBAsyncSnapshotTest should use temp folder instead of folder with fixed name

2017-12-04 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-8200:
--

 Summary: RocksDBAsyncSnapshotTest should use temp folder instead of 
folder with fixed name
 Key: FLINK-8200
 URL: https://issues.apache.org/jira/browse/FLINK-8200
 Project: Flink
  Issue Type: Bug
Reporter: Wenlong Lyu


The following case fails when different users run the test on the same machine.

Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.226 sec <<< 
FAILURE! - in org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest
testCleanupOfSnapshotsInFailureCase(org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest)
  Time elapsed: 0.023 sec  <<< ERROR!
java.io.IOException: No local storage directories available. Local DB files 
directory 'file:/tmp/foobar' does not exist and cannot be created.
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.lazyInitializeForJob(RocksDBStateBackend.java:251)
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:300)
    at 
org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest.testCleanupOfSnapshotsInFailureCase(RocksDBAsyncSnapshotTest.java:338)
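
A minimal sketch of the fix, using JUnit's {{TemporaryFolder}} rule so that each 
user and test run gets its own directory instead of the fixed 
{{file:/tmp/foobar}} path (the test body below is illustrative):

{code:java}
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class RocksDBAsyncSnapshotTestSketch {

    // JUnit creates a fresh, user-writable directory for each test
    // and removes it afterwards, so concurrent users cannot collide.
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testCleanupOfSnapshotsInFailureCase() throws Exception {
        // Use the per-test directory instead of a fixed name like "file:/tmp/foobar".
        String dbStoragePath = tempFolder.newFolder().toURI().toString();
        // backend.setDbStoragePath(dbStoragePath); ... rest of the test unchanged
    }
}
{code}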



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-8200) RocksDBAsyncSnapshotTest should use temp folder instead of folder with fixed name

2017-12-04 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu reassigned FLINK-8200:
--

Assignee: Wenlong Lyu

> RocksDBAsyncSnapshotTest should use temp folder instead of folder with fixed name
> -
>
> Key: FLINK-8200
> URL: https://issues.apache.org/jira/browse/FLINK-8200
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> The following case fails when different users run the test on the same 
> machine.
> Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.226 sec <<< 
> FAILURE! - in 
> org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest
> testCleanupOfSnapshotsInFailureCase(org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest)
>   Time elapsed: 0.023 sec  <<< ERROR!
> java.io.IOException: No local storage directories available. Local DB files 
> directory 'file:/tmp/foobar' does not exist and cannot be created.
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.lazyInitializeForJob(RocksDBStateBackend.java:251)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:300)
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest.testCleanupOfSnapshotsInFailureCase(RocksDBAsyncSnapshotTest.java:338)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8201) YarnResourceManagerTest causes license checking failure

2017-12-04 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-8201:
--

 Summary: YarnResourceManagerTest causes license checking failure
 Key: FLINK-8201
 URL: https://issues.apache.org/jira/browse/FLINK-8201
 Project: Flink
  Issue Type: Bug
Reporter: Wenlong Lyu


YarnResourceManagerTest generates a temporary taskmanager config file in the 
flink-yarn module root folder and never clears it, which makes license checking 
fail when we run {{mvn clean verify}} multiple times in the same source folder.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-8201) YarnResourceManagerTest causes license checking failure

2017-12-05 Thread Wenlong Lyu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu reassigned FLINK-8201:
--

Assignee: Wenlong Lyu

> YarnResourceManagerTest causes license checking failure
> ---
>
> Key: FLINK-8201
> URL: https://issues.apache.org/jira/browse/FLINK-8201
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> YarnResourceManagerTest generates a temporary taskmanager config file in the 
> flink-yarn module root folder and never clears it, which makes license 
> checking fail when we run {{mvn clean verify}} multiple times in the same 
> source folder.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings

2020-01-17 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018086#comment-17018086
 ] 

Wenlong Lyu commented on FLINK-15635:
-

hi, [~twalthr], I would like to share some more classloading issues to solve in 
the long run:
 # A sql platform product needs to provide as many connectors as possible for 
the convenience of its users. It is not good to load them all in a single job, 
which would waste a lot of resources and take a long time to deploy the job.
 # The platform can provide different versions of a connector for the same kind 
of storage, which can have api or class conflicts. We cannot just load them all 
together in a job, and shading the classes can solve some of the conflicts but 
not all of them.
 # Catalog is the storage of table and function definitions, but currently 
Catalog does not care about the related classloading when we need to get a 
table from it.

And some intuitive thoughts of mine:
 # Something like the plugin mechanism in flink-core needs to be introduced to 
support dynamic class loading for user defined connectors. 
 # Catalog can be responsible for providing the jars for the tables it serves. 
By default, we could put connector jars in /opt/connectors/XXX in the flink 
distribution, and the catalog could search those directories for the needed 
jars according to the factory matching result.
 # the table env should be able to add jars from the catalog to the job graph 
when submitting a job to a flink cluster.

What do you think?
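
To make the direction a bit more concrete, here is a minimal sketch of how a 
user classloader could be passed to the table environment; the 
{{withClassLoader}} builder option and the jar path below are assumptions for 
illustration, not an existing API:

{code:java}
import java.net.URL;
import java.net.URLClassLoader;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ConnectorClassLoadingSketch {
    public static TableEnvironment create() throws Exception {
        // Jars resolved by the catalog, e.g. from /opt/connectors/XXX (example path).
        URL[] connectorJars = { new URL("file:///opt/connectors/kafka/connector.jar") };
        ClassLoader userClassLoader =
                new URLClassLoader(connectorJars, ConnectorClassLoadingSketch.class.getClassLoader());

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .withClassLoader(userClassLoader) // assumed new builder option
                .build();

        // Planner, table config, etc. would then use this classloader consistently
        // instead of Thread.currentThread().getContextClassLoader().
        return TableEnvironment.create(settings);
    }
}
{code}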

> Allow passing a ClassLoader to EnvironmentSettings
> --
>
> Key: FLINK-15635
> URL: https://issues.apache.org/jira/browse/FLINK-15635
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> We had a couple of class loading issues in the past because people forgot to 
> use the right classloader in {{flink-table}}. The SQL Client executor code 
> hacks a classloader into the planner process by using {{wrapClassLoader}} 
> that sets the thread's context classloader.
> Instead we should allow passing a class loader to environment settings. This 
> class loader can be passed to the planner and can be stored in table 
> environment, table config, etc. to have a consistent class loading behavior.
> Having this in place should replace the need for 
> {{Thread.currentThread().getContextClassLoader()}} in the entire 
> {{flink-table}} module.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings

2020-01-18 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018617#comment-17018617
 ] 

Wenlong Lyu edited comment on FLINK-15635 at 1/18/20 3:40 PM:
--

Thanks [~twalthr] for the explanation. I came to this issue from FLINK-15552 
and thought that it is used to track the long-term solution for class loading 
in the sql client and table. I totally agree with you that we can have a 
mid-term fix and introduce more improvements in future versions.

For the mid-term fix, +1 to adding an explicit common classloader to replace 
the need for the thread context classloader, which will also make the code 
easier to understand.


was (Author: wenlong.lwl):
[~twalthr] thanks for the explanation. I came to this issue from FLINK-15552 
and thought that it is used to track the long-term solution for class loading 
in the sql client and table. I totally agree with you that we can have a 
mid-term fix and introduce more improvements in future versions.

For the mid-term fix, +1 to adding an explicit common classloader to replace 
the need for the thread context classloader, which will also make the code 
easier to understand.

> Allow passing a ClassLoader to EnvironmentSettings
> --
>
> Key: FLINK-15635
> URL: https://issues.apache.org/jira/browse/FLINK-15635
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> We had a couple of class loading issues in the past because people forgot to 
> use the right classloader in {{flink-table}}. The SQL Client executor code 
> hacks a classloader into the planner process by using {{wrapClassLoader}} 
> that sets the thread's context classloader.
> Instead we should allow passing a class loader to environment settings. This 
> class loader can be passed to the planner and can be stored in table 
> environment, table config, etc. to have a consistent class loading behavior.
> Having this in place should replace the need for 
> {{Thread.currentThread().getContextClassLoader()}} in the entire 
> {{flink-table}} module.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings

2020-01-18 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018617#comment-17018617
 ] 

Wenlong Lyu commented on FLINK-15635:
-

[~twalthr] thanks for the explanation. I came to this issue from FLINK-15552 
and thought that it is used to track the long-term solution for class loading 
in the sql client and table. I totally agree with you that we can have a 
mid-term fix and introduce more improvements in future versions.

For the mid-term fix, +1 to adding an explicit common classloader to replace 
the need for the thread context classloader, which will also make the code 
easier to understand.

> Allow passing a ClassLoader to EnvironmentSettings
> --
>
> Key: FLINK-15635
> URL: https://issues.apache.org/jira/browse/FLINK-15635
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> We had a couple of class loading issues in the past because people forgot to 
> use the right classloader in {{flink-table}}. The SQL Client executor code 
> hacks a classloader into the planner process by using {{wrapClassLoader}} 
> that sets the thread's context classloader.
> Instead we should allow passing a class loader to environment settings. This 
> class loader can be passed to the planner and can be stored in table 
> environment, table config, etc. to have a consistent class loading behavior.
> Having this in place should replace the need for 
> {{Thread.currentThread().getContextClassLoader()}} in the entire 
> {{flink-table}} module.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2020-08-18 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180209#comment-17180209
 ] 

Wenlong Lyu commented on FLINK-12351:
-

Hi [~jark] [~trohrmann], I think we may need to fix this issue in 1.11; it may 
be a regression in 1.11. 

Before 1.11, AsyncWaitOperator was not chainable because of 
[FLINK-13063|https://issues.apache.org/jira/browse/FLINK-13063], so all input 
records were newly created from network inputs and this bug would not be 
triggered.

In 1.11, AsyncWaitOperator is chainable again 
([FLINK-16219|https://issues.apache.org/jira/browse/FLINK-16219]), so this 
bug would affect the result when object reuse is enabled. 
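
For illustration, a minimal sketch of the proposed guard, assuming access to 
the input's {{TypeSerializer}}; the helper class and its name are invented:

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public final class StreamRecordCopyUtil {

    // Returns a record that is safe to hold in the StreamElementQueue: a deep
    // copy when object reuse is enabled (the input instance may be mutated by
    // the next record), otherwise the input itself.
    public static <T> StreamRecord<T> copyIfReused(
            StreamRecord<T> element,
            TypeSerializer<T> serializer,
            boolean objectReuseEnabled) {
        return objectReuseEnabled
                ? element.copy(serializer.copy(element.getValue()))
                : element;
    }
}
{code}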

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, AsyncWaitOperator directly puts the input StreamElement into 
> {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement 
> is reused, which means the element in {{StreamElementQueue}} will be 
> modified. As a result, the output of AsyncWaitOperator might be wrong.
> An easy way to fix this might be to deep copy the input StreamElement when 
> object reuse is enabled, like this: 
> https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2020-08-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17181081#comment-17181081
 ] 

Wenlong Lyu commented on FLINK-12351:
-

[~pnowojski] As far as I know, it is easy to find out whether the operator is 
the head of a chain via StreamConfig#isChainStart.

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, AsyncWaitOperator directly puts the input StreamElement into 
> {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement 
> is reused, which means the element in {{StreamElementQueue}} will be 
> modified. As a result, the output of AsyncWaitOperator might be wrong.
> An easy way to fix this might be to deep copy the input StreamElement when 
> object reuse is enabled, like this: 
> https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17012) Expose stage of task initialization

2020-04-21 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089260#comment-17089260
 ] 

Wenlong Lyu commented on FLINK-17012:
-

[~pnowojski] I think whether the task is ready to process data is what users 
really care about. Including the initialization step in the DEPLOYING state of 
the task could be an option.
 
I agree that we should avoid `initialize()` or `configure()` if possible. 
Regarding initializing in the constructor, I think we would need to do more 
checks (at least null checks) when cleaning up in the exception catch clause, 
and it would be impossible to clean up externally, which may cause a resource 
leak when we try to cancel a task by interrupting it, because the Invokable is 
not accessible when the constructor fails. We may need a Factory, as 
[~pnowojski] suggested, which can construct and initialize all components 
(statebackend, operator chain etc.), create the task by providing them 
explicitly, and clean them up when necessary.
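
A rough sketch of that factory idea; all names below are invented for 
illustration:

{code:java}
// Illustrative only: split construction/initialization out of the Invokable so
// the initialized components stay accessible for cleanup even if a later step fails.
interface TaskComponents extends AutoCloseable {
    // statebackend, operator chain, etc., fully initialized
}

interface TaskComponentsFactory {
    TaskComponents createAndInitialize() throws Exception;
}

final class TaskRunnerSketch {
    void run(TaskComponentsFactory factory) throws Exception {
        TaskComponents components = factory.createAndInitialize();
        try {
            // build and run the invokable from the initialized components ...
        } finally {
            components.close(); // cleanup no longer depends on the invokable itself
        }
    }
}
{code}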

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to RUNNING before it is fully initialized: the 
> RUNNING state does not take state initialization and operator initialization 
> (#open) into account, which may take a long time to finish. As a result, 
> there can be a weird phenomenon where all tasks are running but the 
> throughput is 0. 
> I think it would be good if we could expose the initialization stage of 
> tasks. What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089350#comment-17089350
 ] 

Wenlong Lyu commented on FLINK-17313:
-

hi all, this ticket only tries to fix the bug in the validation between the 
PhysicalDataType and the LogicalDataType of a TableSink. I think that is much 
clearer and cleaner, and worth considering. Currently the validation on the 
sink reuses the validation on the source, while they should actually be 
different.

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like the following (in blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>         "  a varchar(10)," +
>         "  b decimal(20,2)" +
>         ") with (" +
>         "  'type' = 'random'," +
>         "  'count' = '10'" +
>         ")");
> tEnv.sqlUpdate("create table printSink (" +
>         "  a varchar(10)," +
>         "  b decimal(22,2)," +
>         "  c timestamp(3)," +
>         ") with (" +
>         "  'type' = 'print'" +
>         ")");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink and its getRecordType is 
> as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>   return getTableSchema().toRowType();
> }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractItera

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089382#comment-17089382
 ] 

Wenlong Lyu commented on FLINK-17313:
-

[~dwysakowicz] I think it is not necessary for the logical schema and the 
physical schema to match exactly:

Currently, we allow a source column whose logical type is varchar(10) while its 
physical type is varchar(5); see `CastAvoidanceChecker` used in the 
compatibility check. 
The requirement on the source is that we need to be able to convert a physical 
source record to an internal record according to the physicalDataType and the 
LogicalDataType. 

On sinks, the requirement should be reversed: we need to be able to convert an 
internal record to a physical record for the sink, so we can allow a sink 
column whose logical type is varchar(5) but whose physical type is varchar(10).

On validation: we have a validation to make sure that the schema of the query 
feeding a sink matches the logical type; the validation between the logical 
type and the physical type can be much looser, I think.
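
A toy sketch of this asymmetry for length-parameterized types like varchar; the 
checks below are invented for illustration and are not the actual 
TypeMappingUtils logic:

{code:java}
// Illustrative direction of the compatibility check for varchar(n).
public final class LogicalPhysicalCheckSketch {

    // Read path (source): every physical value must fit the logical type,
    // e.g. physical varchar(5) -> logical varchar(10) is fine.
    static boolean sourceCompatible(int physicalLength, int logicalLength) {
        return physicalLength <= logicalLength;
    }

    // Write path (sink): every logical value must fit the physical type,
    // e.g. logical varchar(5) -> physical varchar(10) is fine.
    static boolean sinkCompatible(int logicalLength, int physicalLength) {
        return logicalLength <= physicalLength;
    }
}
{code}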

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like the following (in blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>         "  a varchar(10)," +
>         "  b decimal(20,2)" +
>         ") with (" +
>         "  'type' = 'random'," +
>         "  'count' = '10'" +
>         ")");
> tEnv.sqlUpdate("create table printSink (" +
>         "  a varchar(10)," +
>         "  b decimal(22,2)," +
>         "  c timestamp(3)," +
>         ") with (" +
>         "  'type' = 'print'" +
>         ")");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink and its getRecordType is 
> as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>   return getTableSchema().toRowType();
> }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(Planne

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089472#comment-17089472
 ] 

Wenlong Lyu commented on FLINK-17313:
-

[~dwysakowicz] Regarding LEGACY DECIMAL, I think it is a special case worth 
supporting: the physical representation of LEGACY DECIMAL is BigDecimal, which 
can hold any precision and scale, so allowing such a conversion will not 
actually break anything, and the final precision and scale of the output 
BigDecimal are still limited by the logical data type, so no data with wrong 
precision will be generated. 

What's more, with such support we can easily fill in decimal support for all 
kinds of sinks with the old interface. 

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like the following (in blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>         "  a varchar(10)," +
>         "  b decimal(20,2)" +
>         ") with (" +
>         "  'type' = 'random'," +
>         "  'count' = '10'" +
>         ")");
> tEnv.sqlUpdate("create table printSink (" +
>         "  a varchar(10)," +
>         "  b decimal(22,2)," +
>         "  c timestamp(3)," +
>         ") with (" +
>         "  'type' = 'print'" +
>         ")");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink and its getRecordType is 
> as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>   return getTableSchema().toRowType();
> }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreac

[jira] [Commented] (FLINK-17012) Expose stage of task initialization

2020-04-23 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091088#comment-17091088
 ] 

Wenlong Lyu commented on FLINK-17012:
-

I came up with the same question about init earlier, too. I think if we want to 
do init in the constructor, there would be no abstract `init`: every StreamTask 
should init its own part in its constructor. StreamTask only inits the common 
resources such as the statebackend and operators, and SourceStreamTask or 
OneInputStreamTask init their special parts in their constructors.

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to RUNNING before it is fully initialized: the 
> RUNNING state does not take state initialization and operator initialization 
> (#open) into account, which may take a long time to finish. As a result, 
> there can be a weird phenomenon where all tasks are running but the 
> throughput is 0. 
> I think it would be good if we could expose the initialization stage of 
> tasks. What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.

2020-04-25 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-17386:
---

 Summary: Exception in HadoopSecurityContextFactory.createContext 
while no shaded-hadoop-lib provided.
 Key: FLINK-17386
 URL: https://issues.apache.org/jira/browse/FLINK-17386
 Project: Flink
  Issue Type: Bug
Reporter: Wenlong Lyu


java.io.IOException: Process execution failed due error. Error 
output:java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.security.UserGroupInformation\n\tat 
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat
 
org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat
 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat
 org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat 
com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat
 
com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat
 
com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat
 
com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat
 java.lang.Thread.run(Thread.java:834)

I think it is because an exception is thrown in the static code block of 
UserGroupInformation; should we catch Throwable instead of Exception in 
HadoopSecurityContextFactory#createContext?
[~rongrong] what do you think?
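
A simplified sketch of the suggested change (the class skeleton and log message 
are assumptions; only the catch clause is the point):

{code:java}
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.contexts.HadoopSecurityContext;
import org.apache.flink.runtime.security.contexts.NoOpSecurityContext;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopSecurityContextFactorySketch {
    private static final Logger LOG =
            LoggerFactory.getLogger(HadoopSecurityContextFactorySketch.class);

    // Sketch: catch Throwable so NoClassDefFoundError / ExceptionInInitializerError
    // from UserGroupInformation's static initializer is handled as well.
    public SecurityContext createContext(SecurityConfiguration securityConfig) {
        try {
            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            return new HadoopSecurityContext(loginUser);
        } catch (Throwable t) { // was: catch (Exception e)
            LOG.warn("Cannot create HadoopSecurityContext; falling back to NoOpSecurityContext.", t);
            return new NoOpSecurityContext();
        }
    }
}
{code}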



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.

2020-04-25 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-17386:

Description: 
java.io.IOException: Process execution failed due error. Error 
output:java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.security.UserGroupInformation\n\tat 
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat
 
org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat
 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat
 org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat 
com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat
 
com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat
 
com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat
 
com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat
 java.lang.Thread.run(Thread.java:834)

I think it is because an exception is thrown in the static code block of 
UserGroupInformation; should we catch Throwable instead of Exception in 
HadoopSecurityContextFactory#createContext?
[~rongr] what do you think?

  was:
java.io.IOException: Process execution failed due error. Error 
output:java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.security.UserGroupInformation\n\tat 
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat
 
org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat
 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat
 org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat 
com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat
 
com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat
 
com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat
 
com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat
 java.lang.Thread.run(Thread.java:834)

I think it is because an exception is thrown in the static code block of 
UserGroupInformation; should we catch Throwable instead of Exception in 
HadoopSecurityContextFactory#createContext?
[~rongrong] what do you think?


> Exception in HadoopSecurityContextFactory.createContext while no 
> shaded-hadoop-lib provided.
> 
>
> Key: FLINK-17386
> URL: https://issues.apache.org/jira/browse/FLINK-17386
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Priority: Major
>
> java.io.IOException: Process execution failed due error. Error 
> output:java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.hadoop.security.UserGroupInformation\n\tat 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat
>  org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat 
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat
>  
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat
>  
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)

[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.

2020-04-27 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094097#comment-17094097
 ] 

Wenlong Lyu commented on FLINK-17386:
-

hi [~rongr], I don't want the shaded-hadoop-lib in the lib dir, because I don't 
want to submit the job to yarn. I did more investigation on this issue and 
found that it is because there is a customized state backend in the lib dir 
which is designed to support writing to hdfs and contains some of the hadoop 
libs but not all of them. The root cause is that the security loader only 
checks whether Configuration and UserGroupInformation are in the classpath, 
which may cause the exception above when not enough libs are in the lib dir.

> Exception in HadoopSecurityContextFactory.createContext while no 
> shaded-hadoop-lib provided.
> 
>
> Key: FLINK-17386
> URL: https://issues.apache.org/jira/browse/FLINK-17386
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Priority: Major
>
> java.io.IOException: Process execution failed due error. Error 
> output:java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.hadoop.security.UserGroupInformation\n\tat 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat
>  org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat 
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat
>  
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat
>  
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat
>  java.lang.Thread.run(Thread.java:834)
> I think it is because an exception is thrown in the static code block of 
> UserGroupInformation; should we catch Throwable instead of Exception in 
> HadoopSecurityContextFactory#createContext?
> [~rongr] what do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.

2020-04-28 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094218#comment-17094218
 ] 

Wenlong Lyu commented on FLINK-17386:
-

Yes, you are right about the use case. We are doing some testing based on the 
master branch; I can run a test on 1.10 and report back later.

> Exception in HadoopSecurityContextFactory.createContext while no 
> shaded-hadoop-lib provided.
> 
>
> Key: FLINK-17386
> URL: https://issues.apache.org/jira/browse/FLINK-17386
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Priority: Major
>
> java.io.IOException: Process execution failed due error. Error 
> output:java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.hadoop.security.UserGroupInformation\n\tat 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat
>  org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat 
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat
>  
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat
>  
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat
>  java.lang.Thread.run(Thread.java:834)
> I think it is because an exception is thrown in the static code block of 
> UserGroupInformation; should we catch Throwable instead of Exception in 
> HadoopSecurityContextFactory#createContext?
> [~rongr] what do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.

2020-04-28 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094235#comment-17094235
 ] 

Wenlong Lyu commented on FLINK-17386:
-

[~rongr] I ran a quick test by copying the statebackend to the lib dir of 1.10 
and running a flink command; no error happened.

> Exception in HadoopSecurityContextFactory.createContext while no 
> shaded-hadoop-lib provided.
> 
>
> Key: FLINK-17386
> URL: https://issues.apache.org/jira/browse/FLINK-17386
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Priority: Major
>
> java.io.IOException: Process execution failed due error. Error 
> output:java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.hadoop.security.UserGroupInformation\n\tat 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat
>  org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat 
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat
>  
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat
>  
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat
>  java.lang.Thread.run(Thread.java:834)
> I think it is because an exception is thrown in the static code block of 
> UserGroupInformation; should we catch Throwable instead of Exception in 
> HadoopSecurityContextFactory#createContext?
> [~rongr] what do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.

2020-04-29 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096232#comment-17096232
 ] 

Wenlong Lyu commented on FLINK-17386:
-

[~rongr] I have verified the patch; it works.

> Exception in HadoopSecurityContextFactory.createContext while no 
> shaded-hadoop-lib provided.
> 
>
> Key: FLINK-17386
> URL: https://issues.apache.org/jira/browse/FLINK-17386
> Project: Flink
>  Issue Type: Bug
>Reporter: Wenlong Lyu
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
>
> java.io.IOException: Process execution failed due error. Error 
> output:java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.hadoop.security.UserGroupInformation\n\tat 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat
>  
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat
>  org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat 
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat
>  
> com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat
>  
> com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat
>  
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat
>  java.lang.Thread.run(Thread.java:834)
> I think it is because an exception is thrown in the static code block of 
> UserGroupInformation; we should catch Throwable instead of Exception in 
> HadoopSecurityContextFactory#createContext?
> [~rongr] what do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17012) Expose stage of task initialization

2020-04-09 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17079409#comment-17079409
 ] 

Wenlong Lyu commented on FLINK-17012:
-

[~chesnay] I think it is not a good choice to treat processing the first record as 
the signal that initialization has finished: when a job contains a filter which 
drops most of the records, or when there is no input for a long time, we would make 
a wrong decision. IMO, it would be ideal to add a new stage (state) for tasks, such 
as Initializing, which could be easily visualized, but that may hurt compatibility; 
a separate metric may be a good compromise?
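
A rough sketch of the metric compromise, assuming a gauge registered on the task's 
metric group (the metric name and the stage enum below are made up for illustration):

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical: track the task's initialization stage and expose it as a gauge.
class TaskInitMetrics {
    enum InitStage { CREATED, RESTORING_STATE, OPENING_OPERATORS, RUNNING }

    private volatile InitStage currentStage = InitStage.CREATED;

    void register(MetricGroup taskMetricGroup) {
        // the gauge reports the current stage name on every metrics fetch
        taskMetricGroup.gauge("initializationStage",
                (Gauge<String>) () -> currentStage.name());
    }

    void advanceTo(InitStage stage) {
        currentStage = stage;
    }
}
{code}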

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to running before it is fully initialized; this does 
> not take state initialization and operator initialization (#open) into account, 
> which may take a long time to finish. As a result, there can be a weird 
> phenomenon where all tasks are running but throughput is 0. 
> I think it would be good if we could expose the initialization stage of tasks. 
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17012) Expose stage of task initialization

2020-04-12 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17082008#comment-17082008
 ] 

Wenlong Lyu commented on FLINK-17012:
-

[~chesnay] If the state is a TaskExecutor-local thing, we still need a solution to 
expose such local information to users/external systems. 

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to running before it is fully initialized; this does 
> not take state initialization and operator initialization (#open) into account, 
> which may take a long time to finish. As a result, there can be a weird 
> phenomenon where all tasks are running but throughput is 0. 
> I think it would be good if we could expose the initialization stage of tasks. 
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17109) FlinkPipelineTranslationUtil#getJobGraph doesn't consider Jars and Classpaths in configuration.

2020-04-13 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-17109:
---

 Summary: FlinkPipelineTranslationUtil#getJobGraph doesn't consider 
Jars and Classpaths in configuration.
 Key: FLINK-17109
 URL: https://issues.apache.org/jira/browse/FLINK-17109
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Reporter: Wenlong Lyu


I think this method should be removed, and we should use 
PipelineExecutorUtils#getJobGraph instead?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17110) Make StreamExecutionEnvironment#configure also affects configuration

2020-04-13 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-17110:
---

 Summary: Make StreamExecutionEnvironment#configure also affects 
configuration
 Key: FLINK-17110
 URL: https://issues.apache.org/jira/browse/FLINK-17110
 Project: Flink
  Issue Type: Improvement
Reporter: Wenlong Lyu


If StreamExecutionEnvironment#configure can also affect the configuration in 
StreamExecutionEnvironment, we can not only add library jars or classpaths 
dynamically according to the job we want to run (which is quite important for a 
platform product), but also tune runtime configuration programmatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17110) Make StreamExecutionEnvironment#configure also affects configuration

2020-04-14 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083070#comment-17083070
 ] 

Wenlong Lyu commented on FLINK-17110:
-

[~aljoscha] I mean the configuration in StreamExecutionEnvironment, which is used 
to generate the job graph and to set up a per-job cluster, I think.

[~kkl0u] I propose to merge the configuration provided through 
StreamExecutionEnvironment#configure into 
StreamExecutionEnvironment#configuration, so that more runtime configuration can 
be changed programmatically. 

In my use case, I am trying to create a tool that supports running arbitrary SQL 
jobs in per-job cluster mode, which needs to load connector jars dynamically 
according to the SQL. Currently I have to merge all library jars together, which 
makes it inconvenient to introduce a new connector and easy to cause class 
conflicts. This would be solved if we could set PipelineOptions.CLASSPATHS 
programmatically and store the library jars on some HTTP server, configured via 
StreamExecutionEnvironment#configuration.
Another case: it could also be useful to tune advanced runtime configuration 
according to the job we are running, for example setting a larger resource request 
timeout for a large job. It could be configured on the command line too, but that 
is more inconvenient I think.
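
A minimal sketch of what the proposal would enable, assuming configure() merged the 
given options into the environment's own configuration (the jar URL and the timeout 
key below are illustrative assumptions, not part of the proposal):

{code:java}
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigureClasspathsExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Configuration conf = new Configuration();
        // ship connector jars chosen per SQL job from an HTTP server (made-up URL)
        conf.set(PipelineOptions.CLASSPATHS, Collections.singletonList(
                "http://repo.example.com/connectors/kafka-connector.jar"));
        // tune a runtime option for a large job (example key)
        conf.setString("slot.request.timeout", "600000");

        // under the proposal, these options would be merged into
        // StreamExecutionEnvironment#configuration and affect the JobGraph
        env.configure(conf, Thread.currentThread().getContextClassLoader());
    }
}
{code}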

> Make StreamExecutionEnvironment#configure also affects configuration
> 
>
> Key: FLINK-17110
> URL: https://issues.apache.org/jira/browse/FLINK-17110
> Project: Flink
>  Issue Type: Improvement
>Reporter: Wenlong Lyu
>Priority: Major
>
> If StreamExecutionEnvironment#configure can also affect the configuration in 
> StreamExecutionEnvironment, we can not only add library jars or classpaths 
> dynamically according to the job we want to run (which is quite important for a 
> platform product), but also tune runtime configuration programmatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17139) The sub-plan reuse optimizer can't reuse sub-plans that come from different SQL node trees 

2020-04-14 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083313#comment-17083313
 ] 

Wenlong Lyu commented on FLINK-17139:
-

In 1.10 the compilation of each DML statement is eager 
(StreamTableEnvironmentImpl#isEagerOperationTranslation returns true), so we can't 
reuse the source. I believe that it is solved in master.
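
A sketch of the multi-statement approach on master that enables the reuse, assuming 
the StatementSet API introduced in Flink 1.11 and a TableEnvironment named 
tableEnvironment with the tables from the example below already registered:

{code:java}
// Both INSERTs are translated together at execute(), so the planner can
// detect and reuse the common scan of SourceTable.
StatementSet statementSet = tableEnvironment.createStatementSet();
statementSet.addInsertSql("insert into SinkTable1 select * from SourceTable");
statementSet.addInsertSql("insert into SinkTable2 select * from SourceTable");
statementSet.execute();
{code}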

> The sub-plan reuse optimizer can't reuse sub-plans that come from different 
> SQL node trees 
> 
>
> Key: FLINK-17139
> URL: https://issues.apache.org/jira/browse/FLINK-17139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: jinfeng
>Priority: Major
>
> The sub-plan reuse optimizer can't reuse sub-plans that come from different 
> SQL node trees.
> {code:java}
> // code placeholder
> create table SourceTable ...;
> create table SinkTable1 ;
> create table SinkTable2 ;
> insert into SinkTable1 select * from SourceTable;
> insert into SinkTable2 select * from SourceTable;
> {code}
> SourceTable will not be reused when I execute the SQL statements above;
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17141) Name of SQL Operator is too long

2020-04-14 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-17141:
---

 Summary: Name of SQL Operator is too long
 Key: FLINK-17141
 URL: https://issues.apache.org/jira/browse/FLINK-17141
 Project: Flink
  Issue Type: Improvement
Reporter: Wenlong Lyu


The name of an operator contains the detailed logic of the operator, which makes it 
very long when there are a lot of columns. It is a disaster for logging and the web 
UI, and it can also cost a lot of memory because we use the name widely, for 
example in ExecutionVertex and in failover messages.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17110) Make StreamExecutionEnvironment#configure also affects StreamExecutionEnvironment#configuration

2020-04-15 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-17110:

Summary: Make StreamExecutionEnvironment#configure also affects 
StreamExecutionEnvironment#configuration  (was: Make 
StreamExecutionEnvironment#configure also affects configuration)

> Make StreamExecutionEnvironment#configure also affects 
> StreamExecutionEnvironment#configuration
> ---
>
> Key: FLINK-17110
> URL: https://issues.apache.org/jira/browse/FLINK-17110
> Project: Flink
>  Issue Type: Improvement
>Reporter: Wenlong Lyu
>Priority: Major
>
> If StreamExecutionEnvironment#configure can also affect the configuration in 
> StreamExecutionEnvironment, we can not only add library jars or classpaths 
> dynamically according to the job we want to run (which is quite important for a 
> platform product), but also tune runtime configuration programmatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17110) Make StreamExecutionEnvironment#configure also affects StreamExecutionEnvironment#configuration

2020-04-15 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083957#comment-17083957
 ] 

Wenlong Lyu commented on FLINK-17110:
-

hi, [~aljoscha] [~kkl0u] Do you have any more comments? Here is a patch: 
https://github.com/wenlong88/flink/commit/651a48ce4d837623e6eacb4814bd1e43c686fdd4,
 which mainly contains two points of change:
1. use PipelineExecutorUtils to generate the JobGraph, so that the configuration in 
StreamExecutionEnvironment can take effect on the JobGraph, unifying the behavior 
of RemoteExecutor, LocalExecutor, ClasspathJobGraphRetriever and the web submit 
handler.
2. make the configuration in StreamExecutionEnvironment updatable via 
StreamExecutionEnvironment#configure, so that users can update the configuration 
in the program.

> Make StreamExecutionEnvironment#configure also affects 
> StreamExecutionEnvironment#configuration
> ---
>
> Key: FLINK-17110
> URL: https://issues.apache.org/jira/browse/FLINK-17110
> Project: Flink
>  Issue Type: Improvement
>Reporter: Wenlong Lyu
>Priority: Major
>
> If StreamExecutionEnvironment#configure can also affect the configuration in 
> StreamExecutionEnvironment, we can not only add library jars or classpaths 
> dynamically according to the job we want to run (which is quite important for a 
> platform product), but also tune runtime configuration programmatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17012) Expose stage of task initialization

2020-04-15 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084135#comment-17084135
 ] 

Wenlong Lyu commented on FLINK-17012:
-

Treating initialization as part of deploying is also a reasonable idea.
How about we continue FLINK-1474: add an `initialize()` to AbstractInvokable and 
call initialize() in the task before the task transitions to RUNNING? Currently 
there are similar functionalities in both StreamTask and BatchTask: 
StreamTask#beforeInvoke and BatchTask#initialize, which are called in invoke(). The 
difference is that some common initialization happens in BatchTask#invoke instead 
of BatchTask#initialize, so we may need a bit more refactoring of BatchTask.
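
A rough sketch of the idea, assuming a new hook on AbstractInvokable (the method 
placement and the Task wiring below are illustrative, not the actual change):

{code:java}
// Hypothetical hook: heavy setup such as state restore and operator open()
// runs here, before the task is reported as RUNNING.
public abstract class AbstractInvokable {

    /** Called by the Task before it switches to RUNNING; default is a no-op. */
    public void initialize() throws Exception {}

    /** Existing entry point: starts processing records. */
    public abstract void invoke() throws Exception;
}

// In Task#run (simplified, illustrative):
//     invokable.initialize();   // still visible as DEPLOYING
//     transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
//     invokable.invoke();       // now actually processing records
{code}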

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to running before it is fully initialized; this does 
> not take state initialization and operator initialization (#open) into account, 
> which may take a long time to finish. As a result, there can be a weird 
> phenomenon where all tasks are running but throughput is 0. 
> I think it would be good if we could expose the initialization stage of tasks. 
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17012) Expose stage of task initialization

2020-04-15 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084135#comment-17084135
 ] 

Wenlong Lyu edited comment on FLINK-17012 at 4/15/20, 2:37 PM:
---

Treating initialization as part of deploying is also a reasonable idea.
How about we continue FLINK-4714: add an `initialize()` to AbstractInvokable and 
call initialize() in the task before the task transitions to RUNNING? Currently 
there are similar functionalities in both StreamTask and BatchTask: 
StreamTask#beforeInvoke and BatchTask#initialize, which are called in invoke(). The 
difference is that some common initialization happens in BatchTask#invoke instead 
of BatchTask#initialize, so we may need a bit more refactoring of BatchTask.


was (Author: wenlong.lwl):
Treating initialization as part of deploying is also a reasonable idea.
How about we continue FLINK-1474: add an `initialize()` to AbstractInvokable and 
call initialize() in the task before the task transitions to RUNNING? Currently 
there are similar functionalities in both StreamTask and BatchTask: 
StreamTask#beforeInvoke and BatchTask#initialize, which are called in invoke(). The 
difference is that some common initialization happens in BatchTask#invoke instead 
of BatchTask#initialize, so we may need a bit more refactoring of BatchTask.

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to running before it is fully initialized; this does 
> not take state initialization and operator initialization (#open) into account, 
> which may take a long time to finish. As a result, there can be a weird 
> phenomenon where all tasks are running but throughput is 0. 
> I think it would be good if we could expose the initialization stage of tasks. 
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17012) Expose stage of task initialization

2020-04-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087435#comment-17087435
 ] 

Wenlong Lyu commented on FLINK-17012:
-

hi, [~sewen] thanks for the explanation, but I think it may not be good to do much 
I/O and user-code initialization (including state initialization and Function#open) 
in the constructor: it can easily throw because of external-system failures or 
user-code exceptions, and it would be difficult to clean up when an exception 
happens in the constructor.
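
A small illustration of the cleanup problem (a made-up example, not Flink code): if 
initialization throws inside the constructor, the caller never receives the 
instance and therefore has no handle through which to release what was acquired.

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class EagerTask {
    private final InputStream restoredState;

    EagerTask(Path stateFile) throws IOException {
        this.restoredState = Files.newInputStream(stateFile); // resource acquired
        openUserFunction(); // may throw; then restoredState can never be closed,
                            // because the caller never got a reference to this object
    }

    private void openUserFunction() throws IOException {
        throw new IOException("user code failed");
    }
}
{code}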

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to running before it is fully initialized; this does 
> not take state initialization and operator initialization (#open) into account, 
> which may take a long time to finish. As a result, there can be a weird 
> phenomenon where all tasks are running but throughput is 0. 
> I think it would be good if we could expose the initialization stage of tasks. 
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16279) Per job Yarn application leak in normal execution mode.

2020-02-25 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-16279:
---

 Summary: Per job Yarn application leak in normal execution mode.
 Key: FLINK-16279
 URL: https://issues.apache.org/jira/browse/FLINK-16279
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Wenlong Lyu


I ran a job in yarn per-job mode using {{env.executeAsync}}; the job failed but 
the yarn cluster wasn't destroyed.

After some research on the code, I found that:

when running in attached mode, MiniDispatcher will never set {{shutDownFuture}} 
before receiving a request from the job client. 
{code}
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
    // terminate the MiniDispatcher once we served the first JobResult successfully
    jobResultFuture.thenAccept((JobResult result) -> {
        ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
                ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;

        LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
        shutDownFuture.complete(status);
    });
}
{code}

However, when running in async mode (jobs submitted via env.executeAsync), there 
may be no request from the job client: when a user finds from the job client that 
the job has failed, he may never request the result again.
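
A minimal sketch of how the leak can be triggered (the pipeline below is made up; 
any job that fails after an async submission will do):

{code:java}
// Submit asynchronously; the caller gets a JobClient back immediately.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3)
        .map(x -> x / 0) // fails at runtime
        .print();

JobClient client = env.executeAsync("leaky-job");
// If the user observes the failure (e.g. via client.getJobStatus()) and exits
// without ever fetching the job result, the MiniDispatcher's shutDownFuture is
// never completed and the per-job YARN application keeps running.
{code}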



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16279) Per job Yarn application leak in normal execution mode.

2020-02-25 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-16279:

Description: 
I ran a job in yarn per-job mode using {{env.executeAsync}}; the job failed but 
the yarn cluster wasn't destroyed.

After some research on the code, I found that:

when running in attached mode, MiniDispatcher will never set {{shutDownFuture}} 
before receiving a request from the job client. 
{code}
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
    // terminate the MiniDispatcher once we served the first JobResult successfully
    jobResultFuture.thenAccept((JobResult result) -> {
        ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
                ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;

        LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
        shutDownFuture.complete(status);
    });
}
{code}

However, when running in async mode (jobs submitted via env.executeAsync), there 
may be no request from the job client: when a user finds from the job client that 
the job has failed, he may never request the result again.

  was:
I ran a job in yarn per-job mode using {{env.executeAsync}}; the job failed but 
the yarn cluster wasn't destroyed.

After some research on the code, I found that:

when running in attached mode, MiniDispatcher will neve set {{shutDownfuture}} 
before receiving a request from the job client. 
{code}
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
    // terminate the MiniDispatcher once we served the first JobResult successfully
    jobResultFuture.thenAccept((JobResult result) -> {
        ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
                ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;

        LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
        shutDownFuture.complete(status);
    });
}
{code}

However, when running in async mode (jobs submitted via env.executeAsync), there 
may be no request from the job client: when a user finds from the job client that 
the job has failed, he may never request the result again.


> Per job Yarn application leak in normal execution mode.
> ---
>
> Key: FLINK-16279
> URL: https://issues.apache.org/jira/browse/FLINK-16279
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Wenlong Lyu
>Priority: Major
>
> I ran a job in yarn per-job mode using {{env.executeAsync}}; the job failed 
> but the yarn cluster wasn't destroyed.
> After some research on the code, I found that:
> when running in attached mode, MiniDispatcher will never set 
> {{shutDownFuture}} before receiving a request from the job client. 
> {code}
> if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
>     // terminate the MiniDispatcher once we served the first JobResult successfully
>     jobResultFuture.thenAccept((JobResult result) -> {
>         ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
>                 ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
>         LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
>         shutDownFuture.complete(status);
>     });
> }
> {code}
> However, when running in async mode (jobs submitted via env.executeAsync), 
> there may be no request from the job client: when a user finds from the job 
> client that the job has failed, he may never request the result again.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21870) RelDataType of TableFunction different in local and CI

2021-03-19 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-21870:
---

 Summary: RelDataType of TableFunction different in local and CI
 Key: FLINK-21870
 URL: https://issues.apache.org/jira/browse/FLINK-21870
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu


Locally the StructKind of the RelDataType is NONE; however, in the CI environment 
the StructKind is FULLY_QUALIFIED. See the case in 
CorrelateJsonPlanTest#testRegisterByClass.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21785) Support StreamExecCorrelate json serde

2021-03-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21785:

Affects Version/s: 1.13.0

> Support StreamExecCorrelate json serde
> --
>
> Key: FLINK-21785
> URL: https://issues.apache.org/jira/browse/FLINK-21785
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.13.0
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21785) Support StreamExecCorrelate json serde

2021-03-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21785:

Component/s: Table SQL / Planner

> Support StreamExecCorrelate json serde
> --
>
> Key: FLINK-21785
> URL: https://issues.apache.org/jira/browse/FLINK-21785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21785) Support StreamExecCorrelate json serde

2021-03-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21785:

Fix Version/s: 1.13.0

> Support StreamExecCorrelate json serde
> --
>
> Key: FLINK-21785
> URL: https://issues.apache.org/jira/browse/FLINK-21785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21785) Support StreamExecCorrelate json serde

2021-03-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21785:

Affects Version/s: (was: 1.13.0)

> Support StreamExecCorrelate json serde
> --
>
> Key: FLINK-21785
> URL: https://issues.apache.org/jira/browse/FLINK-21785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21813) Support StreamExecOverAggregate json ser/de

2021-03-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21813:

Component/s: Table SQL / Planner

> Support StreamExecOverAggregate json ser/de
> ---
>
> Key: FLINK-21813
> URL: https://issues.apache.org/jira/browse/FLINK-21813
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21816) Support StreamExecMatch json ser/de

2021-03-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21816:

Fix Version/s: 1.13.0

> Support StreamExecMatch json ser/de
> ---
>
> Key: FLINK-21816
> URL: https://issues.apache.org/jira/browse/FLINK-21816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21813) Support StreamExecOverAggregate json ser/de

2021-03-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21813:

Fix Version/s: 1.13.0

> Support StreamExecOverAggregate json ser/de
> ---
>
> Key: FLINK-21813
> URL: https://issues.apache.org/jira/browse/FLINK-21813
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21816) Support StreamExecMatch json ser/de

2021-03-19 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-21816:

Component/s: Table SQL / Planner

> Support StreamExecMatch json ser/de
> ---
>
> Key: FLINK-21816
> URL: https://issues.apache.org/jira/browse/FLINK-21816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22020) Reflections ClassNotFound when trying to deserialize a json plan in deployment environment

2021-03-29 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-22020:
---

 Summary: Reflections ClassNotFound when trying to deserialize a 
json plan in deployment environment 
 Key: FLINK-22020
 URL: https://issues.apache.org/jira/browse/FLINK-22020
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.0
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22020) Reflections ClassNotFound when trying to deserialize a json plan in deployment environment

2021-03-29 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-22020:

Attachment: error_log.png

> Reflections ClassNotFound when trying to deserialize a json plan in 
> deployment environment 
> ---
>
> Key: FLINK-22020
> URL: https://issues.apache.org/jira/browse/FLINK-22020
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Wenlong Lyu
>Priority: Major
> Attachments: error_log.png
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22020) Reflections ClassNotFound when trying to deserialize a json plan in deployment environment

2021-03-29 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-22020:

Description: after some investigation, I found that it is because we don't 
include org.reflections in the pom of flink-table-planner-blink

> Reflections ClassNotFound when trying to deserialize a json plan in 
> deployment environment 
> ---
>
> Key: FLINK-22020
> URL: https://issues.apache.org/jira/browse/FLINK-22020
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Wenlong Lyu
>Priority: Major
> Attachments: error_log.png
>
>
> after some investigation, I found that it is because we don't include 
> org.reflections in the pom of flink-table-planner-blink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23159) Correlated sql subquery on the source created via fromValues() failed to compile

2021-07-02 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373308#comment-17373308
 ] 

Wenlong Lyu commented on FLINK-23159:
-

[~gaoyunhaii] Thanks for reporting the issue. The root cause is that Values is 
currently excluded in SubQueryDecorrelator; I will try to fix it.

> Correlated sql subquery on the source created via fromValues() failed to 
> compile
> 
>
> Key: FLINK-23159
> URL: https://issues.apache.org/jira/browse/FLINK-23159
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Yun Gao
>Priority: Major
>
> Correlated subquery like 
> {code:java}
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.types.Row;
> import java.util.ArrayList;
> import java.util.List;
> public class SQLQueryTest {
>   public static void main(String[] args) {
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode()
>   .build();
> TableEnvironment tableEnvironment = TableEnvironment.create(settings);
> DataType row = DataTypes.ROW(
>   DataTypes.FIELD("flag", DataTypes.STRING()),
>   DataTypes.FIELD("id", DataTypes.INT()),
>   DataTypes.FIELD("name", DataTypes.STRING())
> );
> Table table = tableEnvironment.fromValues(row, new 
> MyListSource("table1").builder());
> tableEnvironment.createTemporaryView("table1", table);
> table = tableEnvironment.fromValues(row, new 
> MyListSource("table2").builder());
> tableEnvironment.createTemporaryView("table2", table);
> String sql = "select t1.flag from table1 t1 where t1.name in (select 
> t2.name from table2 t2 where t2.id = t1.id)";
> tableEnvironment.explainSql(sql);
>   }
>   public static class MyListSource {
> private String flag;
> public MyListSource(String flag) {
>   this.flag = flag;
> }
> public List<Row> builder() {
>   List<Row> rows = new ArrayList<>();
>   for (int i = 2; i < 3; i++) {
> Row row = new Row(3);
> row.setField(0, flag);
> row.setField(1, i);
> row.setField(2, "me");
> rows.add(row);
>   }
>   return rows;
> }
>   }
> }
> {code}
> would throws
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> unexpected correlate variable $cor0 in the plan
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.immutable.Range.foreach(Range.scala:160)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> 

[jira] [Commented] (FLINK-23385) Fix nullability of COALESCE

2021-07-16 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17382009#comment-17382009
 ] 

Wenlong Lyu commented on FLINK-23385:
-

hi, [~twalthr] I think the root cause of the issue is the return type of 
REGEXP_EXTRACT should be force nullable instead of  depending on input type, 
introduce by FLINK-13783

> Fix nullability of COALESCE
> ---
>
> Key: FLINK-23385
> URL: https://issues.apache.org/jira/browse/FLINK-23385
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.1
>Reporter: Maciej Bryński
>Priority: Major
>
> EDIT: Simpler case:
> {code:java}
> SELECT COALESCE(REGEXP_EXTRACT('22','[A-Z]+'),'-');
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, 
> however, a null value is being written into it. You can set job configuration 
> 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and 
> drop such records silently.
> {code}
> When using REGEXP_EXTRACT on a NOT NULL column I'm getting the following exception
> {code:java}
> select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from test limit 10
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, 
> however, a null value is being written into it. You can set job configuration 
> 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and 
> drop such records silently.
> {code}
> I think the reason is that the nullability of the result is wrongly calculated.
>  Example:
> {code:java}
> create table test (
>  test STRING NOT NULL
> ) WITH (
> 'connector' = 'datagen'
> );
> explain select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from test
> == Abstract Syntax Tree ==
> LogicalProject(EXPR$0=[REGEXP_EXTRACT($0, _UTF-16LE'[A-Z]+')])
> +- LogicalTableScan(table=[[default_catalog, default_database, test]])
> == Optimized Physical Plan ==
> Calc(select=[REGEXP_EXTRACT(test, _UTF-16LE'[A-Z]+') AS EXPR$0])
> +- TableSourceScan(table=[[default_catalog, default_database, test]], 
> fields=[test])
> == Optimized Execution Plan ==
> Calc(select=[REGEXP_EXTRACT(test, _UTF-16LE'[A-Z]+') AS EXPR$0])
> +- TableSourceScan(table=[[default_catalog, default_database, test]], 
> fields=[test])
> {code}
> As you can see, Flink is removing COALESCE from the query, which is wrong.
>  
> Same for view (null = false):
> {code:java}
> create view v as select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from 
> test
> describe v;
> +--------+--------+-------+-----+--------+-----------+
> |   name |   type |  null | key | extras | watermark |
> +--------+--------+-------+-----+--------+-----------+
> | EXPR$0 | STRING | false |     |        |           |
> +--------+--------+-------+-----+--------+-----------+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   >