[jira] [Created] (FLINK-8442) Should recover the input split when an execution fails over with FailoverRegion

2018-01-17 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8442:
---

 Summary: Should recover the input split when an execution 
fails over with FailoverRegion
 Key: FLINK-8442
 URL: https://issues.apache.org/jira/browse/FLINK-8442
 Project: Flink
  Issue Type: Bug
  Components: JobManager, Scheduler
Affects Versions: 1.4.0
Reporter: shuai.xu


In FLIP-1, only the executions in a FailoverRegion are restarted when a task 
fails. But input splits are currently assigned only when an ExecutionJobVertex 
is initialized, so when an execution restarts, the input splits it has already 
read can no longer be obtained from the job master. The input splits need to be 
recovered so they can be consumed again when the task restarts.
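
A minimal sketch of the kind of hook this needs (a hypothetical interface, not 
the current Flink API):

{code:java}
import java.util.List;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

// Hypothetical sketch, not the current Flink API: an assigner that can take
// back the splits a failed execution had already been handed, so that a
// restarted execution inside the same FailoverRegion can consume them again.
public interface RecoverableInputSplitAssigner extends InputSplitAssigner {

    // Called by the job master on failover; the returned splits go back
    // into the unassigned pool instead of being lost with the execution.
    void returnInputSplits(List<InputSplit> splits, int taskIndex);
}
{code}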



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8443) YARNSessionCapacitySchedulerITCase is flaky

2018-01-17 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8443:
-

 Summary: YARNSessionCapacitySchedulerITCase is flaky
 Key: FLINK-8443
 URL: https://issues.apache.org/jira/browse/FLINK-8443
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: Piotr Nowojski
 Attachments: 35.5.tar.gz

Attached are the build logs from Travis.

The test is failing with:

{noformat}
java.lang.AssertionError: Found a file /home/travis/build/dataArtisans/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-1_0/application_1516120275777_0003/container_1516120275777_0003_01_02/taskmanager.log with a prohibited string (one of [Exception, Started SelectChannelConnector@0.0.0.0:8081]). Excerpts
{noformat}
After downloading the yarn logs uploaded to transfer.sh, the following failure 
appears:

{code:java}
2018-01-16 16:32:10,553 INFO  org.apache.flink.yarn.YarnTaskManager                         - Stopping TaskManager with final application status SUCCEEDED and diagnostics: Flink YARN Client requested shutdown
2018-01-16 16:32:10,577 INFO  org.apache.flink.yarn.YarnTaskManager                         - Stopping TaskManager akka://flink/user/taskmanager#2122015748.
2018-01-16 16:32:10,578 INFO  org.apache.flink.yarn.YarnTaskManager                         - Disassociating from JobManager
2018-01-16 16:32:10,588 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
2018-01-16 16:32:10,599 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
2018-01-16 16:32:10,614 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager removed spill file directory /home/travis/build/dataArtisans/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-localDir-nm-1_0/usercache/travis/appcache/application_1516120275777_0003/flink-io-356a7c21-a3cd-43cb-926c-7690f861b66c
2018-01-16 16:32:10,615 INFO  org.apache.flink.runtime.io.network.NetworkEnvironment        - Shutting down the network environment and its components.
2018-01-16 16:32:10,619 INFO  org.apache.flink.runtime.io.network.netty.NettyClient         - Successful shutdown (took 4 ms).
2018-01-16 16:32:10,623 INFO  org.apache.flink.runtime.io.network.netty.NettyServer         - Successful shutdown (took 4 ms).
2018-01-16 16:32:10,641 INFO  org.apache.flink.yarn.YarnTaskManager                         - Task manager akka://flink/user/taskmanager is completely shut down.
2018-01-16 16:32:10,649 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2018-01-16 16:32:10,650 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
2018-01-16 16:32:10,717 WARN  org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline  - An exception was thrown by an exception handler.
java.util.concurrent.RejectedExecutionException: Worker has already been shutdown
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.execute(DefaultChannelPipeline.java:636)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:781)
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline
{code}

[jira] [Created] (FLINK-8444) Rework dependency setup docs

2018-01-17 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-8444:
---

 Summary: Rework dependency setup docs
 Key: FLINK-8444
 URL: https://issues.apache.org/jira/browse/FLINK-8444
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.4.0, 1.5.0
Reporter: Chesnay Schepler


Taken from https://github.com/apache/flink/pull/5303:

{quote}
I would suggest to start thinking about the dependencies the following way:

There are pure user-code projects where the Flink runtime is "provided" and 
they are started using an existing Flink setup (bin/flink run or REST entry 
point). This is the Framework Style.

In the future, we will have "Flink as a Library" deployments, where users 
add something like flink-dist as a library to their program and then simply 
dockerize that Java application.

Code can be run in the IDE or other similar style embedded forms. This is 
in some sense also a "Flink as a Library" deployment, but with selective 
(fewer) dependencies. The RocksDB issue applies only to this scenario here.

To make this simpler for users, it would be great to have not N different 
models that we talk about, but ideally only two: Framework Style and Library 
Style. We could, for example, start to advocate and document that users should 
always use flink-dist as their standard dependency - "provided" in the 
framework style deployment, "compile" in the library style deployment. That 
might be a really easy way to work with dependencies. The only problem for the 
time being is that flink-dist is quite big and contains, for example, also 
optional dependencies like flink-table, which makes it more heavyweight for 
quickstarts. Maybe we can accept that as a trade-off for dependency simplicity.
{quote}
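
To make the two styles concrete, a hedged sketch of what the single standard 
dependency could look like in a quickstart POM (coordinates assumed from the 
1.4 era; only the scope would differ between the two styles):

{code:xml}
<!-- Hedged sketch, not settled documentation. Framework Style treats
     flink-dist as provided by the cluster; Library Style would bundle it
     by switching the scope to compile. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-dist_2.11</artifactId>
    <version>1.4.0</version>
    <scope>provided</scope> <!-- "compile" for the library style -->
</dependency>
{code}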



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8445) hostname used in metric names for taskmanager and jobmanager is not consistent

2018-01-17 Thread Chris Thomson (JIRA)
Chris Thomson created FLINK-8445:


 Summary: hostname used in metric names for taskmanager and 
jobmanager is not consistent
 Key: FLINK-8445
 URL: https://issues.apache.org/jira/browse/FLINK-8445
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.3.1
 Environment: I think this problem is present for any metrics 
reporting configuration that includes '<host>' as part of the scope for 
the metrics.  For example, using the Graphite reporter configuration in 
flink-conf.yaml below:


{code:java}
metrics.scope.jm: flink.<host>.jobmanager
metrics.scope.jm.job: flink.<host>.jobmanager.<job_name>
metrics.scope.tm: flink.<host>.taskmanager
metrics.scope.tm.job: flink.<host>.taskmanager.<job_name>
metrics.scope.task: flink.<host>.taskmanager.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: flink.<host>.taskmanager.<job_name>.<operator_name>.<subtask_index>

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
...{code}
Reporter: Chris Thomson


Enabled Flink metrics reporting to Graphite, using system scopes that contain 
'<host>' for both the job manager and task manager.  The resulting metrics 
reported to Graphite use two different representations for '<host>'.

For *Task Manager metrics* it uses the *short hostname* (without the DNS 
domain).  This is a result of logic in the 
org.apache.flink.runtime.taskmanager.TaskManagerLocation constructor that tries 
to extract the short hostname from the fully qualified domain name looked up 
via InetAddress.getCanonicalHostName().

For *Job Manager metrics* it uses the *fully qualified domain name* (with the 
DNS domain). This is a result of there being no logic in 
org.apache.flink.runtime.jobmanager.JobManagerRunner or 
org.apache.flink.runtime.rpc.akka.AkkaRpcService to perform equivalent 
normalization of the fully qualified domain name down to the short hostname.
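
For reference, the task-manager-side normalization described above amounts to 
roughly the following (a paraphrased sketch, not the verbatim 
TaskManagerLocation code):

{code:java}
import java.net.InetAddress;

// Paraphrased sketch of the behaviour described above, not the verbatim
// Flink code: the FQDN from getCanonicalHostName() is cut at the first dot,
// which is why Task Manager metrics carry the short hostname while the
// Job Manager path skips this step and keeps the FQDN.
public final class HostNameNormalization {

    public static String shortHostName(InetAddress address) {
        String fqdn = address.getCanonicalHostName();
        int dot = fqdn.indexOf('.');
        // "host.example.com" -> "host"; raw IPs and dot-less names are
        // returned unchanged
        return (dot > 0 && !Character.isDigit(fqdn.charAt(0)))
                ? fqdn.substring(0, dot)
                : fqdn;
    }
}
{code}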

Ideally, the '<host>' placeholder in the system scopes for the job manager and 
task manager related metrics would be replaced with a consistent value (either 
the short hostname or the fully qualified domain name).  Even better would be a 
configuration option that decides which of the two is used for metric name 
generation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8446) Add support for multiple broadcast states.

2018-01-17 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8446:
-

 Summary: Add support for multiple broadcast states.
 Key: FLINK-8446
 URL: https://issues.apache.org/jira/browse/FLINK-8446
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


adding a new cloud filesystem

2018-01-17 Thread cw7k
 Hi, I'm adding support for more cloud storage providers such as Google 
(gcs://) and Oracle (oci://).
I have an oci:// test working based on the s3a:// test, but when I try it on an 
actual Flink job like WordCount, I get this message:
"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find 
a file system implementation for scheme 'oci'. The scheme is not directly 
supported by Flink and no Hadoop file system to support this scheme could be 
loaded."
How do I register new schemes into the file system factory?  Thanks.

On Tuesday, January 16, 2018, 5:27:31 PM PST, cw7k wrote:

 Hi, question on this page:
"You need to point Flink to a valid Hadoop configuration..."
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#s3-simple-storage-service
How do you point Flink to the Hadoop config?

On Saturday, January 13, 2018, 4:56:15 AM PST, Till Rohrmann wrote:
 
 Hi,

the flink-connector-filesystem module contains the BucketingSink, which is a
connector with which you can write your data to a file system. It provides
exactly-once processing guarantees and allows writing data to different
buckets [1].

The flink-filesystem module contains different file system implementations
(like MapR FS, HDFS, or S3). If you want to use, for example, the S3 file
system, then there are the flink-s3-fs-hadoop and flink-s3-fs-presto modules.

So if you want to write your data to S3 using the BucketingSink, then you
have to add flink-connector-filesystem for the BucketingSink as well as an
S3 file system implementation (e.g. flink-s3-fs-hadoop or
flink-s3-fs-presto).
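
For illustration, a minimal sketch of that combination (hedged; assumes the
1.4-era API, and the bucket path is just an example):

    // needs flink-connector-filesystem (for BucketingSink) plus an s3 file
    // system implementation (flink-s3-fs-hadoop or flink-s3-fs-presto)
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    public class S3BucketingExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // writes incoming records into time-based buckets under the base path
            env.fromElements("a", "b", "c")
                    .addSink(new BucketingSink<String>("s3://my-bucket/output"));
            env.execute("bucketing-example");
        }
    }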

Usually, there should be no need to change Flink's filesystem
implementations. If you want to add a new connector, then this would go to
flink-connectors or to Apache Bahir [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html

[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/index.html#connectors-in-apache-bahir

Cheers,
Till

On Fri, Jan 12, 2018 at 7:22 PM, cw7k  wrote:

> Hi, I'm trying to understand the difference between the flink-filesystem
> and flink-connector-filesystem.  How is each intended to be used?
> If adding support for a different storage provider that supports HDFS,
> should additions be made to one or the other, or both?  Thanks.

Re: adding a new cloud filesystem

2018-01-17 Thread Fabian Hueske
Hi,

please have a look at this doc page [1].
It describes how to add new file system implementations and also how to
configure them.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/filesystems.html#adding-new-file-system-implementations
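
For a concrete feel, a hedged sketch of what that page describes (the "oci"
scheme and the OciFileSystem class are stand-ins for your own code):

    import java.io.IOException;
    import java.net.URI;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystemFactory;

    public class OciFileSystemFactory implements FileSystemFactory {

        @Override
        public void configure(Configuration config) {
            // read endpoint/credential settings from flink-conf.yaml if needed
        }

        @Override
        public String getScheme() {
            return "oci"; // the URI scheme this factory answers for
        }

        @Override
        public FileSystem create(URI fsUri) throws IOException {
            // OciFileSystem would be your own subclass of
            // org.apache.flink.core.fs.FileSystem
            return new OciFileSystem(fsUri);
        }
    }

The factory is picked up via Java's ServiceLoader, so its fully qualified
class name also has to be listed in
META-INF/services/org.apache.flink.core.fs.FileSystemFactory in your jar.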


Re: adding a new cloud filesystem

2018-01-17 Thread cw7k
 Thanks. I'm looking at the s3 example and I can only find the 
S3FileSystemFactory, but not the file system implementation (subclass of 
org.apache.flink.core.fs.FileSystem).
Is that requirement still needed?


Re: Flink-Yarn-Kerberos integration

2018-01-17 Thread Shuyi Chen
Ping, any comments?  Thanks a lot.

Shuyi

On Wed, Jan 3, 2018 at 3:43 PM, Shuyi Chen  wrote:

> Thanks a lot for the clarification, Eron. That's very helpful. Currently,
> we are more concerned about 1) data access, but will get to 2) and 3)
> eventually.
>
> I was thinking of doing the following:
> 1) extend the current HadoopModule to use and refresh DTs as suggested in the
> YARN Application Security docs.
> 2) I found that the current SecurityModule interface might be enough for
> supporting other security mechanisms. However, the loading of security
> modules is hard-coded, not configuration-based. I think we can extend
> SecurityUtils to load from configuration, so we can implement our own
> security mechanism in our internal repo and have Flink jobs load it at
> runtime.
>
> Please let me know your comments. Thanks a lot.
>
> On Fri, Dec 22, 2017 at 3:05 PM, Eron Wright  wrote:
>
>> I agree that it is reasonable to use Hadoop DTs as you describe.  That
>> approach is even recommended in YARN's documentation (see Securing
>> Long-lived YARN Services on the YARN Application Security page).   But one
>> of the goals of Kerberos integration is to support Kerberized data access
>> for connectors other than HDFS, such as Kafka, Cassandra, and
>> Elasticsearch.   So your second point makes sense too, suggesting a
>> general
>> architecture for managing secrets (DTs, keytabs, certificates, oauth
>> tokens, etc.) within the cluster.
>>
>> There's quite a few aspects to Flink security, including:
>> 1. data access (e.g. how a connector authenticates to a data source)
>> 2. service authorization and network security (e.g. how a Flink cluster
>> protects itself from unauthorized access)
>> 3. multi-user support (e.g. multi-user Flink clusters, RBAC)
>>
>> I mention these aspects to clarify your point about AuthN, which I took to
>> be related to (1).   Do tell if I misunderstood.
>>
>> Eron
>>
>>
>> On Wed, Dec 20, 2017 at 11:21 AM, Shuyi Chen  wrote:
>>
>> > Hi community,
>> >
>> > We are working on secure Flink on YARN. The current Flink-Yarn-Kerberos
>> > integration requires each container of a job to log in to Kerberos via
>> > keytab every, say, 24 hours, and does not use any Hadoop delegation token
>> > mechanism except when localizing the container. As I fixed the current
>> > Flink-Yarn-Kerberos integration (FLINK-8275) and tried to add more
>> > features (FLINK-7860), I developed some concerns about the current
>> > implementation. It can pose a scalability issue for the KDC, e.g., if the
>> > YARN cluster is restarted and tens of thousands of containers suddenly
>> > DDoS the KDC.
>> >
>> > I would like to propose improving the current Flink-YARN-Kerberos
>> > integration by doing something like the following:
>> > 1) the AppMaster (JobManager) periodically authenticates with the KDC and
>> > gets all required DTs for the job.
>> > 2) all other TM or TE containers periodically retrieve new DTs from the
>> > AppMaster (either through a secure HDFS folder or a secure Akka channel)
>> >
>> > Also, we want to extend Flink to support a pluggable AuthN mechanism,
>> > because we have our own internal AuthN mechanism. We would like to add
>> > support in Flink to authenticate periodically to our internal AuthN
>> > service as well through, e.g., dynamic class loading, and use a similar
>> > mechanism to distribute the credential from the AppMaster to containers.
>> >
>> > I would like to get comments and feedback. I can also write a design doc
>> > or create a FLIP if needed. Thanks a lot.
>> >
>> > Shuyi
>> >
>> >
>> >
>> > --
>> > "So you have to trust that the dots will somehow connect in your
>> future."
>> >
>>
>
>
>
> --
> "So you have to trust that the dots will somehow connect in your future."
>



-- 
"So you have to trust that the dots will somehow connect in your future."