[jira] [Created] (FLINK-4406) Implement job master registration at resource manager
Wenlong Lyu created FLINK-4406: -- Summary: Implement job master registration at resource manager Key: FLINK-4406 URL: https://issues.apache.org/jira/browse/FLINK-4406 Project: Flink Issue Type: Sub-task Components: Cluster Management Reporter: Wenlong Lyu The Job Master needs to register with the Resource Manager on start-up, then watch for leadership changes of the RM and trigger re-registration when the leader changes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
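A minimal sketch of the registration flow described in FLINK-4406 (the gateway interface, address, and back-off policy are illustrative assumptions, not the actual FLIP-6 API): register on start-up, and re-register from scratch whenever a new RM leader is announced.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical shape of the flow; ResourceManagerGateway is an assumption.
public class JobMasterRegistration {

	interface ResourceManagerGateway {
		CompletableFuture<Boolean> registerJobMaster(String jobMasterAddress);
	}

	private final ScheduledExecutorService executor =
			Executors.newSingleThreadScheduledExecutor();
	private volatile ResourceManagerGateway resourceManager;

	/** Called on start-up and again whenever a new RM leader is elected. */
	public void notifyResourceManagerLeader(ResourceManagerGateway newLeader) {
		this.resourceManager = newLeader;
		tryRegister(newLeader, 100);
	}

	private void tryRegister(ResourceManagerGateway target, long backoffMillis) {
		target.registerJobMaster("akka://flink/user/jobmaster").whenComplete((success, failure) -> {
			// Stop retrying if leadership changed while the attempt was in flight.
			if (target != resourceManager) {
				return;
			}
			if (failure != null || !success) {
				executor.schedule(
						() -> tryRegister(target, Math.min(backoffMillis * 2, 10_000)),
						backoffMillis, TimeUnit.MILLISECONDS);
			}
		});
	}
}
{code}
Comparing {{target}} against the current leader before scheduling a retry ensures a stale registration attempt stops as soon as a leadership change starts a fresh one.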
[jira] [Created] (FLINK-4414) Remove restriction on RpcService.getAddress
Wenlong Lyu created FLINK-4414: -- Summary: Remove restriction on RpcService.getAddress Key: FLINK-4414 URL: https://issues.apache.org/jira/browse/FLINK-4414 Project: Flink Issue Type: Sub-task Reporter: Wenlong Lyu Assignee: Wenlong Lyu Currently {{RpcService}} provides the address only for the endpoint. Since the rpc service serves both the endpoints created on it and the remote gateways created on it, it should be fine to offer getAddress for all {{RpcGateway}} instances created on the rpc service, covering both the server and the client side. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4406) Implement job master registration at resource manager
[ https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu reassigned FLINK-4406: -- Assignee: Wenlong Lyu > Implement job master registration at resource manager > - > > Key: FLINK-4406 > URL: https://issues.apache.org/jira/browse/FLINK-4406 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > Job Master needs to register to Resource Manager when starting and then > watches leadership changes of RM, and trigger re-registration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4443) Add support in RpcCompletenessTest for inheritance of RpcGateway and RpcEndpoint
Wenlong Lyu created FLINK-4443: -- Summary: Add support in RpcCompletenessTest for inheritance of RpcGateway and RpcEndpoint Key: FLINK-4443 URL: https://issues.apache.org/jira/browse/FLINK-4443 Project: Flink Issue Type: Sub-task Reporter: Wenlong Lyu Assignee: Wenlong Lyu RpcCompletenessTest needs to support an RpcGateway that is composed of some basic interfaces, like the following example:
{code:java}
public interface ExecutionStateListener extends RpcGateway {
	public void notifyExecutionStateChanges();
}

public interface JobStateListener extends RpcGateway {
	public void notifyJobStateChanges();
}

public interface JobWatcher extends ExecutionStateListener, JobStateListener, RpcGateway {
}

public class JobWatcherEndpoint extends RpcEndpoint {

	protected JobWatcherEndpoint(RpcService rpcService) {
		super(rpcService);
	}

	@RpcMethod
	public void notifyExecutionStateChanges() {
	}

	@RpcMethod
	public void notifyJobStateChanges() {
	}
}

public class AttachedJobClient extends JobWatcherEndpoint {

	protected AttachedJobClient(RpcService rpcService) {
		super(rpcService);
	}
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4375) Introduce rpc protocols implemented by job manager
[ https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu closed FLINK-4375. -- Resolution: Invalid > Introduce rpc protocols implemented by job manager > -- > > Key: FLINK-4375 > URL: https://issues.apache.org/jira/browse/FLINK-4375 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > job manager RPC server needs to implement a job control protocol, resource > user protocol, task control protocol, > 1. job controller: cancelJob, suspendJob, etc. > 2. resource user: slotFailed(notify slot failure), > slotAvailable(offer slot), etc. > 3. task controller: updateTaskState, updateResultPartitionInfo, > etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4375) Introduce rpc protocols implemented by job manager
[ https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15447770#comment-15447770 ] Wenlong Lyu commented on FLINK-4375: the protocol definitions will be split across the individual interaction development tasks, so this sub-task is not needed any more. > Introduce rpc protocols implemented by job manager > -- > > Key: FLINK-4375 > URL: https://issues.apache.org/jira/browse/FLINK-4375 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > job manager RPC server needs to implement a job control protocol, resource > user protocol, task control protocol, > 1. job controller: cancelJob, suspendJob, etc. > 2. resource user: slotFailed(notify slot failure), > slotAvailable(offer slot), etc. > 3. task controller: updateTaskState, updateResultPartitionInfo, > etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4406) Implement job master registration at resource manager
[ https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-4406: --- Assignee: Kurt Young (was: Wenlong Lyu) > Implement job master registration at resource manager > - > > Key: FLINK-4406 > URL: https://issues.apache.org/jira/browse/FLINK-4406 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Wenlong Lyu >Assignee: Kurt Young > > Job Master needs to register to Resource Manager when starting and then > watches leadership changes of RM, and trigger re-registration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-6018) Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` method
[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906815#comment-15906815 ] Wenlong Lyu commented on FLINK-6018: I think we should just throw an exception since all initialization has been done in KeyedStateStore. > Minor improvements about `AbstractKeyedStateBackend#getPartitionedState` > method > --- > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing >Reporter: sunjincheng >Assignee: sunjincheng > > The code snippet currently in the `AbstractKeyedStateBackend # > getPartitionedState` method, as follows: > {code} > line 352: // TODO: This is wrong, it should throw an exception that the > initialization has not properly happened > line 353: if (!stateDescriptor.isSerializerInitialized()) { > line 354:stateDescriptor.initializeSerializerUnlessSet(new > ExecutionConfig()); > line 354 } > {code} > Method `isSerializerInitialized`: > {code} > public boolean isSerializerInitialized() { > return serializer != null; > } > {code} > Method `initializeSerializerUnlessSet`: > {code} > public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { > if (serializer == null) { > if (typeInfo != null) { > serializer = > typeInfo.createSerializer(executionConfig); > } else { > throw new IllegalStateException( > "Cannot initialize serializer > after TypeInformation was dropped during serialization"); > } > } > } > {code} > that is, in the `initializeSerializerUnlessSet` method, The `serializer` has > been checked by `serializer == null`.So I hope this code has a little > improvement to the following: > approach 1: > According to the `TODO` information we throw an exception > {code} > if (!stateDescriptor.isSerializerInitialized()) { > throw new IllegalStateException("The serializer of the > descriptor has not been initialized!"); > } > {code} > approach 2: > Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) > {` logic. > {code} > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > {code} > Meanwhile, If we use the approach 2, I suggest that > `AbstractKeyedStateBackend` add a `private final ExecutionConfig > executionConfig` property. then we can change the code like this: > {code} > stateDescriptor.initializeSerializerUnlessSet(executionConfig); > {code} > Are the above suggestions reasonable for you? > Welcome anybody's feedback and corrections. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925592#comment-15925592 ] Wenlong Lyu commented on FLINK-5756: In RocksDB, the merge operands are combined during compaction and get(), not when merge() is called. When merging two Slices with a StringAppendOperator, a new string has to be created, which can be time-costly when there are thousands of slices to merge. I think that is why get() is slow after you have added five thousand items to the list. If you call {{rocksdb.compactRange()}} before the get, it will be quite fast. In real application scenarios, compaction happens more often than in this test, so the performance will be much better in a real environment than in this extreme test scenario. > When there are many values under the same key in ListState, > RocksDBStateBackend performances poor > - > > Key: FLINK-5756 > URL: https://issues.apache.org/jira/browse/FLINK-5756 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When using RocksDB as the StateBackend, if there are many values under the > same key in ListState, the windowState.get() operator performances very poor. > I also the the RocksDB using version 4.11.2, the performance is also very > poor. The problem is likely to related to RocksDB itself's get() operator > after using merge(). The problem may influences the window operation's > performance when the size is very large using ListState. I try to merge 5 > values under the same key in RocksDB, It costs 120 seconds to execute get() > operation. > /// > The flink's code is as follows: > {code} > class SEventSource extends RichSourceFunction [SEvent] { > private var count = 0L > private val alphabet = > "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" > override def run(sourceContext: SourceContext[SEvent]): Unit = { > while (true) { > for (i <- 0 until 5000) { > sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) > count += 1L > } > Thread.sleep(1000) > } > } > } > env.addSource(new SEventSource) > .assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[SEvent] { > override def getCurrentWatermark: Watermark = { > new Watermark(System.currentTimeMillis()) > } > override def extractTimestamp(t: SEvent, l: Long): Long = { > System.currentTimeMillis() > } > }) > .keyBy(0) > .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2))) > .apply(new WindowStatistic) > .map(x => (System.currentTimeMillis(), x)) > .print() > {code} > > The RocksDB Test code: > {code} > val stringAppendOperator = new StringAppendOperator > val options = new Options() > options.setCompactionStyle(CompactionStyle.LEVEL) > .setCompressionType(CompressionType.SNAPPY_COMPRESSION) > .setLevelCompactionDynamicLevelBytes(true) > .setIncreaseParallelism(4) > .setUseFsync(true) > .setMaxOpenFiles(-1) > .setCreateIfMissing(true) > .setMergeOperator(stringAppendOperator) > val write_options = new WriteOptions > write_options.setSync(false) > val rocksDB = RocksDB.open(options, "/**/Data/") > val key = "key" > val value = > "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321" > val beginmerge = System.currentTimeMillis() > for(i <- 0 to 5) { > rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes()) > //rocksDB.put(key.getBytes, value.getBytes) > } > println("finish") > val begin = System.currentTimeMillis() > rocksDB.get(key.getBytes) > val end = System.currentTimeMillis() > println("merge cost:" + (begin - beginmerge)) > println("Time consuming:" + (end - begin)) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6162) Fix bug in ByteArrayOutputStreamWithPos#setPosition
Wenlong Lyu created FLINK-6162: -- Summary: Fix bug in ByteArrayOutputStreamWithPos#setPosition Key: FLINK-6162 URL: https://issues.apache.org/jira/browse/FLINK-6162 Project: Flink Issue Type: Bug Components: Core Reporter: Wenlong Lyu Assignee: Wenlong Lyu Currently the precondition check in setPosition fails when the buffer is full: {{Preconditions.checkArgument(position < getEndPosition(), "Position out of bounds.");}} We should allow the expected position to be equal to the end position. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
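A minimal sketch of the proposed relaxation (the buffer field and getEndPosition() are assumed from the description, not copied from the actual Flink class): a buffer of size N legitimately has write positions 0..N inclusive, so the check must use <=.
{code:java}
import org.apache.flink.util.Preconditions;

// Sketch only: a full buffer leaves the write position exactly at the
// end position, which the old strict '<' check wrongly rejected.
public class PositionedBuffer {
	private byte[] buffer = new byte[64];
	private int count;

	private int getEndPosition() {
		return buffer.length;
	}

	public void setPosition(int position) {
		// Was: position < getEndPosition(), which fails when the buffer is full.
		Preconditions.checkArgument(position <= getEndPosition(), "Position out of bounds.");
		this.count = position;
	}
}
{code}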
[jira] [Comment Edited] (FLINK-5989) Protobuf in akka needs to be shaded
[ https://issues.apache.org/jira/browse/FLINK-5989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15952094#comment-15952094 ] Wenlong Lyu edited comment on FLINK-5989 at 4/1/17 7:10 AM: hi, [~rmetzger], have you ever tried adding a module like flink-shaded-akka-remote to solve the problem?
{code}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-parent</artifactId>
		<version>1.3-blink-1.1-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>
	<artifactId>flink-shaded-akka-remote</artifactId>
	<name>flink-shaded-akka-remote</name>
	<packaging>jar</packaging>
	<dependencies>
		<dependency>
			<groupId>com.data-artisans</groupId>
			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
			<version>${akka.version}</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<executions>
					<execution>
						<id>shade-flink</id>
						<phase>none</phase>
					</execution>
					<execution>
						<id>shade-akka</id>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<shadedArtifactAttached>false</shadedArtifactAttached>
							<createDependencyReducedPom>true</createDependencyReducedPom>
							<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
							<artifactSet>
								<includes>
									<include>com.google.protobuf:*</include>
								</includes>
							</artifactSet>
							<relocations>
								<relocation>
									<pattern>com.google</pattern>
									<shadedPattern>org.apache.flink.akka.shaded.com.google</shadedPattern>
									<excludes>
										<exclude>com.google.inject.**</exclude>
									</excludes>
								</relocation>
							</relocations>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>
{code}
was (Author: wenlong.lwl): hi, [~rmetzger], have you ever try add a module like flink-shade-akka-remote to solve the problem
{code}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-parent</artifactId>
		<version>1.3-blink-1.1-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>
	<artifactId>flink-shaded-akka-remote</artifactId>
	<name>flink-shaded-akka-remote</name>
	<packaging>jar</packaging>
	<dependencies>
		<dependency>
			<groupId>com.data-artisans</groupId>
			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
			<version>${akka.version}</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<executions>
					<execution>
						<id>shade-flink</id>
						<phase>none</phase>
[jira] [Commented] (FLINK-5989) Protobuf in akka needs to be shaded
[ https://issues.apache.org/jira/browse/FLINK-5989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15952094#comment-15952094 ] Wenlong Lyu commented on FLINK-5989: hi, [~rmetzger], have you ever tried adding a module like flink-shaded-akka-remote to solve the problem?
{code}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-parent</artifactId>
		<version>1.3-blink-1.1-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>
	<artifactId>flink-shaded-akka-remote</artifactId>
	<name>flink-shaded-akka-remote</name>
	<packaging>jar</packaging>
	<dependencies>
		<dependency>
			<groupId>com.data-artisans</groupId>
			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
			<version>${akka.version}</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<executions>
					<execution>
						<id>shade-flink</id>
						<phase>none</phase>
					</execution>
					<execution>
						<id>shade-akka</id>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<shadedArtifactAttached>false</shadedArtifactAttached>
							<createDependencyReducedPom>true</createDependencyReducedPom>
							<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
							<artifactSet>
								<includes>
									<include>com.google.protobuf:*</include>
								</includes>
							</artifactSet>
							<relocations>
								<relocation>
									<pattern>com.google</pattern>
									<shadedPattern>org.apache.flink.akka.shaded.com.google</shadedPattern>
									<excludes>
										<exclude>com.google.inject.**</exclude>
									</excludes>
								</relocation>
							</relocations>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>
{code}
> Protobuf in akka needs to be shaded > --- > > Key: FLINK-5989 > URL: https://issues.apache.org/jira/browse/FLINK-5989 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Wenlong Lyu > > Currently akka introduces dependency on protobuf, which is a common jar used > in many systems, I think we need to use a shaded akka like what we do in > dependency on hadoop to avoid version conflicts with user code. > {code} > [INFO] +- com.data-artisans:flakka-actor_2.10:jar:2.3-custom:compile > [INFO] | \- com.typesafe:config:jar:1.2.1:compile > [INFO] +- com.data-artisans:flakka-remote_2.10:jar:2.3-custom:compile > [INFO] | +- io.netty:netty:jar:3.8.0.Final:compile > [INFO] | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile > [INFO] | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5284) Make output of bucketing sink compatible with other processing framework like mapreduce
Wenlong Lyu created FLINK-5284: -- Summary: Make output of bucketing sink compatible with other processing framework like mapreduce Key: FLINK-5284 URL: https://issues.apache.org/jira/browse/FLINK-5284 Project: Flink Issue Type: Improvement Components: filesystem-connector Reporter: Wenlong Lyu Assignee: Wenlong Lyu Currently the bucketing sink cannot move the in-progress and pending files to the final output when the stream finishes, and when recovering, the current output file may contain invalid content that can only be identified via the file-length meta file. This makes the final output of the job incompatible with other processing frameworks like mapreduce. There are two things to do to solve the problem: 1. add a direct output option to the bucketing sink, which writes output to the final file and deletes/truncates the affected file on failover. Direct output will be especially useful for finite stream jobs, enabling users to migrate their batch jobs to streaming while taking advantage of features such as checkpointing. 2. add a truncate-by-copy option to enable the bucketing sink to resize an output file by copying the valid content of the current file instead of creating a length meta file. Truncate-by-copy incurs some extra IO, but makes the output cleaner. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
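A hedged sketch of the truncate-by-copy idea (point 2 above) using the Hadoop FileSystem API; the path handling and swap strategy are illustrative assumptions, not the actual bucketing sink code:
{code:java}
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: replaces a recovered in-progress file with a copy of
// its valid prefix, instead of writing a separate length meta file.
public final class TruncateByCopy {
	public static void truncateByCopy(FileSystem fs, Path file, long validLength) throws Exception {
		Path tmp = new Path(file.getParent(), file.getName() + ".truncating");
		byte[] buf = new byte[64 * 1024];
		try (InputStream in = fs.open(file); OutputStream out = fs.create(tmp, true)) {
			long remaining = validLength;
			while (remaining > 0) {
				int read = in.read(buf, 0, (int) Math.min(buf.length, remaining));
				if (read < 0) {
					break;
				}
				out.write(buf, 0, read);
				remaining -= read;
			}
		}
		// Swap the copy in place of the original file.
		fs.delete(file, false);
		fs.rename(tmp, file);
	}
}
{code}
On file systems without a native truncate(), this trades one extra read/write pass over the valid prefix for an output directory free of meta files.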
[jira] [Commented] (FLINK-2144) Incremental count, average, and variance for windows
[ https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350396#comment-15350396 ] Wenlong Lyu commented on FLINK-2144: we also consider increment agg for sliding window currently, evicting window must re-compute all the data of the window, no reducing state mechanism is available. > Incremental count, average, and variance for windows > > > Key: FLINK-2144 > URL: https://issues.apache.org/jira/browse/FLINK-2144 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Gabor Gevay >Priority: Minor > Labels: statistics > > By count I mean the number of elements in the window. > These can be implemented very efficiently building on FLINK-2143: > Store: O(1) > Evict: O(1) > emitWindow: O(1) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-2144) Incremental count, average, and variance for windows
[ https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350396#comment-15350396 ] Wenlong Lyu edited comment on FLINK-2144 at 6/27/16 7:57 AM: - We are also considering an incremental aggregation solution for evicting windows. Currently, an evicting window must re-compute all the data of the window; no reducing-state mechanism is available. It seems impossible to use a fold function to solve the problem. [~aljoscha] was (Author: dragon.l): we also consider increment agg for sliding window currently, evicting window must re-compute all the data of the window, no reducing state mechanism is available. > Incremental count, average, and variance for windows > > > Key: FLINK-2144 > URL: https://issues.apache.org/jira/browse/FLINK-2144 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Gabor Gevay >Priority: Minor > Labels: statistics > > By count I mean the number of elements in the window. > These can be implemented very efficiently building on FLINK-2143: > Store: O(1) > Evict: O(1) > emitWindow: O(1) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2144) Incremental count, average, and variance for windows
[ https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354715#comment-15354715 ] Wenlong Lyu commented on FLINK-2144: our solution is mostly based on this paper: http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf > Incremental count, average, and variance for windows > > > Key: FLINK-2144 > URL: https://issues.apache.org/jira/browse/FLINK-2144 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Gabor Gevay >Priority: Minor > Labels: statistics > > By count I mean the number of elements in the window. > These can be implemented very efficiently building on FLINK-2143: > Store: O(1) > Evict: O(1) > emitWindow: O(1) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4256) Fine-grained recovery
[ https://issues.apache.org/jira/browse/FLINK-4256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391849#comment-15391849 ] Wenlong Lyu commented on FLINK-4256: hi, Stephan, we have implemented similar solution before, but simple back tracking and forward cannot work well in situation following: Assuming job graph has A/B/C job vertices, A is connected to C with forward strategy, and B is connected to C all-to-all strategy, when a task of A failed, only one C task will be added to restart node set. I suggesting divide the job graph in maximal connected sub-graphs treating the job graph as an undirected graph, when a job graph is submitted. Besides, when the job graph is in large scale because extracting related nodes according to a given node can be time costly and will be repeatedly used in long running, using sub-graphs can avoid the problem > Fine-grained recovery > - > > Key: FLINK-4256 > URL: https://issues.apache.org/jira/browse/FLINK-4256 > Project: Flink > Issue Type: Improvement > Components: JobManager >Affects Versions: 1.1.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > When a task fails during execution, Flink currently resets the entire > execution graph and triggers complete re-execution from the last completed > checkpoint. This is more expensive than just re-executing the failed tasks. > In many cases, more fine-grained recovery is possible. > The full description and design is in the corresponding FLIP. > https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4256) Fine-grained recovery
[ https://issues.apache.org/jira/browse/FLINK-4256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391849#comment-15391849 ] Wenlong Lyu edited comment on FLINK-4256 at 7/25/16 1:14 PM: - hi, Stephan, we have implemented a similar solution before, but simple backward and forward tracking cannot work well in the following situation: assuming the job graph has A/B/C job vertices, A is connected to C with a forward strategy, and B is connected to C with an all-to-all strategy, then when a task of A fails, only one C task will be added to the restart node set. I suggest treating the job graph as an undirected graph and dividing it into maximal connected sub-graphs when the job graph is submitted; when a task fails over, restart the whole related sub-graph. Besides, when the job graph is large, extracting the related nodes for a given node can be time-costly and will be done repeatedly in a long-running job; using pre-computed sub-graphs avoids that problem was (Author: dragon.l): hi, Stephan, we have implemented similar solution before, but simple back tracking and forward cannot work well in situation following: Assuming job graph has A/B/C job vertices, A is connected to C with forward strategy, and B is connected to C all-to-all strategy, when a task of A failed, only one C task will be added to restart node set. I suggesting divide the job graph in maximal connected sub-graphs treating the job graph as an undirected graph, when a job graph is submitted. Besides, when the job graph is in large scale because extracting related nodes according to a given node can be time costly and will be repeatedly used in long running, using sub-graphs can avoid the problem > Fine-grained recovery > - > > Key: FLINK-4256 > URL: https://issues.apache.org/jira/browse/FLINK-4256 > Project: Flink > Issue Type: Improvement > Components: JobManager >Affects Versions: 1.1.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > When a task fails during execution, Flink currently resets the entire > execution graph and triggers complete re-execution from the last completed > checkpoint. This is more expensive than just re-executing the failed tasks. > In many cases, more fine-grained recovery is possible. > The full description and design is in the corresponding FLIP. > https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures -- This message was sent by Atlassian JIRA (v6.3.4#6332)
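A minimal sketch of the suggested pre-computation (vertex ids as plain strings; the actual ExecutionGraph types are omitted): treat every edge as undirected and group vertices into maximal connected components with a union-find at submission time.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified placeholders: vertices are ids, edges are pairs of ids.
public final class FailoverRegions {
	private final Map<String, String> parent = new HashMap<>();

	private String find(String v) {
		parent.putIfAbsent(v, v);
		String root = parent.get(v);
		if (!root.equals(v)) {
			root = find(root);
			parent.put(v, root); // path compression
		}
		return root;
	}

	/** Union the two endpoints of an edge, ignoring edge direction. */
	public void addEdge(String source, String target) {
		parent.put(find(source), find(target));
	}

	/** After all edges are added, vertices with the same root form one restart region. */
	public String regionOf(String vertex) {
		return find(vertex);
	}
}
{code}
On a failure, every vertex whose regionOf(...) root equals that of the failed vertex is restarted together, without any per-failure graph traversal.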
[jira] [Created] (FLINK-4271) There is no way to set parallelism of operators such as map and window produced by
Wenlong Lyu created FLINK-4271: -- Summary: There is no way to set parallelism of operators such as map and window produced by Key: FLINK-4271 URL: https://issues.apache.org/jira/browse/FLINK-4271 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Wenlong Lyu Currently, CoGroupedStreams packages the map/keyBy/window operators behind a human-friendly interface, like dataStreamA.coGroup(streamB).where(...).equalTo(...).window(...).apply(); neither the intermediate operators nor the final window operator can be accessed by users, and we cannot set attributes of the operators, which makes co-group hard to use in a production environment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-4271: --- Summary: There is no way to set parallelism of operators produced by CoGroupedStreams (was: There is no way to set parallelism of operators such as map and window produced by ) > There is no way to set parallelism of operators produced by CoGroupedStreams > > > Key: FLINK-4271 > URL: https://issues.apache.org/jira/browse/FLINK-4271 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Wenlong Lyu > > Currently, CoGroupStreams package the map/keyBy/window operators with a human > friendly interface, like: > dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(), both the > intermediate operators and final window operators can not be accessed by > users, and we cannot set attributes of the operators, which make co-group > hard to use in production environment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397140#comment-15397140 ] Wenlong Lyu commented on FLINK-4271: I think the solution can be better. Not only the parallelism but also other attributes of the operator such as uid, name, etc. should be configurable. Maybe the ```Where``` and ```EqualsTo``` in the ```CoGroupedStreams``` can provide some interface wrappering the same functions provided in the ```SingleOutputStreamOperator```, I am not sure yet~ > There is no way to set parallelism of operators produced by CoGroupedStreams > > > Key: FLINK-4271 > URL: https://issues.apache.org/jira/browse/FLINK-4271 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Wenlong Lyu >Assignee: Jark Wu > > Currently, CoGroupStreams package the map/keyBy/window operators with a human > friendly interface, like: > dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(), both the > intermediate operators and final window operators can not be accessed by > users, and we cannot set attributes of the operators, which make co-group > hard to use in production environment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397140#comment-15397140 ] Wenlong Lyu edited comment on FLINK-4271 at 7/28/16 7:05 AM: - I think the solution can be better. Not only the parallelism but also other attributes of the operator, such as uid, name, etc., should be configurable. Maybe the _Where_ and _EqualsTo_ builders in the _CoGroupedStreams_ can provide interfaces wrapping the same functions provided in the _SingleOutputStreamOperator_; I am not sure yet. was (Author: dragon.l): I think the solution can be better. Not only the parallelism but also other attributes of the operator such as uid, name, etc. should be configurable. Maybe the ```Where``` and ```EqualsTo``` in the ```CoGroupedStreams``` can provide some interface wrappering the same functions provided in the ```SingleOutputStreamOperator```, I am not sure yet~ > There is no way to set parallelism of operators produced by CoGroupedStreams > > > Key: FLINK-4271 > URL: https://issues.apache.org/jira/browse/FLINK-4271 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Wenlong Lyu >Assignee: Jark Wu > > Currently, CoGroupStreams package the map/keyBy/window operators with a human > friendly interface, like: > dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(), both the > intermediate operators and final window operators can not be accessed by > users, and we cannot set attributes of the operators, which make co-group > hard to use in production environment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4344) Implement new JobManager
[ https://issues.apache.org/jira/browse/FLINK-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-4344: --- Description: This is the parent issue for the efforts to implement the {{JobManager}} changes based on FLIP-6 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077) Because of the breadth of changes, we should implement a new version of the {{JobManager}} (let's call it {{JobMaster}}) rather than updating the current {{JobManager}}. That will allow us to keep a working master branch. At the point when the new cluster management is on par with the current implementation, we will drop the old {{JobManager}} and rename the {{JobMaster}} to {{JobManager}}. was: This is the parent issue for the efforts to implement the {{JobManager}} changes based on FLIP-6 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077) Because of the breadth of changes, we should implement a new version of the {{JobManager}} (let's call it {{JobMaster}}) rather than updating the current {{JobManager}}. That will allow us to keep a working master branch. At the point when the new cluster management is on par with the current implementation, we will drop the old {{JobManager}}and rename the {{JobMaster}} to {{JobManager}}. > Implement new JobManager > > > Key: FLINK-4344 > URL: https://issues.apache.org/jira/browse/FLINK-4344 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Stephan Ewen > > This is the parent issue for the efforts to implement the {{JobManager}} > changes based on FLIP-6 > (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077) > Because of the breadth of changes, we should implement a new version of the > {{JobManager}} (let's call it {{JobMaster}}) rather than updating the current > {{JobManager}}. That will allow us to keep a working master branch. > At the point when the new cluster management is on par with the current > implementation, we will drop the old {{JobManager}} and rename the > {{JobMaster}} to {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4375) defined rpc protocols provided by job manager
Wenlong Lyu created FLINK-4375: -- Summary: defined rpc protocols provided by job manager Key: FLINK-4375 URL: https://issues.apache.org/jira/browse/FLINK-4375 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Reporter: Wenlong Lyu Assignee: Wenlong Lyu The job manager RPC server needs to implement a job control protocol, a resource user protocol, and a task control protocol: 1. job controller: cancelJob, suspendJob, etc. 2. resource user: slotFailed (notify slot failure), slotAvailable (offer slot), etc. 3. task controller: updateTaskState, updateResultPartitionInfo, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4375) define rpc protocols provided by job manager
[ https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-4375: --- Summary: define rpc protocols provided by job manager (was: defined rpc protocols provided by job manager) > define rpc protocols provided by job manager > > > Key: FLINK-4375 > URL: https://issues.apache.org/jira/browse/FLINK-4375 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > job manager RPC server needs to implement a job control protocol, resource > user protocol, task control protocol, > 1. job controller: cancelJob, suspendJob, etc. > 2. resource user: slotFailed(notify slot failure), > slotAvailable(offer slot), etc. > 3. task controller: updateTaskState, updateResultPartitionInfo, > etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4376) implement job manager init procedure
Wenlong Lyu created FLINK-4376: -- Summary: implement job manager init procedure Key: FLINK-4376 URL: https://issues.apache.org/jira/browse/FLINK-4376 Project: Flink Issue Type: Sub-task Components: Cluster Management Reporter: Wenlong Lyu The JobManager should be created with a job graph and a resource manager gateway; the init procedure may be as follows: 1. init the job manager rpc service and related rpc endpoints 2. try to be elected as the leader job manager for the given job graph 3. create the slot pool manager with the given resource manager gateway 4. create the scheduler with the slot pool manager 5. create/restore the execution graph from the job graph 6. start execution graph scheduling and run the job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
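A skeleton of the init procedure above (all types are simplified placeholders, not the actual FLIP-6 classes; step numbers refer to the list in the issue):
{code:java}
public class JobMasterBootstrap {

	interface RpcService { void start(); }
	interface LeaderElectionService { void electLeader(Runnable onGranted); }
	interface SlotPoolManager {}
	interface Scheduler {}
	interface ExecutionGraph { void scheduleForExecution(); }

	private final RpcService rpcService;
	private final LeaderElectionService leaderElection;

	JobMasterBootstrap(RpcService rpcService, LeaderElectionService leaderElection) {
		this.rpcService = rpcService;
		this.leaderElection = leaderElection;
	}

	void start() {
		rpcService.start();                                       // 1. rpc service + endpoints
		leaderElection.electLeader(() -> {                        // 2. leadership for this job graph
			SlotPoolManager slotPool = createSlotPoolManager();   // 3. slot pool from RM gateway
			Scheduler scheduler = createScheduler(slotPool);      // 4. scheduler on the slot pool
			ExecutionGraph graph = createOrRestoreExecutionGraph(); // 5. build or restore
			graph.scheduleForExecution();                         // 6. run the job
		});
	}

	private SlotPoolManager createSlotPoolManager() { return new SlotPoolManager() {}; }
	private Scheduler createScheduler(SlotPoolManager pool) { return new Scheduler() {}; }
	private ExecutionGraph createOrRestoreExecutionGraph() { return () -> {}; }
}
{code}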
[jira] [Created] (FLINK-4378) Enable RollingSink to custom HDFS client configuration
Wenlong Lyu created FLINK-4378: -- Summary: Enable RollingSink to custom HDFS client configuration Key: FLINK-4378 URL: https://issues.apache.org/jira/browse/FLINK-4378 Project: Flink Issue Type: Improvement Components: filesystem-connector Reporter: Wenlong Lyu Assignee: Wenlong Lyu Tuning the HDFS client configuration for different situations, such as {{io.file.buffer.size}}, can make the rolling sink perform better. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4375) Introduce rpc protocols implemented by job manager
[ https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-4375: --- Summary: Introduce rpc protocols implemented by job manager (was: Introduce rpc protocols provided by job manager) > Introduce rpc protocols implemented by job manager > -- > > Key: FLINK-4375 > URL: https://issues.apache.org/jira/browse/FLINK-4375 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > job manager RPC server needs to implement a job control protocol, resource > user protocol, task control protocol, > 1. job controller: cancelJob, suspendJob, etc. > 2. resource user: slotFailed(notify slot failure), > slotAvailable(offer slot), etc. > 3. task controller: updateTaskState, updateResultPartitionInfo, > etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-6298) Local execution is not setting RuntimeContext for RichOutputFormat
[ https://issues.apache.org/jira/browse/FLINK-6298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu reassigned FLINK-6298: -- Assignee: Wenlong Lyu > Local execution is not setting RuntimeContext for RichOutputFormat > -- > > Key: FLINK-6298 > URL: https://issues.apache.org/jira/browse/FLINK-6298 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.1.0, 1.2.0 >Reporter: Mateusz Zakarczemny >Assignee: Wenlong Lyu > > RuntimeContext is never set in RichOutputFormat. I tested it in local > execution. RichMapFunction is setup correctly. > Following code will never print "//Context set in RichOutputFormat" > {code} > import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext} > import org.apache.flink.api.common.io.RichOutputFormat > import org.apache.flink.api.scala._ > import org.apache.flink.configuration.Configuration > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > object Startup { > def main(args: Array[String]): Unit = { > val mapFunction = new RichMapFunction[String, String] { > def open(taskNumber: Int, numTasks: Int) { getRuntimeContext } > def map(event: String) = { event } > override def setRuntimeContext(t: RuntimeContext) = { > println("//Context set in RichMapFunction") > super.setRuntimeContext(t) > } > } > val outputFormat = new RichOutputFormat[String] { > override def setRuntimeContext(t: RuntimeContext) = { > println("//Context set in RichOutputFormat") > super.setRuntimeContext(t) > } > def open(taskNumber: Int, numTasks: Int) {} > def writeRecord(event: String) { > println(event) > } > def configure(parameters: Configuration): Unit = {} > def close(): Unit = {} > } > val see = StreamExecutionEnvironment.getExecutionEnvironment > val eventsStream = see.fromElements[String]("A", "B", > "C").map(mapFunction) > eventsStream.writeUsingOutputFormat(outputFormat) > see.execute("test-job") > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6298) Local execution is not setting RuntimeContext for RichOutputFormat
[ https://issues.apache.org/jira/browse/FLINK-6298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15967343#comment-15967343 ] Wenlong Lyu commented on FLINK-6298: It is a bug in all execution environments. I think we should set the runtime context when the output format is a RichOutputFormat. > Local execution is not setting RuntimeContext for RichOutputFormat > -- > > Key: FLINK-6298 > URL: https://issues.apache.org/jira/browse/FLINK-6298 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.1.0, 1.2.0 >Reporter: Mateusz Zakarczemny >Assignee: Wenlong Lyu > > RuntimeContext is never set in RichOutputFormat. I tested it in local > execution. RichMapFunction is setup correctly. > Following code will never print "//Context set in RichOutputFormat" > {code} > import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext} > import org.apache.flink.api.common.io.RichOutputFormat > import org.apache.flink.api.scala._ > import org.apache.flink.configuration.Configuration > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > object Startup { > def main(args: Array[String]): Unit = { > val mapFunction = new RichMapFunction[String, String] { > def open(taskNumber: Int, numTasks: Int) { getRuntimeContext } > def map(event: String) = { event } > override def setRuntimeContext(t: RuntimeContext) = { > println("//Context set in RichMapFunction") > super.setRuntimeContext(t) > } > } > val outputFormat = new RichOutputFormat[String] { > override def setRuntimeContext(t: RuntimeContext) = { > println("//Context set in RichOutputFormat") > super.setRuntimeContext(t) > } > def open(taskNumber: Int, numTasks: Int) {} > def writeRecord(event: String) { > println(event) > } > def configure(parameters: Configuration): Unit = {} > def close(): Unit = {} > } > val see = StreamExecutionEnvironment.getExecutionEnvironment > val eventsStream = see.fromElements[String]("A", "B", > "C").map(mapFunction) > eventsStream.writeUsingOutputFormat(outputFormat) > see.execute("test-job") > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
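A minimal sketch of the proposed fix (where exactly this hook lives in each execution environment is left open here): before open() is called, check for RichOutputFormat and inject the context.
{code:java}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;

// Sketch only: the invokable that drives the output format would call this
// before format.open(...), so user overrides of setRuntimeContext fire.
public final class OutputFormatContextInjector {
	public static void setContextIfRich(OutputFormat<?> format, RuntimeContext context) {
		if (format instanceof RichOutputFormat) {
			((RichOutputFormat<?>) format).setRuntimeContext(context);
		}
	}
}
{code}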
[jira] [Created] (FLINK-5815) Add resource files configuration for Yarn Mode
Wenlong Lyu created FLINK-5815: -- Summary: Add resource files configuration for Yarn Mode Key: FLINK-5815 URL: https://issues.apache.org/jira/browse/FLINK-5815 Project: Flink Issue Type: Improvement Components: Client, Distributed Coordination Reporter: Wenlong Lyu Assignee: Wenlong Lyu Currently in flink, when we want to set up a resource file in the distributed cache, we need to make the file accessible remotely via a URL, and it is often difficult to maintain a service for that. What's more, when we want to add some extra jar files to the job classpath, we need to copy the jar files to the blob server when submitting the jobgraph; in yarn, especially in flip-6, the blob server is not running yet when we try to start a flink job. Yarn has an efficient distributed cache implementation for applications running on it; moreover, files stored in hdfs can easily be shared across applications via the distributed cache without extra IO operations. I suggest introducing -yfiles, -ylibjars and -yarchives options to FlinkYarnCLI to enable yarn users to set up their job resource files through the yarn distributed cache. The options are compatible with those used in mapreduce, which makes them easy to adopt for yarn users, who generally have experience with mapreduce. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
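A usage sketch of the proposed options (the option names follow the suggestion above; the exact syntax would be settled in the pull request):
{code}
# Hypothetical invocation: ship a dictionary file, an extra jar, and an
# archive via the YARN distributed cache, mirroring the MapReduce
# -files/-libjars/-archives style.
flink run -m yarn-cluster \
  -yfiles hdfs:///data/geo-dict.txt \
  -ylibjars /path/to/custom-udf.jar \
  -yarchives hdfs:///data/models.tar.gz \
  ./my-job.jar
{code}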
[jira] [Created] (FLINK-5817) Fix test concurrent execution failure by test dir conflicts.
Wenlong Lyu created FLINK-5817: -- Summary: Fix test concurrent execution failure by test dir conflicts. Key: FLINK-5817 URL: https://issues.apache.org/jira/browse/FLINK-5817 Project: Flink Issue Type: Bug Reporter: Wenlong Lyu Assignee: Wenlong Lyu Currently when different users build flink on the same machine, failures may happen because some test utilities create test files using fixed names, which causes file access failures when different users process the same file at the same time. We have found errors from AbstractTestBase, IOManagerTest, FileCacheTest. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5817) Fix test concurrent execution failure by test dir conflicts.
[ https://issues.apache.org/jira/browse/FLINK-5817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869704#comment-15869704 ] Wenlong Lyu commented on FLINK-5817: OK, it seems good; I will try it instead of adding a random suffix. > Fix test concurrent execution failure by test dir conflicts. > > > Key: FLINK-5817 > URL: https://issues.apache.org/jira/browse/FLINK-5817 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > Currently when different users build flink on the same machine, failure may > happen because some test utilities create test file using the fixed name, > which will cause file access failing when different user processing the same > file at the same time. > We have found errors from AbstractTestBase, IOManagerTest, FileCacheTest. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5817) Fix test concurrent execution failure by test dir conflicts.
[ https://issues.apache.org/jira/browse/FLINK-5817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871461#comment-15871461 ] Wenlong Lyu commented on FLINK-5817: [~shijinkui] I think the reason you want to change the java.io.tmpdir property is to solve the UT problem. But I think the best way is to avoid UTs depending on that system property, which will make the build cleaner. And once we do that, there is no need to set java.io.tmpdir in the pom any more. > Fix test concurrent execution failure by test dir conflicts. > > > Key: FLINK-5817 > URL: https://issues.apache.org/jira/browse/FLINK-5817 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > Currently when different users build flink on the same machine, failure may > happen because some test utilities create test file using the fixed name, > which will cause file access failing when different user processing the same > file at the same time. > We have found errors from AbstractTestBase, IOManagerTest, FileCacheTest. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
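A minimal sketch of the direction discussed here: instead of a fixed directory name under java.io.tmpdir, each test run asks the JDK for a unique directory, so concurrent builds by different users cannot collide.
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Instead of new File(System.getProperty("java.io.tmpdir"), "flink-test"),
// which collides across users, create a unique per-run directory.
public final class TestDirs {
	public static Path newTestDir() throws IOException {
		// "flink-test-" is only a prefix; the JDK appends a random suffix,
		// so two users building on the same machine get different paths.
		return Files.createTempDirectory("flink-test-");
	}
}
{code}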
[jira] [Commented] (FLINK-5815) Add resource files configuration for Yarn Mode
[ https://issues.apache.org/jira/browse/FLINK-5815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874104#comment-15874104 ] Wenlong Lyu commented on FLINK-5815: Hi Till, we have finished an implementation. I was just blocked by some other work last week; I think I can submit the pull request this week. > Add resource files configuration for Yarn Mode > -- > > Key: FLINK-5815 > URL: https://issues.apache.org/jira/browse/FLINK-5815 > Project: Flink > Issue Type: Improvement > Components: Client, YARN >Affects Versions: 1.3.0 >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > Currently in flink, when we want to setup a resource file to distributed > cache, we need to make the file accessible remotely by a url, which is often > difficult to maintain a service like that. What's more, when we want do add > some extra jar files to job classpath, we need to copy the jar files to blob > server when submitting the jobgraph. In yarn, especially in flip-6, the blob > server is not running yet when we try to start a flink job. > Yarn has a efficient distributed cache implementation for application running > on it, what's more we can be easily share the files stored in hdfs in > different application by distributed cache without extra IO operations. > I suggest to introduce -yfiles, -ylibjars -yarchives options to FlinkYarnCLI > to enable yarn user setup their job resource files by yarn distributed cache. > The options is compatible with what is used in mapreduce, which make it easy > to use for yarn user who generally has experience on using mapreduce. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-4378) Enable RollingSink to custom HDFS client configuration
[ https://issues.apache.org/jira/browse/FLINK-4378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu closed FLINK-4378. -- fixed > Enable RollingSink to custom HDFS client configuration > -- > > Key: FLINK-4378 > URL: https://issues.apache.org/jira/browse/FLINK-4378 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > Fix For: 1.2.0 > > > Optimizing the configuration of hdfs client in different situation, such as > {{io.file.buffer.size}} can make rolling sink perform better. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-4443) Add support in RpcCompletenessTest for inheritance of RpcGateway and RpcEndpoint
[ https://issues.apache.org/jira/browse/FLINK-4443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu closed FLINK-4443. -- fixed > Add support in RpcCompletenessTest for inheritance of RpcGateway and > RpcEndpoint > > > Key: FLINK-4443 > URL: https://issues.apache.org/jira/browse/FLINK-4443 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > RpcCompletenessTest needs to support RpcGateway which is composited by some > basic functions like the example following: > {code:java} > public interface ExecutionStateListener extends RpcGateway { > public void notifyExecutionStateChanges(); > } > public interface JobStateListener extends RpcGateway { > public void notifyJobStateChanges(); > } > public interface JobWatcher extends ExecutionStateListener, JobStateListener, > RpcGateway { > } > public class JobWatcherEndpoint extends RpcEndpoint { > protected JobWatcherEndpoint(RpcService rpcService) { > super(rpcService); > } > @RpcMethod > public void notifyExecutionStateChanges() { > } > @RpcMethod > public void notifyJobStateChanges() { > } > } > public class AttachedJobClient extends JobWatcherEndpoint { > protected JobClient(RpcService rpcService) { > super(rpcService); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5989) Protobuf in akka needs to be shaded
Wenlong Lyu created FLINK-5989: -- Summary: Protobuf in akka needs to be shaded Key: FLINK-5989 URL: https://issues.apache.org/jira/browse/FLINK-5989 Project: Flink Issue Type: Improvement Components: Build System Reporter: Wenlong Lyu Currently akka introduces a dependency on protobuf, which is a common jar used in many systems. I think we need to use a shaded akka, like what we do for the hadoop dependency, to avoid version conflicts with user code. {code} [INFO] +- com.data-artisans:flakka-actor_2.10:jar:2.3-custom:compile [INFO] | \- com.typesafe:config:jar:1.2.1:compile [INFO] +- com.data-artisans:flakka-remote_2.10:jar:2.3-custom:compile [INFO] | +- io.netty:netty:jar:3.8.0.Final:compile [INFO] | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile [INFO] | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5989) Protobuf in akka needs to be shaded
[ https://issues.apache.org/jira/browse/FLINK-5989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15900651#comment-15900651 ] Wenlong Lyu commented on FLINK-5989: [~mxm] I found that you changed the dependency on akka to a data-artisans custom version. Can you add the shade setting in the custom akka project? Or do we need to add a flink-shaded-akka module? > Protobuf in akka needs to be shaded > --- > > Key: FLINK-5989 > URL: https://issues.apache.org/jira/browse/FLINK-5989 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Wenlong Lyu > > Currently akka introduces dependency on protobuf, which is a common jar used > in many systems, I think we need to use a shaded akka like what we do in > dependency on hadoop to avoid version conflicts with user code. > {code} > [INFO] +- com.data-artisans:flakka-actor_2.10:jar:2.3-custom:compile > [INFO] | \- com.typesafe:config:jar:1.2.1:compile > [INFO] +- com.data-artisans:flakka-remote_2.10:jar:2.3-custom:compile > [INFO] | +- io.netty:netty:jar:3.8.0.Final:compile > [INFO] | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile > [INFO] | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5749) Unset HADOOP_HOME and HADOOP_CONF_DIR from system environment when building flink
Wenlong Lyu created FLINK-5749: -- Summary: Unset HADOOP_HOME and HADOOP_CONF_DIR from system environment when building flink Key: FLINK-5749 URL: https://issues.apache.org/jira/browse/FLINK-5749 Project: Flink Issue Type: Bug Reporter: Wenlong Lyu Assignee: Wenlong Lyu Currently when we try to build flink on a machine with the HADOOP_HOME environment variable set, test data will be written to HDFS instead of the local tmp dir as expected, which causes tests to fail. I suggest unsetting the HADOOP_HOME and HADOOP_CONF_DIR environment variables in the pom, to make sure maven runs the test cases in exactly the way we want. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5749) unset HADOOP_HOME and HADOOP_CONF_DIR to avoid env in build machine failing the UT and IT
[ https://issues.apache.org/jira/browse/FLINK-5749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-5749: --- Summary: unset HADOOP_HOME and HADOOP_CONF_DIR to avoid env in build machine failing the UT and IT (was: Unset HADOOP_HOME and HADOOP_CONF_DIR from system environment when building flink) > unset HADOOP_HOME and HADOOP_CONF_DIR to avoid env in build machine > failing the UT and IT > - > > Key: FLINK-5749 > URL: https://issues.apache.org/jira/browse/FLINK-5749 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > Currently when we are trying to build flink on a machine with HADOOP_HOME > environment variable set, Test data will be written to HDFS, instead of local > tmp dir which is expected. This will cause tests failed. > I suggest unset HADOOP_HOME and HADOOP_CONF_DIR environment variable in pom, > to make sure maven run the test cases in the exactly way we want. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
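One possible way to express this in the pom (a sketch, assuming a maven-surefire-plugin version that supports excludedEnvironmentVariables; older versions would need a wrapper script instead):
{code}
<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-surefire-plugin</artifactId>
	<configuration>
		<!-- Hide the host's Hadoop settings from the forked test JVMs, so
		     tests write to the local file system instead of the machine's HDFS.
		     Parameter availability depends on the surefire version in use. -->
		<excludedEnvironmentVariables>
			<excludedEnvironmentVariable>HADOOP_HOME</excludedEnvironmentVariable>
			<excludedEnvironmentVariable>HADOOP_CONF_DIR</excludedEnvironmentVariable>
		</excludedEnvironmentVariables>
	</configuration>
</plugin>
{code}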
[jira] [Created] (FLINK-16864) Add idle metrics for Task
Wenlong Lyu created FLINK-16864: --- Summary: Add idle metrics for Task Key: FLINK-16864 URL: https://issues.apache.org/jira/browse/FLINK-16864 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Wenlong Lyu Currently there is no metric for users to measure concretely how busy a task is, which is important for deciding how to tune a job. We would like to propose adding an IdleTime metric which measures the idle time of a task, including the time the mailbox processor spends waiting for new mail and the time the record writer spends waiting for a new buffer. With the idle time: 1. when a job cannot catch up with the speed of data generation, the vertex whose idle time is near zero is the bottleneck of the job. 2. when a job is not busy, idle time can be used to guide the user in how far the job can be scaled down. In addition, measuring idle time has little impact on the performance of the job, because when a task is busy we don't touch the code that measures wait time in the mailbox. -- This message was sent by Atlassian Jira (v8.3.4#803005)
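A minimal sketch of the measurement idea (class and method names are illustrative, not the actual mailbox processor): time only the blocking waits, so a busy task pays no measurement cost.
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: accumulate time spent blocked waiting for input
// ("mails") into an idle-time counter that a meter can sample.
public class IdleTimer {
	private final AtomicLong idleTimeMillis = new AtomicLong();
	private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

	public Runnable takeMail() throws InterruptedException {
		Runnable mail = mailbox.poll();
		if (mail != null) {
			return mail; // busy path: no timing overhead at all
		}
		long start = System.currentTimeMillis();
		try {
			return mailbox.take(); // idle path: block and measure the wait
		} finally {
			idleTimeMillis.addAndGet(System.currentTimeMillis() - start);
		}
	}

	public long getIdleTimeMillis() {
		return idleTimeMillis.get();
	}
}
{code}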
[jira] [Created] (FLINK-17012) Expose cost of task initialization
Wenlong Lyu created FLINK-17012: --- Summary: Expose cost of task initialization Key: FLINK-17012 URL: https://issues.apache.org/jira/browse/FLINK-17012 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: Wenlong Lyu Currently a task switches to RUNNING before it is fully initialized: the switch does not take state initialization and operator initialization (#open) into account, which may take a long time to finish. As a result, there can be a weird phenomenon where all tasks are running but throughput is 0. I think it would be good if we could expose the initialization stage of tasks. What do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
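A hedged sketch of one way to surface that stage (the extra enum value is an assumption of this ticket's proposal, not an existing Flink state at the time of writing):
{code:java}
// Hypothetical lifecycle with an explicit INITIALIZING stage between
// DEPLOYING and RUNNING, so "all tasks running, zero throughput" becomes
// visible as "tasks still initializing".
public enum TaskStage {
	CREATED, DEPLOYING, INITIALIZING, RUNNING, FINISHED;

	public boolean isProcessingRecords() {
		// Only RUNNING means state restore and operator #open have completed.
		return this == RUNNING;
	}
}
{code}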
[jira] [Updated] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-17012: Summary: Expose stage of task initialization (was: Expose cost of task initialization) > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to running before fully initialized, does not take > state initialization and operator initialization(#open ) in to account, which > may take long time to finish. As a result, there would be a weird phenomenon > that all tasks are running but throughput is 0. > I think it could be good if we can expose the initialization stage of tasks. > What to you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077193#comment-17077193 ] Wenlong Lyu commented on FLINK-17012: - [~chesnay] for debugging, logging may be enough, but it would be hard to integrate with other systems. [~pnowojski] Idle time is currently designed as a Meter; it is not good to set a meter to -1 or MAX_VALUE. > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to running before fully initialized, does not take > state initialization and operator initialization(#open ) in to account, which > may take long time to finish. As a result, there would be a weird phenomenon > that all tasks are running but throughput is 0. > I think it could be good if we can expose the initialization stage of tasks. > What to you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17040) SavepointWriterITCase broken
Wenlong Lyu created FLINK-17040: --- Summary: SavepointWriterITCase broken Key: FLINK-17040 URL: https://issues.apache.org/jira/browse/FLINK-17040 Project: Flink Issue Type: Bug Reporter: Wenlong Lyu I think it is because of the change in FLINK-16537, which creates the partition writer in beforeInvoke. [~zhijiang] Caused by: java.lang.UnsupportedOperationException: This method should never be called at org.apache.flink.state.api.runtime.SavepointEnvironment.getAllWriters(SavepointEnvironment.java:242) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:439) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:454) at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21010) Separate the implementation of StreamExecJoin
Wenlong Lyu created FLINK-21010: --- Summary: Separate the implementation of StreamExecJoin Key: FLINK-21010 URL: https://issues.apache.org/jira/browse/FLINK-21010 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Wenlong Lyu Fix For: 1.13.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21011) Separate the implementation of StreamExecJoin
Wenlong Lyu created FLINK-21011: --- Summary: Separate the implementation of StreamExecJoin Key: FLINK-21011 URL: https://issues.apache.org/jira/browse/FLINK-21011 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Wenlong Lyu Fix For: 1.13.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-21011) Separate the implementation of StreamExecJoin
[ https://issues.apache.org/jira/browse/FLINK-21011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu closed FLINK-21011. --- Resolution: Duplicate > Separate the implementation of StreamExecJoin > - > > Key: FLINK-21011 > URL: https://issues.apache.org/jira/browse/FLINK-21011 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Wenlong Lyu >Priority: Major > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21011) Separate the implementation of StreamExecIntervalJoin
[ https://issues.apache.org/jira/browse/FLINK-21011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21011: Summary: Separate the implementation of StreamExecIntervalJoin (was: Separate the implementation of StreamExecJoin) > Separate the implementation of StreamExecIntervalJoin > - > > Key: FLINK-21011 > URL: https://issues.apache.org/jira/browse/FLINK-21011 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Wenlong Lyu >Priority: Major > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (FLINK-21011) Separate the implementation of StreamExecIntervalJoin
[ https://issues.apache.org/jira/browse/FLINK-21011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu reopened FLINK-21011: - > Separate the implementation of StreamExecIntervalJoin > - > > Key: FLINK-21011 > URL: https://issues.apache.org/jira/browse/FLINK-21011 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Wenlong Lyu >Priority: Major > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-5815) Add resource files configuration for Yarn Mode
[ https://issues.apache.org/jira/browse/FLINK-5815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu closed FLINK-5815. -- Resolution: Abandoned > Add resource files configuration for Yarn Mode > -- > > Key: FLINK-5815 > URL: https://issues.apache.org/jira/browse/FLINK-5815 > Project: Flink > Issue Type: New Feature > Components: Command Line Client, Deployment / YARN >Affects Versions: 1.3.0 >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently in Flink, when we want to register a resource file in the distributed cache, we need to make the file accessible remotely via a URL, and it is often difficult to maintain a service for that. What's more, when we want to add some extra jar files to the job classpath, we need to copy the jar files to the blob server when submitting the job graph. On YARN, especially in FLIP-6, the blob server is not yet running when we try to start a Flink job. > YARN has an efficient distributed cache implementation for applications running on it; what's more, we can easily share files stored in HDFS across different applications via the distributed cache, without extra IO operations. > I suggest introducing -yfiles, -ylibjars and -yarchives options to FlinkYarnCLI to let YARN users set up their job resource files via the YARN distributed cache. The options are compatible with those used in MapReduce, which makes them easy to use for YARN users, who generally have experience with MapReduce. -- This message was sent by Atlassian Jira (v8.3.4#803005)
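For example, a submission using the proposed options might have looked like {{flink run -m yarn-cluster -yfiles hdfs:///path/dict.data -ylibjars /opt/udfs/my-udf.jar -yarchives hdfs:///path/models.zip ...}} (a hypothetical invocation only; the issue was closed as abandoned and the options were never added).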
[jira] [Created] (FLINK-8200) RocksDBAsyncSnapshotTest should use a temp folder instead of a folder with a fixed name
Wenlong Lyu created FLINK-8200: -- Summary: RocksDBAsyncSnapshotTest should use a temp folder instead of a folder with a fixed name Key: FLINK-8200 URL: https://issues.apache.org/jira/browse/FLINK-8200 Project: Flink Issue Type: Bug Reporter: Wenlong Lyu The following case failed when different users ran the test on the same machine. Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.226 sec <<< FAILURE! - in org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest testCleanupOfSnapshotsInFailureCase(org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest) Time elapsed: 0.023 sec <<< ERROR! java.io.IOException: No local storage directories available. Local DB files directory 'file:/tmp/foobar' does not exist and cannot be created. at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.lazyInitializeForJob(RocksDBStateBackend.java:251) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:300) at org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest.testCleanupOfSnapshotsInFailureCase(RocksDBAsyncSnapshotTest.java:338) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
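A minimal sketch of the proposed fix, assuming JUnit 4's {{TemporaryFolder}} rule replaces the fixed {{file:/tmp/foobar}} path (illustrative, not the actual patch):

{code:java}
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class RocksDBStoragePathTest {

    // Each run (and each user) gets a fresh directory that JUnit
    // creates before the test and deletes afterwards.
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testWithTempStoragePath() throws Exception {
        String dbStoragePath = tempFolder.newFolder().toURI().toString();
        // pass dbStoragePath to the state backend instead of 'file:/tmp/foobar',
        // e.g. backend.setDbStoragePath(dbStoragePath);
    }
}
{code}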
[jira] [Assigned] (FLINK-8200) RocksDBAsyncSnapshotTest should use a temp folder instead of a folder with a fixed name
[ https://issues.apache.org/jira/browse/FLINK-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu reassigned FLINK-8200: -- Assignee: Wenlong Lyu > RocksDBAsyncSnapshotTest should use a temp folder instead of a folder with a fixed name > - > > Key: FLINK-8200 > URL: https://issues.apache.org/jira/browse/FLINK-8200 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > The following case failed when different users ran the test on the same machine. > Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.226 sec <<< FAILURE! - in org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest > testCleanupOfSnapshotsInFailureCase(org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest) Time elapsed: 0.023 sec <<< ERROR! > java.io.IOException: No local storage directories available. Local DB files directory 'file:/tmp/foobar' does not exist and cannot be created. > at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.lazyInitializeForJob(RocksDBStateBackend.java:251) > at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:300) > at org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest.testCleanupOfSnapshotsInFailureCase(RocksDBAsyncSnapshotTest.java:338) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8201) YarnResourceManagerTest causes license checking failure
Wenlong Lyu created FLINK-8201: -- Summary: YarnResourceManagerTest causes license checking failure Key: FLINK-8201 URL: https://issues.apache.org/jira/browse/FLINK-8201 Project: Flink Issue Type: Bug Reporter: Wenlong Lyu YarnResourceManagerTest generates a temporary taskmanager config file in the flink-yarn module root folder and never clears it, which makes license checking fail when we run {{mvn clean verify}} multiple times in the same source folder. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-8201) YarnResourceManagerTest causes license checking failure
[ https://issues.apache.org/jira/browse/FLINK-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu reassigned FLINK-8201: -- Assignee: Wenlong Lyu > YarnResourceManagerTest causes license checking failure > --- > > Key: FLINK-8201 > URL: https://issues.apache.org/jira/browse/FLINK-8201 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > YarnResourceManagerTest generates a temporary taskmanager config file in the flink-yarn module root folder and never clears it, which makes license checking fail when we run {{mvn clean verify}} multiple times in the same source folder. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings
[ https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018086#comment-17018086 ] Wenlong Lyu commented on FLINK-15635: - hi, [~twalthr], I would like to share some more classloading issues to solve in the long run: # A SQL platform product needs to provide as many connectors as possible for the convenience of its users. It is not good to load them all in a single job, which would waste a lot of resources and make the job take long to deploy. # There can be connectors for one kind of storage in different versions on the platform, which can have API or class conflicts; we cannot just load them together in one job, and shading the classes can solve some of the conflicts but not all. # Catalog is the storage of table and function definitions, but currently Catalog does not care about the related classloading when we need to get a table from it. And some intuitive thoughts of mine: # Something like the plugin mechanism in flink-core needs to be introduced to support dynamic class loading for user-defined connectors. # Catalog can be responsible for providing jars for the tables it serves. By default, we can put connector jars in /opt/connectors/XXX in the Flink distribution, and the catalog can search those directories for the needed jars according to the factory matching result. # The table env should be able to add jars from the catalog to the job graph when submitting a job to a Flink cluster. What do you think? > Allow passing a ClassLoader to EnvironmentSettings > -- > > Key: FLINK-15635 > URL: https://issues.apache.org/jira/browse/FLINK-15635 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Reporter: Timo Walther >Priority: Major > > We had a couple of class loading issues in the past because people forgot to use the right classloader in {{flink-table}}. The SQL Client executor code hacks a classloader into the planner process by using {{wrapClassLoader}} that sets the thread's context classloader. > Instead we should allow passing a class loader to environment settings. This class loader can be passed to the planner and can be stored in the table environment, table config, etc. to have a consistent class loading behavior. > Having this in place should replace the need for {{Thread.currentThread().getContextClassLoader()}} in the entire {{flink-table}} module. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings
[ https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018617#comment-17018617 ] Wenlong Lyu edited comment on FLINK-15635 at 1/18/20 3:40 PM: -- Thanks [~twalthr] for the explanation. I came to this issue from FLINK-15552 and thought that this issue was used to track the long-term solution for class loading in the SQL Client and flink-table. I totally agree with you that we can have a mid-term fix and introduce more improvements in future versions. For the mid-term fix, +1 to adding an explicit common classloader to replace the need for the thread context classloader, which will also make the code easier to understand. was (Author: wenlong.lwl): [~twalthr] thanks for the explanation. I came to this issue from FLINK-15552 and thought that this issue was used to track the long-term solution for class loading in the SQL Client and flink-table. I totally agree with you that we can have a mid-term fix and introduce more improvements in future versions. For the mid-term fix, +1 to adding an explicit common classloader to replace the need for the thread context classloader, which will also make the code easier to understand. > Allow passing a ClassLoader to EnvironmentSettings > -- > > Key: FLINK-15635 > URL: https://issues.apache.org/jira/browse/FLINK-15635 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Reporter: Timo Walther >Priority: Major > > We had a couple of class loading issues in the past because people forgot to use the right classloader in {{flink-table}}. The SQL Client executor code hacks a classloader into the planner process by using {{wrapClassLoader}} that sets the thread's context classloader. > Instead we should allow passing a class loader to environment settings. This class loader can be passed to the planner and can be stored in the table environment, table config, etc. to have a consistent class loading behavior. > Having this in place should replace the need for {{Thread.currentThread().getContextClassLoader()}} in the entire {{flink-table}} module. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings
[ https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018617#comment-17018617 ] Wenlong Lyu commented on FLINK-15635: - [~twalthr] thanks for the explanation. I came to this issue from FLINK-15552 and thought that this issue was used to track the long-term solution for class loading in the SQL Client and flink-table. I totally agree with you that we can have a mid-term fix and introduce more improvements in future versions. For the mid-term fix, +1 to adding an explicit common classloader to replace the need for the thread context classloader, which will also make the code easier to understand. > Allow passing a ClassLoader to EnvironmentSettings > -- > > Key: FLINK-15635 > URL: https://issues.apache.org/jira/browse/FLINK-15635 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Reporter: Timo Walther >Priority: Major > > We had a couple of class loading issues in the past because people forgot to use the right classloader in {{flink-table}}. The SQL Client executor code hacks a classloader into the planner process by using {{wrapClassLoader}} that sets the thread's context classloader. > Instead we should allow passing a class loader to environment settings. This class loader can be passed to the planner and can be stored in the table environment, table config, etc. to have a consistent class loading behavior. > Having this in place should replace the need for {{Thread.currentThread().getContextClassLoader()}} in the entire {{flink-table}} module. -- This message was sent by Atlassian Jira (v8.3.4#803005)
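A minimal sketch of the mid-term fix being discussed, assuming a builder method like {{withClassLoader}} is added to {{EnvironmentSettings}} (the method is a proposal here, not existing API at the time of this discussion):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.net.URL;
import java.net.URLClassLoader;

public class ExplicitClassLoaderExample {
    public static void main(String[] args) throws Exception {
        // load connector/udf jars with a dedicated classloader instead of
        // relying on Thread.currentThread().getContextClassLoader()
        URLClassLoader userClassLoader = new URLClassLoader(
                new URL[] {new URL("file:///opt/connectors/kafka/flink-connector-kafka.jar")},
                ExplicitClassLoaderExample.class.getClassLoader());

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .withClassLoader(userClassLoader) // proposed API (assumption)
                .build();

        TableEnvironment tEnv = TableEnvironment.create(settings);
    }
}
{code}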
[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
[ https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180209#comment-17180209 ] Wenlong Lyu commented on FLINK-12351: - Hi, [~jark][~trohrmann] I think we may need to fix this issue in 1.11; it may be a regression in 1.11. Before 1.11, AsyncWaitOperator was not chainable because of [FLINK-13063|https://issues.apache.org/jira/browse/FLINK-13063], so all input records were newly created from network inputs and this bug would not be triggered. In 1.11, AsyncWaitOperator is chainable again ([FLINK-16219|https://issues.apache.org/jira/browse/FLINK-16219]), so this bug can affect the result when object reuse is enabled. > AsyncWaitOperator should deep copy StreamElement when object reuse is enabled > - > > Key: FLINK-12351 > URL: https://issues.apache.org/jira/browse/FLINK-12351 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, AsyncWaitOperator directly puts the input StreamElement into {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement is reused, which means the element in {{StreamElementQueue}} will be modified. As a result, the output of AsyncWaitOperator might be wrong. > An easy way to fix this might be to deep copy the input StreamElement when object reuse is enabled, like this: > https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209 -- This message was sent by Atlassian Jira (v8.3.4#803005)
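A minimal sketch of the deep copy being discussed, assuming access to the input {{TypeSerializer}} and the object-reuse flag (the helper class and field names are illustrative, not the actual AsyncWaitOperator code):

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

final class RecordCopier<IN> {
    private final TypeSerializer<IN> inputSerializer;
    private final boolean objectReuseEnabled;

    RecordCopier(TypeSerializer<IN> inputSerializer, boolean objectReuseEnabled) {
        this.inputSerializer = inputSerializer;
        this.objectReuseEnabled = objectReuseEnabled;
    }

    // Deep-copy the reused input record before queueing it, so later
    // mutations of the reused object cannot corrupt the queued element.
    StreamRecord<IN> maybeCopy(StreamRecord<IN> record) {
        if (objectReuseEnabled) {
            return record.copy(inputSerializer.copy(record.getValue()));
        }
        return record;
    }
}
{code}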
[jira] [Commented] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
[ https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17181081#comment-17181081 ] Wenlong Lyu commented on FLINK-12351: - [~pnowojski] As far as I know, it is easy to find out whether the operator is the head of a chain via StreamConfig#isChainStart > AsyncWaitOperator should deep copy StreamElement when object reuse is enabled > - > > Key: FLINK-12351 > URL: https://issues.apache.org/jira/browse/FLINK-12351 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, AsyncWaitOperator directly puts the input StreamElement into {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement is reused, which means the element in {{StreamElementQueue}} will be modified. As a result, the output of AsyncWaitOperator might be wrong. > An easy way to fix this might be to deep copy the input StreamElement when object reuse is enabled, like this: > https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089260#comment-17089260 ] Wenlong Lyu commented on FLINK-17012: - [~pnowojski] I think whether the task is ready to process data is what users really care about. Including the initialization step in the DEPLOYING state of the task could be an option. I agree that we should avoid `initialize()` or `configure()` if possible. Regarding initializing in the constructor, I think we would need to do more checks (at least null checks) when cleaning up in the exception catch clause, and it would be impossible to clean up externally, which may cause a resource leak when we try to cancel a task by interrupting it, because the Invokable is not accessible when the constructor fails. We may need a Factory, as [~pnowojski] suggested, which can construct and initialize all components (statebackend, operator chain, etc.), create the task by providing them explicitly, and clean them up when necessary. > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Task >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to running before it is fully initialized; this does not take state initialization and operator initialization (#open) into account, which may take a long time to finish. As a result, there is a weird phenomenon where all tasks are running but throughput is 0. > I think it would be good if we could expose the initialization stage of tasks. > What do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
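A minimal sketch of the factory idea described in the comment above (all names are hypothetical, not Flink API): components are constructed and initialized first, handed to the task explicitly, and stay reachable for cleanup even if creating the invokable fails.

{code:java}
// Hypothetical sketch: separate component construction from task
// construction so cleanup is always possible.
interface TaskComponents extends AutoCloseable {
    // e.g. state backend, operator chain, ...
}

interface InvokableFactory {
    TaskComponents createComponents() throws Exception;

    Runnable createInvokable(TaskComponents components) throws Exception;
}

final class TaskRunner {
    void run(InvokableFactory factory) throws Exception {
        TaskComponents components = factory.createComponents();
        try {
            // even if this constructor-equivalent step throws, the
            // components are still reachable and cleaned up below
            factory.createInvokable(components).run();
        } finally {
            components.close();
        }
    }
}
{code}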
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089350#comment-17089350 ] Wenlong Lyu commented on FLINK-17313: - Hi all, the ticket is only trying to fix the bug in the validation of the physical DataType against the logical DataType of a TableSink; I think that is much clearer and cleaner, and worth considering. Currently the validation on sinks reuses the validation on sources, while they actually should be different. > Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row > > > Key: FLINK-17313 > URL: https://issues.apache.org/jira/browse/FLINK-17313 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Labels: pull-request-available > > Test code like the following (in the blink planner): > {code:java} > tEnv.sqlUpdate("create table randomSource (" + " a varchar(10)," + " b decimal(20,2)" + " ) with (" + " 'type' = 'random'," + " 'count' = '10'" + " )"); > tEnv.sqlUpdate("create table printSink (" + " a varchar(10)," + " b decimal(22,2)," + " c timestamp(3)" + " ) with (" + " 'type' = 'print'" + " )"); > tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource"); > tEnv.execute(""); > {code} > The print TableSink implements UpsertStreamTableSink and its getRecordType is as follows: > {code:java} > public TypeInformation getRecordType() { return getTableSchema().toRowType(); } > {code} > The varchar column validation exception is: org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractItera
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089382#comment-17089382 ] Wenlong Lyu commented on FLINK-17313: - [~dwysakowicz] I think it is not necessary that the logical schema and the physical schema match exactly: currently we allow a column in a source whose logical type is varchar(10) while its physical type is varchar(5); see `CastAvoidanceChecker` used in the compatibility check. The requirement on sources is that we need to be able to convert a physical record of the source to an internal record according to the physical DataType and the logical DataType. On sinks, the requirement should be reversed: we need to be able to convert an internal record to a physical record for the sink, so we can allow a column of a sink whose logical type is varchar(5) but whose physical type is varchar(10). On validation: we have a validation to make sure that the schema of the source query of a sink matches the logical type; the validation between the logical type and the physical type can be much looser, I think. > Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row > > > Key: FLINK-17313 > URL: https://issues.apache.org/jira/browse/FLINK-17313 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Labels: pull-request-available > > Test code like the following (in the blink planner): > {code:java} > tEnv.sqlUpdate("create table randomSource (" + " a varchar(10)," + " b decimal(20,2)" + " ) with (" + " 'type' = 'random'," + " 'count' = '10'" + " )"); > tEnv.sqlUpdate("create table printSink (" + " a varchar(10)," + " b decimal(22,2)," + " c timestamp(3)" + " ) with (" + " 'type' = 'print'" + " )"); > tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource"); > tEnv.execute(""); > {code} > The print TableSink implements UpsertStreamTableSink and its getRecordType is as follows: > {code:java} > public TypeInformation getRecordType() { return getTableSchema().toRowType(); } > {code} > The varchar column validation exception is: org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(Planne
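A minimal sketch of the direction-aware compatibility check described in the comment above, using {{VarCharType}} lengths as the example (the helper methods are hypothetical, not the existing {{TypeMappingUtils}} logic):

{code:java}
import org.apache.flink.table.types.logical.VarCharType;

final class DirectionAwareChecks {

    // Source side: the logical type must be able to hold every physical value,
    // so logical varchar(10) with physical varchar(5) is acceptable.
    static boolean compatibleForSource(VarCharType logical, VarCharType physical) {
        return logical.getLength() >= physical.getLength();
    }

    // Sink side, reversed: the physical type must be able to hold every logical
    // value, so logical varchar(5) with physical varchar(10) is acceptable.
    static boolean compatibleForSink(VarCharType logical, VarCharType physical) {
        return physical.getLength() >= logical.getLength();
    }
}
{code}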
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089472#comment-17089472 ] Wenlong Lyu commented on FLINK-17313: - [~dwysakowicz] Regarding LEGACY DECIMAL, I think it is a special case that is good to support: the physical representation of LEGACY DECIMAL is BigDecimal, which can support any precision and scale, so allowing such a conversion will not actually break anything, and the final precision and scale of the output BigDecimal are still limited by the logical data type, so no data with a wrong precision will be generated. What's more, with such support we can easily fill up the decimal support in all kinds of sinks that use the old interface. > Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row > > > Key: FLINK-17313 > URL: https://issues.apache.org/jira/browse/FLINK-17313 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Labels: pull-request-available > > Test code like the following (in the blink planner): > {code:java} > tEnv.sqlUpdate("create table randomSource (" + " a varchar(10)," + " b decimal(20,2)" + " ) with (" + " 'type' = 'random'," + " 'count' = '10'" + " )"); > tEnv.sqlUpdate("create table printSink (" + " a varchar(10)," + " b decimal(22,2)," + " c timestamp(3)" + " ) with (" + " 'type' = 'print'" + " )"); > tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource"); > tEnv.execute(""); > {code} > The print TableSink implements UpsertStreamTableSink and its getRecordType is as follows: > {code:java} > public TypeInformation getRecordType() { return getTableSchema().toRowType(); } > {code} > The varchar column validation exception is: org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreac
[jira] [Commented] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091088#comment-17091088 ] Wenlong Lyu commented on FLINK-17012: - I came up with the same question about init earlier, too. I think if we want to do the init in the constructor, there should be no abstract `init`; every StreamTask should initialize its own part in its constructor: StreamTask only initializes the common resources such as the statebackend and operators, and SourceStreamTask or OneInputStreamTask initializes its special part in its constructor. > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Task >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to running before it is fully initialized; this does not take state initialization and operator initialization (#open) into account, which may take a long time to finish. As a result, there is a weird phenomenon where all tasks are running but throughput is 0. > I think it would be good if we could expose the initialization stage of tasks. > What do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
Wenlong Lyu created FLINK-17386: --- Summary: Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided. Key: FLINK-17386 URL: https://issues.apache.org/jira/browse/FLINK-17386 Project: Flink Issue Type: Bug Reporter: Wenlong Lyu java.io.IOException: Process execution failed due error. Error output:java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.security.UserGroupInformation\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat java.lang.Thread.run(Thread.java:834) I think it is because an exception is thrown in the static code block of UserGroupInformation; we should catch Throwable instead of Exception in HadoopSecurityContextFactory#createContext? [~rongrong] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
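A minimal sketch of the proposed change, assuming the factory probes for the Hadoop class and treats anything thrown during static initialization, including {{NoClassDefFoundError}}, as "Hadoop not available" (illustrative, not the actual patch):

{code:java}
// Illustrative sketch of createContext-style probing.
final class HadoopAvailabilityProbe {

    static boolean hadoopSecurityAvailable(ClassLoader cl) {
        try {
            // 'true' triggers the static initializer, which is where the
            // NoClassDefFoundError in the report above was thrown
            Class.forName("org.apache.hadoop.security.UserGroupInformation", true, cl);
            return true;
        } catch (Throwable t) { // Throwable, not Exception: also catches linkage errors
            return false;
        }
    }
}
{code}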
[jira] [Updated] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-17386: Description: java.io.IOException: Process execution failed due error. Error output:java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.security.UserGroupInformation\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat java.lang.Thread.run(Thread.java:834) I think it is because an exception is thrown in the static code block of UserGroupInformation; we should catch Throwable instead of Exception in HadoopSecurityContextFactory#createContext? [~rongr] what do you think? was: java.io.IOException: Process execution failed due error. Error output:java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.security.UserGroupInformation\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat java.lang.Thread.run(Thread.java:834) I think it is because an exception is thrown in the static code block of UserGroupInformation; we should catch Throwable instead of Exception in HadoopSecurityContextFactory#createContext? [~rongrong] what do you think? > Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
> > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Priority: Major > > java.io.IOException: Process execution failed due error. Error > output:java.lang.NoClassDefFoundError: Could not initialize class > org.apache.hadoop.security.UserGroupInformation\n\tat > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat > > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat > java.util.concurrent.FutureTask.run(FutureTask.java:266)
[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094097#comment-17094097 ] Wenlong Lyu commented on FLINK-17386: - hi [~rongr], I don't want the shaded-hadoop-lib in the lib directory, because I don't want to submit jobs to YARN. I did more investigation on this issue and found that it is because there is a customized state backend in the lib directory which is designed to support writing to HDFS and contains some of the Hadoop libs, but not all. The root cause is that the security loader only checks whether Configuration and UserGroupInformation are in the classpath, which may cause the exception above when not enough libs are in the lib dir. > Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided. > > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Priority: Major > > java.io.IOException: Process execution failed due error. Error output:java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.security.UserGroupInformation\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat java.lang.Thread.run(Thread.java:834) > I think it is because an exception is thrown in the static code block of UserGroupInformation; we should catch Throwable instead of Exception in HadoopSecurityContextFactory#createContext? > [~rongr] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094218#comment-17094218 ] Wenlong Lyu commented on FLINK-17386: - yes, you are right about the use case. We are doing some testing based on the master branch; I can do a test on 1.10 and report back later. > Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided. > > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Priority: Major > > java.io.IOException: Process execution failed due error. Error output:java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.security.UserGroupInformation\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat java.lang.Thread.run(Thread.java:834) > I think it is because an exception is thrown in the static code block of UserGroupInformation; we should catch Throwable instead of Exception in HadoopSecurityContextFactory#createContext? > [~rongr] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094235#comment-17094235 ] Wenlong Lyu commented on FLINK-17386: - [~rongr] I ran a quick test by copying the state backend into the lib directory of 1.10 and running a flink command; no error happened. > Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided. > > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Priority: Major > > java.io.IOException: Process execution failed due error. Error output:java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.security.UserGroupInformation\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat java.lang.Thread.run(Thread.java:834) > I think it is because an exception is thrown in the static code block of UserGroupInformation; we should catch Throwable instead of Exception in HadoopSecurityContextFactory#createContext? > [~rongr] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096232#comment-17096232 ] Wenlong Lyu commented on FLINK-17386: - [~rongr] I have verified the patch; it works. > Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided. > > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > java.io.IOException: Process execution failed due error. Error output:java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.security.UserGroupInformation\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat java.lang.Thread.run(Thread.java:834) > I think it is because an exception is thrown in the static code block of UserGroupInformation; we should catch Throwable instead of Exception in HadoopSecurityContextFactory#createContext? > [~rongr] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17079409#comment-17079409 ] Wenlong Lyu commented on FLINK-17012: - [~chesnay] I think it is not a good choice to treat processing the first record as the signal that initialization has finished: when there is a filter in the job which filters out most of the records, or there is no input for a long time, we would make a wrong decision. IMO, it would be perfect if we could add a new stage (state) for the task, such as Initializing, which could be easily visualized, but that could hurt compatibility; a separate metric may be a good compromise? > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Task >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to running before it is fully initialized; this does not take state initialization and operator initialization (#open) into account, which may take a long time to finish. As a result, there is a weird phenomenon where all tasks are running but throughput is 0. > I think it would be good if we could expose the initialization stage of tasks. > What do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17082008#comment-17082008 ] Wenlong Lyu commented on FLINK-17012: - [~chesnay] If the state is a TaskExecutor-local thing, we still need a solution to expose such local information to users/external systems. > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Task >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to running before it is fully initialized; this does not take state initialization and operator initialization (#open) into account, which may take a long time to finish. As a result, there is a weird phenomenon where all tasks are running but throughput is 0. > I think it would be good if we could expose the initialization stage of tasks. > What do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17109) FlinkPipelineTranslationUtil#getJobGraph doesn't consider Jars and Classpaths in configuration.
Wenlong Lyu created FLINK-17109: --- Summary: FlinkPipelineTranslationUtil#getJobGraph doesn't consider Jars and Classpaths in configuration. Key: FLINK-17109 URL: https://issues.apache.org/jira/browse/FLINK-17109 Project: Flink Issue Type: Improvement Components: Client / Job Submission Reporter: Wenlong Lyu I think this method should be removed, and we should use PipelineExecutorUtils#getJobGraph instead? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17110) Make StreamExecutionEnvironment#configure also affects configuration
Wenlong Lyu created FLINK-17110: --- Summary: Make StreamExecutionEnvironment#configure also affects configuration Key: FLINK-17110 URL: https://issues.apache.org/jira/browse/FLINK-17110 Project: Flink Issue Type: Improvement Reporter: Wenlong Lyu If StreamExecutionEnvironment#configure can also affect the configuration in StreamExecutionEnvironment, we can not only easily add library jars or classpaths dynamically according to the job we want to run, which is quite important for a platform product, but also tune some runtime configuration programmatically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17110) Make StreamExecutionEnvironment#configure also affects configuration
[ https://issues.apache.org/jira/browse/FLINK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083070#comment-17083070 ] Wenlong Lyu commented on FLINK-17110: - [~aljoscha] I mean the configuration in StreamExecutionEnvironment, which will be used to generate the job graph and to set up a per-job cluster, I think. [~kkl0u] I propose to merge the configuration provided via StreamExecutionEnvironment#configure into StreamExecutionEnvironment#configuration, so that we can change more of the runtime configuration programmatically. In my use case, I am trying to create a tool that supports running arbitrary SQL jobs in per-job cluster mode, which needs to load connector jars dynamically according to the SQL. Currently I have to merge all library jars together, which makes it inconvenient to introduce a new connector and easy to cause class conflicts. It would be possible if we could set PipelineOptions.CLASSPATHS programmatically and store the library jars on some HTTP server, which could be configured via StreamExecutionEnvironment#configuration. In another case, it could also be useful to tune advanced runtime configuration according to the job we are running, for example setting a larger resource request timeout for a large job. It could be configured on the command line too, but that would be more inconvenient, I think. > Make StreamExecutionEnvironment#configure also affects configuration > > > Key: FLINK-17110 > URL: https://issues.apache.org/jira/browse/FLINK-17110 > Project: Flink > Issue Type: Improvement >Reporter: Wenlong Lyu >Priority: Major > > If StreamExecutionEnvironment#configure can also affect the configuration in StreamExecutionEnvironment, we can not only easily add library jars or classpaths dynamically according to the job we want to run, which is quite important for a platform product, but also tune some runtime configuration programmatically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
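A minimal sketch of the use case described above, assuming the proposed behavior where {{StreamExecutionEnvironment#configure}} also merges the given options into the environment's own configuration (the jar URL is illustrative):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class DynamicClasspathExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Configuration conf = new Configuration();
        // connector jars chosen at runtime for the SQL being compiled
        conf.set(PipelineOptions.CLASSPATHS,
                Arrays.asList("http://jars.example.com/flink-connector-kafka.jar"));

        // proposed behavior: these options would end up in the environment's
        // configuration and hence in the generated JobGraph
        env.configure(conf, DynamicClasspathExample.class.getClassLoader());
    }
}
{code}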
[jira] [Commented] (FLINK-17139) The sub-plan reuse optimizer can't reuse sub-plan that from different sql node tree
[ https://issues.apache.org/jira/browse/FLINK-17139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083313#comment-17083313 ] Wenlong Lyu commented on FLINK-17139: - In 1.10 the compilation of each DML statement is eager: StreamTableEnvironmentImpl#isEagerOperationTranslation returns true, so we can't reuse the source. I believe this is solved in master. > The sub-plan reuse optimizer can't reuse sub-plan that from different sql > node tree > > > Key: FLINK-17139 > URL: https://issues.apache.org/jira/browse/FLINK-17139 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: jinfeng >Priority: Major > > The sub-plan reuse optimizer can't reuse sub-plans that come from different SQL node trees. > {code:java} > // code placeholder > create table SourceTable ...; > create table SinkTable1 ; > create table SinkTable2 ; > insert into SinkTable1 select * from SourceTable; > insert into SinkTable2 select * from SourceTable; > {code} > SourceTable is not reused when I execute the SQL statements above. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17141) Name of SQL Operator is too long
Wenlong Lyu created FLINK-17141: --- Summary: Name of SQL Operator is too long Key: FLINK-17141 URL: https://issues.apache.org/jira/browse/FLINK-17141 Project: Flink Issue Type: Improvement Reporter: Wenlong Lyu The name of a SQL operator contains the detailed logic of the operator, which makes it very long when there are many columns. It is a disaster for logging and the web UI, and it can also cost a lot of memory, because the name is used widely, e.g. in ExecutionVertex and in failover messages. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17110) Make StreamExecutionEnvironment#configure also affects StreamExecutionEnvironment#configuration
[ https://issues.apache.org/jira/browse/FLINK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-17110: Summary: Make StreamExecutionEnvironment#configure also affects StreamExecutionEnvironment#configuration (was: Make StreamExecutionEnvironment#configure also affects configuration) > Make StreamExecutionEnvironment#configure also affects > StreamExecutionEnvironment#configuration > --- > > Key: FLINK-17110 > URL: https://issues.apache.org/jira/browse/FLINK-17110 > Project: Flink > Issue Type: Improvement >Reporter: Wenlong Lyu >Priority: Major > > If StreamExecutionEnvironment#configure could also affect the configuration held in StreamExecutionEnvironment, we could not only add library jars or classpaths dynamically according to the job we want to run, which is quite important for a platform product, but also tune runtime configuration programmatically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17110) Make StreamExecutionEnvironment#configure also affects StreamExecutionEnvironment#configuration
[ https://issues.apache.org/jira/browse/FLINK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083957#comment-17083957 ] Wenlong Lyu commented on FLINK-17110: - hi, [~aljoscha][~kkl0u] Do you have any more comments? Here is a patch: https://github.com/wenlong88/flink/commit/651a48ce4d837623e6eacb4814bd1e43c686fdd4, which mainly contains two changes: 1. Use PipelineExecutorUtils to generate the JobGraph, so that the configuration in StreamExecutionEnvironment can affect the JobGraph, unifying the behavior across RemoteExecutor, LocalExecutor, ClasspathJobGraphRetriever and the web submit handler. 2. Make the configuration in StreamExecutionEnvironment configurable via StreamExecutionEnvironment#configure, so that users can update the configuration in the program. > Make StreamExecutionEnvironment#configure also affects > StreamExecutionEnvironment#configuration > --- > > Key: FLINK-17110 > URL: https://issues.apache.org/jira/browse/FLINK-17110 > Project: Flink > Issue Type: Improvement >Reporter: Wenlong Lyu >Priority: Major > > If StreamExecutionEnvironment#configure could also affect the configuration held in StreamExecutionEnvironment, we could not only add library jars or classpaths dynamically according to the job we want to run, which is quite important for a platform product, but also tune runtime configuration programmatically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084135#comment-17084135 ] Wenlong Lyu commented on FLINK-17012: - Treating initialization as part of deploying is also a reasonable idea. How about we continue FLINK-1474: add an `initialize()` method to AbstractInvokable and call it in the task before the task transitions to RUNNING? Currently there are similar functionalities in both StreamTask and BatchTask: StreamTask#beforeInvoke and BatchTask#initialize, which are called in invoke(). The difference is that some common initialization lives in BatchTask#invoke instead of BatchTask#initialize, so we may need a bit more refactoring of BatchTask. > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Task >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to RUNNING before it is fully initialized: state initialization and operator initialization (#open), which may take a long time to finish, are not taken into account. As a result, there is a weird phenomenon where all tasks are running but the throughput is 0. > I think it would be good if we could expose the initialization stage of tasks. > What do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
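For clarity, the following is a minimal sketch, not actual Flink code, of what such a hook could look like; the default no-op body and the exact placement are assumptions based on the comment above:
{code:java}
// Hypothetical sketch of the proposal: a dedicated initialize() hook on
// AbstractInvokable, run by the Task *before* it switches to RUNNING.
public abstract class AbstractInvokable {

    // State initialization and operator/Function#open logic would move here,
    // out of invoke(): StreamTask could move its beforeInvoke() here, and
    // BatchTask the common setup currently done at the start of invoke().
    public void initialize() throws Exception {
        // default: nothing to do
    }

    // The actual processing loop; only entered once initialize() has completed
    // and the task has transitioned to RUNNING.
    public abstract void invoke() throws Exception;
}
{code}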
[jira] [Comment Edited] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084135#comment-17084135 ] Wenlong Lyu edited comment on FLINK-17012 at 4/15/20, 2:37 PM: --- Treating initialization as part of deploying is also a reasonable idea. How about we continue FLINK-4714: add an `initialize()` method to AbstractInvokable and call it in the task before the task transitions to RUNNING? Currently there are similar functionalities in both StreamTask and BatchTask: StreamTask#beforeInvoke and BatchTask#initialize, which are called in invoke(). The difference is that some common initialization lives in BatchTask#invoke instead of BatchTask#initialize, so we may need a bit more refactoring of BatchTask. was (Author: wenlong.lwl): Treating initialization as part of deploying is also a reasonable idea. How about we continue FLINK-1474: add an `initialize()` method to AbstractInvokable and call it in the task before the task transitions to RUNNING? Currently there are similar functionalities in both StreamTask and BatchTask: StreamTask#beforeInvoke and BatchTask#initialize, which are called in invoke(). The difference is that some common initialization lives in BatchTask#invoke instead of BatchTask#initialize, so we may need a bit more refactoring of BatchTask. > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Task >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to RUNNING before it is fully initialized: state initialization and operator initialization (#open), which may take a long time to finish, are not taken into account. As a result, there is a weird phenomenon where all tasks are running but the throughput is 0. > I think it would be good if we could expose the initialization stage of tasks. > What do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17012) Expose stage of task initialization
[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087435#comment-17087435 ] Wenlong Lyu commented on FLINK-17012: - hi, [~sewen] thanks for the explanation, but I think it may not be good to do heavy IO and user-code initialization (including state initialization and Function#open) in the constructor: it can easily fail because of external-system failures or user-code exceptions, and it would be difficult to clean up when an exception happens in the constructor. > Expose stage of task initialization > --- > > Key: FLINK-17012 > URL: https://issues.apache.org/jira/browse/FLINK-17012 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Task >Reporter: Wenlong Lyu >Priority: Major > > Currently a task switches to RUNNING before it is fully initialized: state initialization and operator initialization (#open), which may take a long time to finish, are not taken into account. As a result, there is a weird phenomenon where all tasks are running but the throughput is 0. > I think it would be good if we could expose the initialization stage of tasks. > What do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
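As an illustration of the cleanup problem raised above, here is a hedged sketch with entirely made-up names (ConstructorInitExample, ExternalConnection and loadState are hypothetical, not Flink APIs):
{code:java}
// Hypothetical illustration (not Flink code) of why heavy initialization in a
// constructor is hard to clean up after a failure.
class ConstructorInitExample {

    // Stand-in for some external resource; a made-up class for this sketch.
    static class ExternalConnection implements AutoCloseable {
        static ExternalConnection open() throws Exception { return new ExternalConnection(); }
        @Override
        public void close() {}
    }

    private final ExternalConnection connection;

    ConstructorInitExample() throws Exception {
        connection = ExternalConnection.open(); // may throw: external-system failure
        loadState();                            // may throw: user-code exception
        // If loadState() throws, the caller never receives a reference to this
        // half-constructed instance, so nobody can close the connection and it leaks.
    }

    private void loadState() throws Exception { /* user code; may fail */ }

    public static void main(String[] args) {
        try {
            new ConstructorInitExample();
        } catch (Exception e) {
            // At this point there is no instance on which cleanup could be invoked.
            System.err.println("construction failed: " + e);
        }
    }
}
{code}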
[jira] [Created] (FLINK-16279) Per job Yarn application leak in normal execution mode.
Wenlong Lyu created FLINK-16279: --- Summary: Per job Yarn application leak in normal execution mode. Key: FLINK-16279 URL: https://issues.apache.org/jira/browse/FLINK-16279 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.10.0 Reporter: Wenlong Lyu I ran a job in YARN per-job mode using {{env.executeAsync}}; the job failed but the YARN cluster was not destroyed. After some research on the code, I found that when running in attached mode, the MiniDispatcher will never set {{shutDownFuture}} before receiving a request from the job client. {code} if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) { // terminate the MiniDispatcher once we served the first JobResult successfully jobResultFuture.thenAccept((JobResult result) -> { ApplicationStatus status = result.getSerializedThrowable().isPresent() ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; LOG.debug("Shutting down per-job cluster because someone retrieved the job result."); shutDownFuture.complete(status); }); } {code} However, when running in async mode (submitting the job via env.executeAsync), there may be no such request from the job client, because a user who sees from the job client that the job has failed may never request the result again. -- This message was sent by Atlassian Jira (v8.3.4#803005)
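To make the failure mode above concrete, here is a minimal sketch of the async submission path, with an assumed trivial pipeline and job name:
{code:java}
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncSubmitExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();

        // Submit without blocking on the result.
        JobClient client = env.executeAsync("async-job");

        // If the caller only watches the status and never fetches the JobResult,
        // no result request ever reaches the MiniDispatcher, its shutDownFuture
        // is never completed, and the per-job YARN application lingers.
        client.getJobStatus().thenAccept(status -> System.out.println("status: " + status));
    }
}
{code}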
[jira] [Updated] (FLINK-16279) Per job Yarn application leak in normal execution mode.
[ https://issues.apache.org/jira/browse/FLINK-16279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-16279: Description: I ran a job in YARN per-job mode using {{env.executeAsync}}; the job failed but the YARN cluster was not destroyed. After some research on the code, I found that when running in attached mode, the MiniDispatcher will never set {{shutDownFuture}} before receiving a request from the job client. {code} if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) { // terminate the MiniDispatcher once we served the first JobResult successfully jobResultFuture.thenAccept((JobResult result) -> { ApplicationStatus status = result.getSerializedThrowable().isPresent() ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; LOG.debug("Shutting down per-job cluster because someone retrieved the job result."); shutDownFuture.complete(status); }); } {code} However, when running in async mode (submitting the job via env.executeAsync), there may be no such request from the job client, because a user who sees from the job client that the job has failed may never request the result again. was: I ran a job in YARN per-job mode using {{env.executeAsync}}; the job failed but the YARN cluster was not destroyed. After some research on the code, I found that when running in attached mode, the MiniDispatcher will neve set {{shutDownFuture}} before receiving a request from the job client. {code} if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) { // terminate the MiniDispatcher once we served the first JobResult successfully jobResultFuture.thenAccept((JobResult result) -> { ApplicationStatus status = result.getSerializedThrowable().isPresent() ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; LOG.debug("Shutting down per-job cluster because someone retrieved the job result."); shutDownFuture.complete(status); }); } {code} However, when running in async mode (submitting the job via env.executeAsync), there may be no such request from the job client, because a user who sees from the job client that the job has failed may never request the result again. > Per job Yarn application leak in normal execution mode. > --- > > Key: FLINK-16279 > URL: https://issues.apache.org/jira/browse/FLINK-16279 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.10.0 >Reporter: Wenlong Lyu >Priority: Major > > I ran a job in YARN per-job mode using {{env.executeAsync}}; the job failed > but the YARN cluster was not destroyed. > After some research on the code, I found that when running in attached mode, > the MiniDispatcher will never set {{shutDownFuture}} before receiving a request > from the job client. > {code} > if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) { > // terminate the MiniDispatcher once we served the first JobResult successfully > jobResultFuture.thenAccept((JobResult result) -> { > ApplicationStatus status = result.getSerializedThrowable().isPresent() ? > ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; > LOG.debug("Shutting down per-job cluster because someone retrieved the job result."); > shutDownFuture.complete(status); > }); > } > {code} > However, when running in async mode (submitting the job via env.executeAsync), > there may be no such request from the job client, because a user who sees from > the job client that the job has failed may never request the result again. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21870) RelDataType of TableFunction different in local and CI
Wenlong Lyu created FLINK-21870: --- Summary: RelDataType of TableFunction different in local and CI Key: FLINK-21870 URL: https://issues.apache.org/jira/browse/FLINK-21870 Project: Flink Issue Type: Sub-task Reporter: Wenlong Lyu Locally the StructKind of the RelDataType is NONE; however, in the CI environment the StructKind is FULLY_QUALIFIED. See the case in CorrelateJsonPlanTest#testRegisterByClass. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21785) Support StreamExecCorrelate json serde
[ https://issues.apache.org/jira/browse/FLINK-21785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21785: Affects Version/s: 1.13.0 > Support StreamExecCorrelate json serde > -- > > Key: FLINK-21785 > URL: https://issues.apache.org/jira/browse/FLINK-21785 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.0 >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21785) Support StreamExecCorrelate json serde
[ https://issues.apache.org/jira/browse/FLINK-21785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21785: Component/s: Table SQL / Planner > Support StreamExecCorrelate json serde > -- > > Key: FLINK-21785 > URL: https://issues.apache.org/jira/browse/FLINK-21785 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21785) Support StreamExecCorrelate json serde
[ https://issues.apache.org/jira/browse/FLINK-21785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21785: Fix Version/s: 1.13.0 > Support StreamExecCorrelate json serde > -- > > Key: FLINK-21785 > URL: https://issues.apache.org/jira/browse/FLINK-21785 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21785) Support StreamExecCorrelate json serde
[ https://issues.apache.org/jira/browse/FLINK-21785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21785: Affects Version/s: (was: 1.13.0) > Support StreamExecCorrelate json serde > -- > > Key: FLINK-21785 > URL: https://issues.apache.org/jira/browse/FLINK-21785 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21813) Support StreamExecOverAggregate json ser/de
[ https://issues.apache.org/jira/browse/FLINK-21813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21813: Component/s: Table SQL / Planner > Support StreamExecOverAggregate json ser/de > --- > > Key: FLINK-21813 > URL: https://issues.apache.org/jira/browse/FLINK-21813 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21816) Support StreamExecMatch json ser/de
[ https://issues.apache.org/jira/browse/FLINK-21816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21816: Fix Version/s: 1.13.0 > Support StreamExecMatch json ser/de > --- > > Key: FLINK-21816 > URL: https://issues.apache.org/jira/browse/FLINK-21816 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21813) Support StreamExecOverAggregate json ser/de
[ https://issues.apache.org/jira/browse/FLINK-21813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21813: Fix Version/s: 1.13.0 > Support StreamExecOverAggregate json ser/de > --- > > Key: FLINK-21813 > URL: https://issues.apache.org/jira/browse/FLINK-21813 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21816) Support StreamExecMatch json ser/de
[ https://issues.apache.org/jira/browse/FLINK-21816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-21816: Component/s: Table SQL / Planner > Support StreamExecMatch json ser/de > --- > > Key: FLINK-21816 > URL: https://issues.apache.org/jira/browse/FLINK-21816 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22020) Reflections ClassNotFound when trying to deserialize a json plan in deployment environment
Wenlong Lyu created FLINK-22020: --- Summary: Reflections ClassNotFound when trying to deserialize a json plan in deployment environment Key: FLINK-22020 URL: https://issues.apache.org/jira/browse/FLINK-22020 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.13.0 Reporter: Wenlong Lyu -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22020) Reflections ClassNotFound when trying to deserialize a json plan in deployment environment
[ https://issues.apache.org/jira/browse/FLINK-22020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-22020: Attachment: error_log.png > Reflections ClassNotFound when trying to deserialize a json plan in > deployment environment > --- > > Key: FLINK-22020 > URL: https://issues.apache.org/jira/browse/FLINK-22020 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Wenlong Lyu >Priority: Major > Attachments: error_log.png > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22020) Reflections ClassNotFound when trying to deserialize a json plan in deployment environment
[ https://issues.apache.org/jira/browse/FLINK-22020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenlong Lyu updated FLINK-22020: Description: After some investigation, I found that it is because we don't include org.reflections in the pom of flink-table-planner-blink. > Reflections ClassNotFound when trying to deserialize a json plan in > deployment environment > --- > > Key: FLINK-22020 > URL: https://issues.apache.org/jira/browse/FLINK-22020 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Wenlong Lyu >Priority: Major > Attachments: error_log.png > > > After some investigation, I found that it is because we don't include > org.reflections in the pom of flink-table-planner-blink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
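A hedged sketch of the kind of fix the description points at: declaring the missing dependency in the module's pom. The coordinates are the upstream org.reflections ones, but the version and the exact placement are assumptions, not taken from the actual patch:
{code:xml}
<!-- Assumed sketch: make the reflections dependency available to
     flink-table-planner-blink so it is present at deployment time.
     The version below is an assumption, not the actual fix. -->
<dependency>
    <groupId>org.reflections</groupId>
    <artifactId>reflections</artifactId>
    <version>0.9.10</version>
</dependency>
{code}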
[jira] [Commented] (FLINK-23159) Correlated sql subquery on the source created via fromValues() failed to compile
[ https://issues.apache.org/jira/browse/FLINK-23159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373308#comment-17373308 ] Wenlong Lyu commented on FLINK-23159: - [~gaoyunhaii] Thanks for reporting the issue. The root cause is that Values is currently excluded in SubqueryDecorrelator; I will try to fix it. > Correlated sql subquery on the source created via fromValues() failed to > compile > > > Key: FLINK-23159 > URL: https://issues.apache.org/jira/browse/FLINK-23159 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Yun Gao >Priority: Major > > A correlated subquery like > {code:java} > import org.apache.flink.table.api.DataTypes; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.TableEnvironment; > import org.apache.flink.table.types.DataType; > import org.apache.flink.types.Row; > import java.util.ArrayList; > import java.util.List; > public class SQLQueryTest { > public static void main(String[] args) { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode() > .build(); > TableEnvironment tableEnvironment = TableEnvironment.create(settings); > DataType row = DataTypes.ROW( > DataTypes.FIELD("flag", DataTypes.STRING()), > DataTypes.FIELD("id", DataTypes.INT()), > DataTypes.FIELD("name", DataTypes.STRING()) > ); > Table table = tableEnvironment.fromValues(row, new > MyListSource("table1").builder()); > tableEnvironment.createTemporaryView("table1", table); > table = tableEnvironment.fromValues(row, new > MyListSource("table2").builder()); > tableEnvironment.createTemporaryView("table2", table); > String sql = "select t1.flag from table1 t1 where t1.name in (select > t2.name from table2 t2 where t2.id = t1.id)"; > tableEnvironment.explainSql(sql); > } > public static class MyListSource { > private String flag; > public MyListSource(String flag) { > this.flag = flag; > } > public List<Row> builder() { > List<Row> rows = new ArrayList<>(); > for (int i = 2; i < 3; i++) { > Row row = new Row(3); > row.setField(0, flag); > row.setField(1, i); > row.setField(2, "me"); > rows.add(row); > } > return rows; > } > } > } > {code} > would throw > {code:java} > Exception in thread "main" org.apache.flink.table.api.TableException: > unexpected correlate variable $cor0 in the plan > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.immutable.Range.foreach(Range.scala:160) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at >
[jira] [Commented] (FLINK-23385) Fix nullability of COALESCE
[ https://issues.apache.org/jira/browse/FLINK-23385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17382009#comment-17382009 ] Wenlong Lyu commented on FLINK-23385: - hi, [~twalthr] I think the root cause of the issue is the return type of REGEXP_EXTRACT should be force nullable instead of depending on input type, introduce by FLINK-13783 > Fix nullability of COALESCE > --- > > Key: FLINK-23385 > URL: https://issues.apache.org/jira/browse/FLINK-23385 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.1 >Reporter: Maciej Bryński >Priority: Major > > EDIT: Simpler case: > {code:java} > SELECT COALESCE(REGEXP_EXTRACT('22','[A-Z]+'),'-'); > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, > however, a null value is being written into it. You can set job configuration > 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and > drop such records silently. > {code} > When using REGEXP_EXTRACT on NOT NULL column I'm getting following exception > {code:java} > select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from test limit 10 > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, > however, a null value is being written into it. You can set job configuration > 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and > drop such records silently. > {code} > I think the reason is that nullability of result is wrongly calculated. > Example: > {code:java} > create table test ( > test STRING NOT NULL > ) WITH ( > 'connector' = 'datagen' > ); > explain select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from test > == Abstract Syntax Tree == > LogicalProject(EXPR$0=[REGEXP_EXTRACT($0, _UTF-16LE'[A-Z]+')]) > +- LogicalTableScan(table=[[default_catalog, default_database, test]])== > Optimized Physical Plan == > Calc(select=[REGEXP_EXTRACT(test, _UTF-16LE'[A-Z]+') AS EXPR$0]) > +- TableSourceScan(table=[[default_catalog, default_database, test]], > fields=[test])== Optimized Execution Plan == > Calc(select=[REGEXP_EXTRACT(test, _UTF-16LE'[A-Z]+') AS EXPR$0]) > +- TableSourceScan(table=[[default_catalog, default_database, test]], > fields=[test]){code} > As you can see Flink is removing COALESCE from query which is wrong. > > Same for view (null = false): > {code:java} > create view v as select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from > test > describe v; > +++---+-++---+ > | name | type | null | key | extras | watermark | > +++---+-++---+ > | EXPR$0 | STRING | false | || | > +++---+-++---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)