[jira] [Created] (FLINK-8442) Should recover the input splits when an execution fails over with a FailoverRegion
shuai.xu created FLINK-8442:
---------------------------------

Summary: Should recover the input splits when an execution fails over with a FailoverRegion
Key: FLINK-8442
URL: https://issues.apache.org/jira/browse/FLINK-8442
Project: Flink
Issue Type: Bug
Components: JobManager, Scheduler
Affects Versions: 1.4.0
Reporter: shuai.xu

FLIP-1 enables restarting only the executions inside a FailoverRegion when a task fails. However, input splits are currently assigned only while an ExecutionJobVertex is being initialized, so when an execution restarts, the input splits it has already read can no longer be obtained from the job master. The input splits need to be recovered so that they can be consumed again when the task restarts.
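The fix the ticket asks for amounts to remembering which splits each execution was given, so they can be handed back to the assigner when only that execution's region restarts. A minimal, illustrative sketch of that bookkeeping follows; the class and method names are made up for illustration and are not Flink's InputSplitAssigner API.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical split tracker: names are illustrative, not the actual Flink API.
public class RecoverableSplitAssigner<S> {

    private final Deque<S> unassigned;                           // splits not yet handed out
    private final Map<Integer, List<S>> assignedByTask = new HashMap<>();

    public RecoverableSplitAssigner(List<S> splits) {
        this.unassigned = new ArrayDeque<>(splits);
    }

    /** Hand the next split to a subtask and remember who received it. */
    public synchronized S getNextSplit(int subtaskIndex) {
        S split = unassigned.poll();
        if (split != null) {
            assignedByTask.computeIfAbsent(subtaskIndex, k -> new ArrayList<>()).add(split);
        }
        return split;
    }

    /** On a region failover, put the restarted subtask's splits back so they are read again. */
    public synchronized void returnSplitsOf(int subtaskIndex) {
        List<S> splits = assignedByTask.remove(subtaskIndex);
        if (splits != null) {
            splits.forEach(unassigned::addFirst);
        }
    }
}
{code}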
[jira] [Created] (FLINK-8443) YARNSessionCapacitySchedulerITCase is flaky
Piotr Nowojski created FLINK-8443:
---------------------------------

Summary: YARNSessionCapacitySchedulerITCase is flaky
Key: FLINK-8443
URL: https://issues.apache.org/jira/browse/FLINK-8443
Project: Flink
Issue Type: Bug
Components: YARN
Affects Versions: 1.5.0
Reporter: Piotr Nowojski
Attachments: 35.5.tar.gz

Attached are the build logs from Travis. The test fails with:

{noformat}
java.lang.AssertionError: Found a file /home/travis/build/dataArtisans/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-1_0/application_1516120275777_0003/container_1516120275777_0003_01_02/taskmanager.log with a prohibited string (one of [Exception, Started SelectChannelConnector@0.0.0.0:8081]). Excerpts
{noformat}

After downloading the YARN logs uploaded to transfer.sh, the following failure is visible:

{code:java}
2018-01-16 16:32:10,553 INFO org.apache.flink.yarn.YarnTaskManager - Stopping TaskManager with final application status SUCCEEDED and diagnostics: Flink YARN Client requested shutdown
2018-01-16 16:32:10,577 INFO org.apache.flink.yarn.YarnTaskManager - Stopping TaskManager akka://flink/user/taskmanager#2122015748.
2018-01-16 16:32:10,578 INFO org.apache.flink.yarn.YarnTaskManager - Disassociating from JobManager
2018-01-16 16:32:10,588 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
2018-01-16 16:32:10,599 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
2018-01-16 16:32:10,614 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /home/travis/build/dataArtisans/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-localDir-nm-1_0/usercache/travis/appcache/application_1516120275777_0003/flink-io-356a7c21-a3cd-43cb-926c-7690f861b66c
2018-01-16 16:32:10,615 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
2018-01-16 16:32:10,619 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown (took 4 ms).
2018-01-16 16:32:10,623 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown (took 4 ms).
2018-01-16 16:32:10,641 INFO org.apache.flink.yarn.YarnTaskManager - Task manager akka://flink/user/taskmanager is completely shut down.
2018-01-16 16:32:10,649 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2018-01-16 16:32:10,650 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
2018-01-16 16:32:10,717 WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline - An exception was thrown by an exception handler.
java.util.concurrent.RejectedExecutionException: Worker has already been shutdown
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.execute(DefaultChannelPipeline.java:636)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:781)
	at org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
	at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline
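For context, the assertion above comes from the YARN tests scanning every log file produced during the run for "prohibited" strings such as Exception, so even a benign shutdown-time exception trips the test. A rough sketch of that kind of check is below; it is illustrative only and not the verbatim test-harness code.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

// Illustrative log scan: fail the test if any *.log file under logDir contains
// one of the prohibited strings. Not the actual YarnTestBase implementation.
public final class ProhibitedLogStrings {

    public static void assertNoProhibitedStrings(Path logDir, List<String> prohibited) throws IOException {
        try (Stream<Path> files = Files.walk(logDir)) {
            files.filter(p -> p.getFileName().toString().endsWith(".log"))
                 .forEach(p -> checkFile(p, prohibited));
        }
    }

    private static void checkFile(Path file, List<String> prohibited) {
        try {
            for (String line : Files.readAllLines(file)) {
                for (String bad : prohibited) {
                    if (line.contains(bad)) {
                        throw new AssertionError(
                            "Found a file " + file + " with a prohibited string (one of " + prohibited + ")");
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
{code}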
[jira] [Created] (FLINK-8444) Rework dependency setup docs
Chesnay Schepler created FLINK-8444:
---------------------------------

Summary: Rework dependency setup docs
Key: FLINK-8444
URL: https://issues.apache.org/jira/browse/FLINK-8444
Project: Flink
Issue Type: Improvement
Components: Documentation
Affects Versions: 1.4.0, 1.5.0
Reporter: Chesnay Schepler

Taken from https://github.com/apache/flink/pull/5303:

{quote}
I would suggest starting to think about the dependencies in the following way:

There are pure user-code projects where the Flink runtime is "provided" and which are started using an existing Flink setup (bin/flink run or REST entry point). This is the Framework Style.

In the future, we will have "Flink as a Library" deployments, where users add something like flink-dist as a library to their program and then simply dockerize that Java application.

Code can also be run in the IDE or in similar embedded forms. This is in some sense also a "Flink as a Library" deployment, but with selective (fewer) dependencies. The RocksDB issue applies only to this scenario.

To make this simpler for users, it would be great not to have N different models that we talk about, but ideally only two: Framework Style and Library Style.

We could, for example, start to advocate and document that users should always use flink-dist as their standard dependency - "provided" in the framework-style deployment, "compile" in the library-style deployment. That might be a really easy way to work with that.

The only problem for the time being is that flink-dist is quite big and also contains optional dependencies such as flink-table, which makes it more heavyweight for quickstarts. Maybe we can accept that as a trade-off for dependency simplicity.
{quote}
[jira] [Created] (FLINK-8445) hostname used in metric names for taskmanager and jobmanager is not consistent
Chris Thomson created FLINK-8445:
---------------------------------

Summary: hostname used in metric names for taskmanager and jobmanager is not consistent
Key: FLINK-8445
URL: https://issues.apache.org/jira/browse/FLINK-8445
Project: Flink
Issue Type: Bug
Components: Metrics
Affects Versions: 1.3.1
Environment: This problem should be present for any metrics reporting configuration that includes '<host>' as part of the scope. For example, with the Graphite reporter configuration in flink-conf.yaml below:

{code:java}
metrics.scope.jm: flink.<host>.jobmanager
metrics.scope.jm.job: flink.<host>.jobmanager.<job_name>
metrics.scope.tm: flink.<host>.taskmanager
metrics.scope.tm.job: flink.<host>.taskmanager.<job_name>
metrics.scope.task: flink.<host>.taskmanager.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: flink.<host>.taskmanager.<job_name>.<operator_name>.<subtask_index>
metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
...
{code}
Reporter: Chris Thomson

Flink metrics reporting to Graphite was enabled using system scopes that contain '<host>' for both the job manager and the task manager. The resulting metrics reported to Graphite use two different representations of '<host>'.

For *Task Manager metrics* it uses the *short hostname* (without the DNS domain). This is a result of logic in the org.apache.flink.runtime.taskmanager.TaskManagerLocation constructor that tries to extract the short hostname from the fully qualified domain name returned by InetAddress.getCanonicalHostName().

For *Job Manager metrics* it uses the *fully qualified domain name* (with the DNS domain). This is a result of there being no equivalent logic in org.apache.flink.runtime.jobmanager.JobManagerRunner or org.apache.flink.runtime.rpc.akka.AkkaRpcService to normalize the fully qualified domain name down to the short hostname.

Ideally the '<host>' placeholder in the system scopes for job manager and task manager metrics would be replaced with a consistent value (either the short hostname or the fully qualified domain name). Even better would be a configuration option that decides which one is used for metric name generation.
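The mismatch is easy to reproduce outside Flink. The sketch below contrasts the first-dot truncation the ticket attributes to TaskManagerLocation with the raw canonical name that the JobManager side reportedly uses; it is based on the ticket's description, not the actual Flink source.

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

// Rough sketch of the normalization described in the ticket: cut the canonical
// (fully qualified) host name at the first dot unless it looks like an IP address.
public final class HostNames {

    public static String shortHostName(InetAddress address) {
        String fqdn = address.getCanonicalHostName();      // e.g. "worker-1.example.com"
        int firstDot = fqdn.indexOf('.');
        if (firstDot > 0 && !Character.isDigit(fqdn.charAt(0))) {
            return fqdn.substring(0, firstDot);             // "worker-1" -> used in TM metric scopes
        }
        return fqdn;                                        // IP address or already short
    }

    public static void main(String[] args) throws UnknownHostException {
        InetAddress local = InetAddress.getLocalHost();
        // The JobManager path reportedly uses the FQDN directly, hence the inconsistency:
        System.out.println("TM-style: " + shortHostName(local));
        System.out.println("JM-style: " + local.getCanonicalHostName());
    }
}
{code}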
[jira] [Created] (FLINK-8446) Add support for multiple broadcast states.
Kostas Kloudas created FLINK-8446:
---------------------------------

Summary: Add support for multiple broadcast states.
Key: FLINK-8446
URL: https://issues.apache.org/jira/browse/FLINK-8446
Project: Flink
Issue Type: New Feature
Components: DataStream API
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
Fix For: 1.5.0
adding a new cloud filesystem
Hi, I'm adding support for more cloud storage providers such as Google (gcs://) and Oracle (oci://). I have an oci:// test working based on the s3a:// test, but when I try it on an actual Flink job like WordCount, I get this message:

"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'oci'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded."

How do I register new schemes with the file system factory? Thanks.

On Tuesday, January 16, 2018, 5:27:31 PM PST, cw7k wrote:

Hi, question on this page: "You need to point Flink to a valid Hadoop configuration..."
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#s3-simple-storage-service
How do you point Flink to the Hadoop config?

On Saturday, January 13, 2018, 4:56:15 AM PST, Till Rohrmann wrote:

Hi,

flink-connector-filesystem contains the BucketingSink, which is a connector with which you can write your data to a file system. It provides exactly-once processing guarantees and allows writing data to different buckets [1].

The flink-filesystem module contains different file system implementations (like MapR FS, HDFS or S3). If you want to use, for example, the S3 file system, then there are the flink-s3-fs-hadoop and flink-s3-fs-presto modules.

So if you want to write your data to S3 using the BucketingSink, you have to add flink-connector-filesystem for the BucketingSink as well as an S3 file system implementation (e.g. flink-s3-fs-hadoop or flink-s3-fs-presto).

Usually, there should be no need to change Flink's file system implementations. If you want to add a new connector, then this would go to flink-connectors or to Apache Bahir [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/index.html#connectors-in-apache-bahir

Cheers,
Till

On Fri, Jan 12, 2018 at 7:22 PM, cw7k wrote:

> Hi, I'm trying to understand the difference between the flink-filesystem
> and flink-connector-filesystem. How is each intended to be used?
> If adding support for a different storage provider that supports HDFS,
> should additions be made to one or the other, or both? Thanks.
Re: adding a new cloud filesystem
Hi,

please have a look at this doc page [1]. It describes how to add new file system implementations and also how to configure them.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/filesystems.html#adding-new-file-system-implementations

2018-01-18 0:32 GMT+01:00 cw7k:

> Hi, I'm adding support for more cloud storage providers such as Google
> (gcs://) and Oracle (oci://). I have an oci:// test working based on the
> s3a:// test but when I try it on an actual Flink job like WordCount, I get
> this message:
> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'oci'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded."
> How do I register new schemes with the file system factory? Thanks.
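To make the doc pointer concrete: the linked page describes registering an org.apache.flink.core.fs.FileSystemFactory for the new scheme, discovered through Java's ServiceLoader. Below is a minimal, illustrative sketch of such a factory for a hypothetical oci:// scheme; the class name is made up and the actual FileSystem implementation is left as a placeholder.

{code:java}
import java.io.IOException;
import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

// Illustrative factory for a hypothetical 'oci://' scheme.
public class OciFileSystemFactory implements FileSystemFactory {

    private Configuration flinkConfig;

    @Override
    public String getScheme() {
        return "oci";   // the URI scheme this factory handles
    }

    @Override
    public void configure(Configuration config) {
        this.flinkConfig = config;   // keep the Flink config for credentials / endpoints
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // Return your org.apache.flink.core.fs.FileSystem subclass here (or a wrapper
        // around a Hadoop FileSystem, as the S3 modules do). Placeholder only:
        throw new UnsupportedOperationException("wire in the actual oci:// FileSystem here");
    }
}
{code}

For the ServiceLoader to find it, the jar also needs a service entry file META-INF/services/org.apache.flink.core.fs.FileSystemFactory that contains the factory's fully qualified class name.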
Re: adding a new cloud filesystem
Thanks. I'm looking at the s3 example and I can only find the S3FileSystemFactory but not the file system implementation (subclass of org.apache.flink.core.fs.FileSystem). Is that requirement still needed?

On Wednesday, January 17, 2018, 3:59:47 PM PST, Fabian Hueske wrote:

Hi,

please have a look at this doc page [1]. It describes how to add new file system implementations and also how to configure them.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/filesystems.html#adding-new-file-system-implementations
Re: Flink-Yarn-Kerberos integration
Ping, any comments? Thanks a lot.

Shuyi

On Wed, Jan 3, 2018 at 3:43 PM, Shuyi Chen wrote:

> Thanks a lot for the clarification, Eron. That's very helpful. Currently,
> we are more concerned about 1) data access, but we will get to 2) and 3)
> eventually.
>
> I was thinking of doing the following:
> 1) extend the current HadoopModule to use and refresh DTs as suggested in
> the YARN Application Security docs.
> 2) I found that the current SecurityModule interface might be enough for
> supporting other security mechanisms. However, the loading of security
> modules is hard-coded, not configuration based. I think we can extend
> SecurityUtils to load modules from the configuration. That way we can
> implement our own security mechanism in our internal repo and have Flink
> jobs load it at runtime.
>
> Please let me know your comments. Thanks a lot.
>
> On Fri, Dec 22, 2017 at 3:05 PM, Eron Wright wrote:
>
>> I agree that it is reasonable to use Hadoop DTs as you describe. That
>> approach is even recommended in YARN's documentation (see "Securing
>> Long-lived YARN Services" on the YARN Application Security page). But one
>> of the goals of Kerberos integration is to support Kerberized data access
>> for connectors other than HDFS, such as Kafka, Cassandra, and
>> Elasticsearch. So your second point makes sense too, suggesting a general
>> architecture for managing secrets (DTs, keytabs, certificates, OAuth
>> tokens, etc.) within the cluster.
>>
>> There are quite a few aspects to Flink security, including:
>> 1. data access (e.g. how a connector authenticates to a data source)
>> 2. service authorization and network security (e.g. how a Flink cluster
>> protects itself from unauthorized access)
>> 3. multi-user support (e.g. multi-user Flink clusters, RBAC)
>>
>> I mention these aspects to clarify your point about AuthN, which I took
>> to be related to (1). Do tell if I misunderstood.
>>
>> Eron
>>
>> On Wed, Dec 20, 2017 at 11:21 AM, Shuyi Chen wrote:
>>
>> > Hi community,
>> >
>> > We are working on secure Flink on YARN. The current Flink-Yarn-Kerberos
>> > integration requires each container of a job to log in to Kerberos via
>> > keytab every, say, 24 hours, and does not use any Hadoop delegation
>> > token mechanism except when localizing the container. As I fixed the
>> > current Flink-Yarn-Kerberos integration (FLINK-8275) and tried to add
>> > more features (FLINK-7860), I developed some concerns about the current
>> > implementation. It can pose a scalability issue for the KDC, e.g. if the
>> > YARN cluster is restarted and tens of thousands of containers suddenly
>> > DDoS the KDC.
>> >
>> > I would like to propose improving the current Flink-YARN-Kerberos
>> > integration by doing something like the following:
>> > 1) The AppMaster (JobManager) periodically authenticates with the KDC
>> > and gets all required DTs for the job.
>> > 2) All other TM or TE containers periodically retrieve new DTs from the
>> > AppMaster (either through a secure HDFS folder or a secure Akka channel).
>> >
>> > Also, we want to extend Flink to support a pluggable AuthN mechanism,
>> > because we have our own internal AuthN mechanism. We would like to add
>> > support in Flink to authenticate periodically to our internal AuthN
>> > service as well, e.g. through dynamic class loading, and use a similar
>> > mechanism to distribute the credentials from the AppMaster to the
>> > containers.
>> >
>> > I would like to get comments and feedback. I can also write a design doc
>> > or create a FLIP if needed. Thanks a lot.
>> >
>> > Shuyi
>> >
>> > --
>> > "So you have to trust that the dots will somehow connect in your future."

--
"So you have to trust that the dots will somehow connect in your future."
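To illustrate the shape of point 2) in the quoted message (configuration-driven loading of security modules), here is a small, self-contained sketch. The SecurityModule interface below is a stand-in for the idea discussed in the thread, not Flink's actual class, and the comma-separated class-name configuration is an assumption for illustration.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Stand-in for the SecurityModule idea from the thread; not Flink's actual
// org.apache.flink.runtime.security.modules.SecurityModule.
interface SecurityModule {
    void install() throws Exception;
    void uninstall() throws Exception;
}

// Sketch of loading security modules from configuration instead of hard-coding them.
// The comma-separated class-name list and the reflective loading are illustrative
// assumptions showing the shape of the proposal, not an existing Flink option.
public final class ConfigurableSecurityModuleLoader {

    public static List<SecurityModule> load(String commaSeparatedClassNames) throws Exception {
        List<SecurityModule> modules = new ArrayList<>();
        for (String name : commaSeparatedClassNames.split(",")) {
            name = name.trim();
            if (name.isEmpty()) {
                continue;
            }
            Class<?> clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader());
            modules.add((SecurityModule) clazz.getDeclaredConstructor().newInstance());
        }
        return modules;
    }

    public static void installAll(List<SecurityModule> modules) throws Exception {
        for (SecurityModule module : modules) {
            module.install();   // e.g. log in to the KDC / internal AuthN service and start DT refresh
        }
    }
}
{code}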