[GitHub] flink pull request #2228: [FLINK-4170][kinesis-connector] Simplify Kinesis c...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/2228 [FLINK-4170][kinesis-connector] Simplify Kinesis connector config keys to be less verbose Changes included with this PR: 1) Separate consumer & producer config keys into 2 classes: `ConsumerConfigConstants` and `ProducerConfigConstants`. 2) Update Kinesis docs to use the new concise keys. Will need to rebase this PR after https://github.com/apache/flink/pull/2071 is merged. @rmetzger @uce Can you please review? I'd also like to relate this PR to the discussion on whether or not we will want a reworked configuration class in https://issues.apache.org/jira/browse/FLINK-4195. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-4170 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2228.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2228 commit 1579607510328ee9d7dff46c1fcd54be9ac82098 Author: Gordon Tai Date: 2016-07-11T14:41:48Z [FLINK-4170] Simplify Kinesis connector config keys to be less verbose --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372468#comment-15372468 ] ASF GitHub Bot commented on FLINK-4170: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/2228 [FLINK-4170][kinesis-connector] Simplify Kinesis connector config keys to be less verbose Changes included with this PR: 1) Separate consumer & producer config keys into 2 classes: `ConsumerConfigConstants` and `ProducerConfigConstants`. 2) Update Kinesis docs to use the new concise keys. Will need to rebase this PR after https://github.com/apache/flink/pull/2071 is merged. @rmetzger @uce Can you please review? I'd also like to relate this PR to the discussion on whether or not we will want a reworked configuration class in https://issues.apache.org/jira/browse/FLINK-4195. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-4170 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2228.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2228 commit 1579607510328ee9d7dff46c1fcd54be9ac82098 Author: Gordon Tai Date: 2016-07-11T14:41:48Z [FLINK-4170] Simplify Kinesis connector config keys to be less verbose > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2227: [FLINK-4197] Allow Kinesis endpoint to be overridden via ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2227 Hi @skidder, I like the addition, I think serious users will definitely need this. One thing I'm concerned about is how `CONFIG_AWS_REGION` and `CONFIG_AWS_ENDPOINT` should co-exist in the configuration properties, especially when we validate the configuration with `KinesisConfigUtil#validateConfiguration()`. The Kinesis consumer requires the user to specify `CONFIG_AWS_REGION`. I wonder if we need to check for conflicting `CONFIG_AWS_REGION` and `CONFIG_AWS_ENDPOINT` values, i.e. region is set to "us-east-1", but endpoint is set to "ap-northeast-1.amazonaws.com". What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config
[ https://issues.apache.org/jira/browse/FLINK-4197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372492#comment-15372492 ] ASF GitHub Bot commented on FLINK-4197: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2227 Hi @skidder, I like the addition, I think serious users will definitely need this. One thing I'm concerned about is how `CONFIG_AWS_REGION` and `CONFIG_AWS_ENDPOINT` should co-exist in the configuration properties, especially when we validate the configuration with `KinesisConfigUtil#validateConfiguration()`. The Kinesis consumer requires the user to specify `CONFIG_AWS_REGION`. I wonder if we need to check for conflicting `CONFIG_AWS_REGION` and `CONFIG_AWS_ENDPOINT` values, i.e. region is set to "us-east-1", but endpoint is set to "ap-northeast-1.amazonaws.com". What do you think? > Allow Kinesis Endpoint to be Overridden via Config > -- > > Key: FLINK-4197 > URL: https://issues.apache.org/jira/browse/FLINK-4197 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.0.3 >Reporter: Scott Kidder >Priority: Minor > Labels: easyfix > Fix For: 1.0.4 > > Original Estimate: 1h > Remaining Estimate: 1h > > I perform local testing of my application stack with Flink configured as a > consumer on a Kinesis stream provided by Kinesalite, an implementation of > Kinesis built on LevelDB. This requires me to override the AWS endpoint to > refer to my local Kinesalite server rather than reference the real AWS > endpoint. I'd like to add a configuration property to the Kinesis streaming > connector that allows the AWS endpoint to be specified explicitly. > This should be a fairly small change and provide a lot of flexibility to > people looking to integrate Flink with Kinesis in a non-production setup. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
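To make the concern above concrete, here is a minimal, hypothetical sketch of such a consistency check. It is not the actual `KinesisConfigUtil#validateConfiguration()` code; the key strings and class name are placeholders standing in for the real `CONFIG_AWS_REGION` / `CONFIG_AWS_ENDPOINT` constants.
{code:java}
import java.util.Properties;

/**
 * Hypothetical sketch (not the actual KinesisConfigUtil code): reject a
 * configuration in which the mandatory region and an optional custom
 * endpoint point to different AWS regions.
 */
public final class EndpointRegionCheck {

    public static void validate(Properties config) {
        // "aws.region" / "aws.endpoint" stand in for CONFIG_AWS_REGION / CONFIG_AWS_ENDPOINT
        String region = config.getProperty("aws.region");
        String endpoint = config.getProperty("aws.endpoint");

        if (region == null) {
            throw new IllegalArgumentException("The AWS region must be set in the config.");
        }
        // A custom endpoint is optional; if present, it should refer to the same
        // region, e.g. "kinesis.us-east-1.amazonaws.com" for region "us-east-1".
        if (endpoint != null && !endpoint.contains(region)) {
            throw new IllegalArgumentException(
                "Endpoint '" + endpoint + "' conflicts with region '" + region + "'.");
        }
    }
}
{code}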
[GitHub] flink issue #1947: [FLINK-1502] [WIP] Basic Metric System
Github user zentol commented on the issue: https://github.com/apache/flink/pull/1947 No. They are currently only accessible via JMX or a system supported by a reporter. They are not available in the Dashboard; this will be worked on for 1.2. You could write a reporter that listens for HTTP requests and returns the appropriate value. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372495#comment-15372495 ] ASF GitHub Bot commented on FLINK-1502: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/1947 No. They are currently only accessible via JMX or a system supported by a reporter. They are not available in the Dashboard; this will be worked on for 1.2. You could write a reporter that listens for HTTP requests and returns the appropriate value. > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
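As an illustration of the "reporter that listens for HTTP requests" idea, here is a rough, self-contained sketch using the JDK's built-in HTTP server. It deliberately does not implement Flink's actual reporter interface; the `update()` hook is a placeholder for wherever the metric system would hand values to the reporter.
{code:java}
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch only: keeps the latest metric values in a map and serves them over HTTP. */
public class HttpMetricsReporter {

    private final Map<String, String> latestValues = new ConcurrentHashMap<>();

    /** Placeholder for the real hook through which metrics would be reported. */
    public void update(String name, Object value) {
        latestValues.put(name, String.valueOf(value));
    }

    public void start(int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/metrics", exchange -> {
            StringBuilder body = new StringBuilder();
            latestValues.forEach((k, v) -> body.append(k).append('=').append(v).append('\n'));
            byte[] bytes = body.toString().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, bytes.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(bytes);
            }
        });
        server.start();
    }
}
{code}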
[GitHub] flink issue #2228: [FLINK-4170][kinesis-connector] Simplify Kinesis connecte...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2228 We should probably also merge https://github.com/apache/flink/pull/2227 before this one, to reduce effort on rebasing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372501#comment-15372501 ] ASF GitHub Bot commented on FLINK-4170: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2228 We should probably also merge https://github.com/apache/flink/pull/2227 before this one, to reduce effort on rebasing. > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2225 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372507#comment-15372507 ] ASF GitHub Bot commented on FLINK-4191: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2225 > Expose shard information in KinesisDeserializationSchema > > > Key: FLINK-4191 > URL: https://issues.apache.org/jira/browse/FLINK-4191 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > > Currently, we are not exposing the Shard ID and other shard-related > information in the deserialization schema. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4191) Expose shard information in KinesisDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4191. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/662b4586 > Expose shard information in KinesisDeserializationSchema > > > Key: FLINK-4191 > URL: https://issues.apache.org/jira/browse/FLINK-4191 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > > Currently, we are not exposing the Shard ID and other shard-related > information in the deserialization schema. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2071 Sry, I forgot to merge it. Will do now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards
[ https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372513#comment-15372513 ] ASF GitHub Bot commented on FLINK-4018: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2071 Sry, I forgot to merge it. Will do now. > Configurable idle time between getRecords requests to Kinesis shards > > > Key: FLINK-4018 > URL: https://issues.apache.org/jira/browse/FLINK-4018 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Currently, the Kinesis consumer is calling getRecords() right after finishing > previous calls. This results in easily reaching Amazon's limitation of 5 GET > requests per shard per second. Although the code already has backoff & retry > mechanism, this will affect other applications consuming from the same > Kinesis stream. > Along with this new configuration and the already existing > `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more > control on the desired throughput behaviour for the Kinesis consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
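For orientation, a hedged example of how the two throttling knobs described above could be combined. The string keys below are placeholders: the real property names are the constants in `KinesisConfigConstants` (the existing `CONFIG_SHARD_RECORDS_PER_GET` and the new idle-time key added by this issue).
{code:java}
import java.util.Properties;

public class KinesisThrottlingExample {
    public static void main(String[] args) {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("aws.region", "us-east-1");
        // Placeholder keys; use the KinesisConfigConstants constants in real code.
        consumerConfig.setProperty("flink.shard.records.per.get", "5000");           // fewer records per call
        consumerConfig.setProperty("flink.shard.getrecords.interval.millis", "500"); // pause between getRecords() calls
        System.out.println(consumerConfig);
    }
}
{code}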
[GitHub] flink pull request #2071: [FLINK-4018][streaming-connectors] Configurable id...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2071 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards
[ https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4018. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/f0387aca > Configurable idle time between getRecords requests to Kinesis shards > > > Key: FLINK-4018 > URL: https://issues.apache.org/jira/browse/FLINK-4018 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Currently, the Kinesis consumer is calling getRecords() right after finishing > previous calls. This results in easily reaching Amazon's limitation of 5 GET > requests per shard per second. Although the code already has backoff & retry > mechanism, this will affect other applications consuming from the same > Kinesis stream. > Along with this new configuration and the already existing > `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more > control on the desired throughput behaviour for the Kinesis consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards
[ https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372515#comment-15372515 ] ASF GitHub Bot commented on FLINK-4018: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2071 > Configurable idle time between getRecords requests to Kinesis shards > > > Key: FLINK-4018 > URL: https://issues.apache.org/jira/browse/FLINK-4018 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Currently, the Kinesis consumer is calling getRecords() right after finishing > previous calls. This results in easily reaching Amazon's limitation of 5 GET > requests per shard per second. Although the code already has backoff & retry > mechanism, this will affect other applications consuming from the same > Kinesis stream. > Along with this new configuration and the already existing > `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more > control on the desired throughput behaviour for the Kinesis consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3516) JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails
[ https://issues.apache.org/jira/browse/FLINK-3516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372521#comment-15372521 ] Robert Metzger commented on FLINK-3516: --- And again https://s3.amazonaws.com/archive.travis-ci.org/jobs/144055780/log.txt > JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails > -- > > Key: FLINK-3516 > URL: https://issues.apache.org/jira/browse/FLINK-3516 > Project: Flink > Issue Type: Test > Components: Distributed Coordination, JobManager >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/111782050/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2071 No problem, thank you! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2228: [FLINK-4170][kinesis-connector] Simplify Kinesis connecte...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2228 Rebasing on merge of https://github.com/apache/flink/pull/2071 .. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards
[ https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372558#comment-15372558 ] ASF GitHub Bot commented on FLINK-4018: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2071 No problem, thank you! > Configurable idle time between getRecords requests to Kinesis shards > > > Key: FLINK-4018 > URL: https://issues.apache.org/jira/browse/FLINK-4018 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Currently, the Kinesis consumer is calling getRecords() right after finishing > previous calls. This results in easily reaching Amazon's limitation of 5 GET > requests per shard per second. Although the code already has backoff & retry > mechanism, this will affect other applications consuming from the same > Kinesis stream. > Along with this new configuration and the already existing > `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more > control on the desired throughput behaviour for the Kinesis consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372559#comment-15372559 ] ASF GitHub Bot commented on FLINK-4170: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2228 Rebasing on merge of https://github.com/apache/flink/pull/2071 .. > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4198) Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time
Till Rohrmann created FLINK-4198: Summary: Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time Key: FLINK-4198 URL: https://issues.apache.org/jira/browse/FLINK-4198 Project: Flink Issue Type: Sub-task Affects Versions: 1.0.0 Reporter: Till Rohrmann Fix For: 2.0.0 Remove {{org.apache.flink.streaming.api.windowing.time.Time}} and replace it with {{org.apache.flink.api.common.time.Time}}, which resides in {{flink-core}}. The latter is basically a copy of the former that has been moved to {{flink-core}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
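For readers tracking the migration, a small example of the target class, assuming the `flink-core` `Time` exposes the same factory-method style as the windowing `Time` it replaces (method names here are assumed from that copy, not confirmed against the final API).
{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;

public class TimeMigrationExample {
    public static void main(String[] args) {
        // Assumed factory methods mirroring the windowing Time class.
        Time timeout = Time.seconds(30);
        Time interval = Time.of(500, TimeUnit.MILLISECONDS);
        System.out.println(timeout.toMilliseconds() + " ms / " + interval.toMilliseconds() + " ms");
    }
}
{code}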
[jira] [Commented] (FLINK-3516) JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails
[ https://issues.apache.org/jira/browse/FLINK-3516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372563#comment-15372563 ] Ufuk Celebi commented on FLINK-3516: OK, I will look into this. I didn't do it until now because I wanted to wait for some of the recent HA changes to be merged. > JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails > -- > > Key: FLINK-3516 > URL: https://issues.apache.org/jira/browse/FLINK-3516 > Project: Flink > Issue Type: Test > Components: Distributed Coordination, JobManager >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/111782050/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
[ https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372570#comment-15372570 ] Stephan Ewen commented on FLINK-4193: - One of the crash reports looks like it could be G1 GC related. G1 is known to still have some issues in most Java 8 versions. Can you try a different GC and see if the problem still occurs? > Task manager JVM crashes while deploying cancelling jobs > > > Key: FLINK-4193 > URL: https://issues.apache.org/jira/browse/FLINK-4193 > Project: Flink > Issue Type: Bug > Components: Streaming, TaskManager >Reporter: Gyula Fora >Priority: Critical > > We have observed several TM crashes while deploying larger stateful streaming > jobs that use the RocksDB state backend. > As the JVMs crash the logs don't show anything but I have uploaded all the > info I have got from the standard output. > This indicates some GC and possibly some RocksDB issues underneath but we > could not really figure out much more. > GC segfault > https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125 > Other crashes (maybe rocks related) > https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa > https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818 > The third link shows 2 issues that happened in parallel... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
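For anyone reproducing this, the GC flags would typically be passed through `env.java.opts` in flink-conf.yaml; a generic, Flink-independent way to confirm which collector a TaskManager JVM is actually running looks like this:
{code:java}
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Prints the active garbage collectors of the current JVM, e.g. to confirm a switch away from G1 took effect. */
public class PrintActiveGc {
    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName() + " (collections so far: " + gc.getCollectionCount() + ")");
        }
    }
}
{code}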
[jira] [Created] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
Kostas Kloudas created FLINK-4199: - Summary: Wrong client behavior when submitting job to non-existing cluster Key: FLINK-4199 URL: https://issues.apache.org/jira/browse/FLINK-4199 Project: Flink Issue Type: Bug Reporter: Kostas Kloudas Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the last line: "Job has been submitted with" is totally misleading. Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
[ https://issues.apache.org/jira/browse/FLINK-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-4199: -- Description: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d was: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the last line: "Job has been submitted with" is totally misleading. Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d > Wrong client behavior when submitting job to non-existing cluster > - > > Key: FLINK-4199 > URL: https://issues.apache.org/jira/browse/FLINK-4199 > Project: Flink > Issue Type: Bug >Reporter: Kostas Kloudas > > Trying to submit a job jar from the client to a non-existing cluster gives > the following messages. In particular the first and last lines: "Cluster > retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" > and "Job has been submitted with" are totally misleading. > Cluster retrieved: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job > completion. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Communication with JobManager failed: Lost connection to > the JobManager. > Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2229: [hotfix][docs] Add note about Kinesis producer lim...
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/2229 [hotfix][docs] Add note about Kinesis producer limitations While testing the Kinesis connector @uce and I found out that data is not written in-order with the Kinesis producer. I'm now mentioning this in the documentation. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink kinesis_doc_hf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2229.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2229 commit a5679996f505ace3b858c19d52185b593452fc8c Author: Robert Metzger Date: 2016-07-12T09:18:42Z [hotfix][docs] Add note about Kinesis producer limitations --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372606#comment-15372606 ] Robert Metzger commented on FLINK-4015: --- What's the throughput of the Kafka producer? Do you think the synchronous producer would be fast enough? I'm asking because the synchronous variant is probably easier to implement and also to operate. With the buffering Kafka producer, you need to maintain all the unconfirmed records in memory, so you may run into garbage collection issues. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. 
> at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
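The buffering trade-off raised in this thread can be illustrated with a small, hypothetical sketch: sends stay asynchronous for throughput, every in-flight record is counted, and a flush step waits for the count to drain while surfacing asynchronous errors such as NotLeaderForPartitionException. This is illustration only, not the FlinkKafkaProducer implementation; `kafkaProps` is assumed to contain bootstrap servers and byte-array serializers.
{code:java}
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Hedged sketch of an at-least-once sender that tracks unconfirmed records. */
public class PendingCountingSender {

    private final KafkaProducer<byte[], byte[]> producer;
    private final AtomicLong pending = new AtomicLong();
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    public PendingCountingSender(Properties kafkaProps) {
        this.producer = new KafkaProducer<>(kafkaProps);
    }

    public void send(String topic, byte[] value) {
        pending.incrementAndGet();
        producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
            if (exception != null) {
                asyncError.compareAndSet(null, exception); // e.g. NotLeaderForPartitionException
            }
            pending.decrementAndGet();
        });
    }

    /** Called e.g. before acknowledging a checkpoint: wait until every send is confirmed, or fail. */
    public void awaitPending() throws Exception {
        while (pending.get() > 0) {
            Exception e = asyncError.get();
            if (e != null) {
                throw e;
            }
            Thread.sleep(10);
        }
        if (asyncError.get() != null) {
            throw asyncError.get();
        }
    }
}
{code}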
[GitHub] flink issue #2209: [FLINK-4111] [table] Flink Table & SQL doesn't work in ve...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2209 Thanks @rmetzger. I would merge it today if there are no objections. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4111) Flink Table & SQL doesn't work in very simple example
[ https://issues.apache.org/jira/browse/FLINK-4111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372621#comment-15372621 ] ASF GitHub Bot commented on FLINK-4111: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2209 Thanks @rmetzger. I would merge it today if there are no objections. > Flink Table & SQL doesn't work in very simple example > - > > Key: FLINK-4111 > URL: https://issues.apache.org/jira/browse/FLINK-4111 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Jark Wu >Assignee: Timo Walther > Fix For: 1.1.0 > > > I’m trying to use Flink Table 1.1-SNAPSHOT where I want to use Table API and > SQL in my project. But when I run the very simple example WordCountTable, I > encountered the following exception : > {code} > Exception in thread "main" java.lang.NoSuchMethodError: > org.apache.calcite.rel.logical.LogicalAggregate.getGroupSets()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList; > at > org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule.matches(DataSetAggregateRule.scala:47) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:269) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:253) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1542) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1817) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:723) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:331) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:250) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at > org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) > at com.alibaba.flink.examples.WordCountTable$.main(WordCountTable.scala:43) > at com.alibaba.flink.examples.WordCountTable.main(WordCountTable.scala) > {code} > It seems that something wrong with our guava shade. Do you have any ideas? > My pom file and WordCountTable.scala are > [here|https://gist.github.com/wuchong/9c1c0df3cb7453502abc4605f5347289]. > And I found someone have the same problem on stack overflow > [http://stackoverflow.com/questions/37835408/org-apache-flink-api-table-tableexception-alias-on-field-reference-expression-e#comment63160086_37838816] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3916) Allow generic types passing the Table API
[ https://issues.apache.org/jira/browse/FLINK-3916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372624#comment-15372624 ] ASF GitHub Bot commented on FLINK-3916: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2197 I would like to merge this if there are no objections. It is a very important improvement. > Allow generic types passing the Table API > - > > Key: FLINK-3916 > URL: https://issues.apache.org/jira/browse/FLINK-3916 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > The Table API currently only supports BasicTypes that can pass the Table API. > Other types should also be supported but treated as black boxes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2197: [FLINK-3916] [table] Allow generic types passing the Tabl...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2197 I would like to merge this if there are no objections. It is a very important improvement. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
[ https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372638#comment-15372638 ] Gyula Fora commented on FLINK-4193: --- I can probably try a different GC later but I have only seen that error once. Now I have got another error that happened after some jobs crashed on a Kafka issue: https://gist.github.com/gyfora/ea80631abdf59b6eb2e198e9bfa5c520 > Task manager JVM crashes while deploying cancelling jobs > > > Key: FLINK-4193 > URL: https://issues.apache.org/jira/browse/FLINK-4193 > Project: Flink > Issue Type: Bug > Components: Streaming, TaskManager >Reporter: Gyula Fora >Priority: Critical > > We have observed several TM crashes while deploying larger stateful streaming > jobs that use the RocksDB state backend. > As the JVMs crash the logs don't show anything but I have uploaded all the > info I have got from the standard output. > This indicates some GC and possibly some RocksDB issues underneath but we > could not really figure out much more. > GC segfault > https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125 > Other crashes (maybe rocks related) > https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa > https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818 > The third link shows 2 issues that happened in parallel... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets
[ https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372649#comment-15372649 ] Josh Forman-Gornall commented on FLINK-4190: Ah yeah that's a good point! I'll add the deprecated RollingSink back in then. > Generalise RollingSink to work with arbitrary buckets > - > > Key: FLINK-4190 > URL: https://issues.apache.org/jira/browse/FLINK-4190 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Streaming Connectors >Reporter: Josh Forman-Gornall >Assignee: Josh Forman-Gornall >Priority: Minor > > The current RollingSink implementation appears to be intended for writing to > directories that are bucketed by system time (e.g. minutely) and to only be > writing to one file within one bucket at any point in time. When the system > time determines that the current bucket should be changed, the current bucket > and file are closed and a new bucket and file are created. The sink cannot be > used for the more general problem of writing to arbitrary buckets, perhaps > determined by an attribute on the element/tuple being processed. > There are three limitations which prevent the existing sink from being used > for more general problems: > - Only bucketing by the current system time is supported, and not by e.g. an > attribute of the element being processed by the sink. > - Whenever the sink sees a change in the bucket being written to, it flushes > the file and moves on to the new bucket. Therefore the sink cannot have more > than one bucket/file open at a time. Additionally the checkpointing mechanics > only support saving the state of one active bucket and file. > - The sink determines that it should 'close' an active bucket and file when > the bucket path changes. We need another way to determine when a bucket has > become inactive and needs to be closed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
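To make the "arbitrary buckets" idea concrete, here is a purely hypothetical sketch of a bucketer keyed off the element instead of the processing-time clock. The interface and method names are invented for illustration; the API actually introduced for this issue may differ.
{code:java}
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Hypothetical: derive the bucket (sub-directory) from the element itself. */
interface ElementBucketer<T> extends Serializable {
    String getBucketPath(T element);
}

/** Example implementation: bucket records by the day encoded in their event timestamp. */
class EventDayBucketer implements ElementBucketer<Long> {
    @Override
    public String getBucketPath(Long eventTimestampMillis) {
        return new SimpleDateFormat("yyyy-MM-dd").format(new Date(eventTimestampMillis));
    }
}
{code}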
[GitHub] flink issue #2229: [hotfix][docs] Add note about Kinesis producer limitation...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2229 +1 to merge --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2228: [FLINK-4170][kinesis-connector] Simplify Kinesis c...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2228#discussion_r70413349 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -17,89 +17,89 @@ package org.apache.flink.streaming.connectors.kinesis.config; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + /** - * Keys and default values used to configure the Kinesis consumer. + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} */ -public class KinesisConfigConstants { +public class ConsumerConfigConstants extends AWSConfigConstants { - // - // Configuration Keys - // + /** +* The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used +* when the consumer tasks retrieve the first shard iterator for each Kinesis shard. +*/ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; --- End diff -- Should we rename "position" to sequence number as this is the term used in the Kinesis docs? https://aws.amazon.com/kinesis/streams/faqs/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372655#comment-15372655 ] ASF GitHub Bot commented on FLINK-4170: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2228#discussion_r70413349 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -17,89 +17,89 @@ package org.apache.flink.streaming.connectors.kinesis.config; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + /** - * Keys and default values used to configure the Kinesis consumer. + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} */ -public class KinesisConfigConstants { +public class ConsumerConfigConstants extends AWSConfigConstants { - // - // Configuration Keys - // + /** +* The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used +* when the consumer tasks retrieve the first shard iterator for each Kinesis shard. +*/ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; --- End diff -- Should we rename "position" to sequence number as this is the term used in the Kinesis docs? https://aws.amazon.com/kinesis/streams/faqs/ > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
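For context, a hedged sketch of how a consumer could be configured with the shortened keys from this PR. `STREAM_INITIAL_POSITION` and the `InitialPosition` enum come from the diff above; `AWS_REGION` is assumed to be inherited from `AWSConfigConstants` as described in the PR, and the surrounding job is a generic skeleton rather than code taken from the PR itself.
{code:java}
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ConciseKeysExample {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        // AWS_REGION assumed to be inherited from AWSConfigConstants.
        config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config))
           .print();
        env.execute("Kinesis consumer with concise config keys");
    }
}
{code}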
[GitHub] flink pull request #2229: [hotfix][docs] Add note about Kinesis producer lim...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2229#discussion_r70413125 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -240,8 +239,9 @@ consumer when calling this API can also be modified by using the other keys pref ### Kinesis Producer The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in -Flink's checkpointing and doesn't provide exactly-once processing guarantees. In case of a failure, data will be written again -to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. +Flink's checkpointing and doesn't provide exactly-once processing guarantees. +Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details). +In case of a failure, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. --- End diff -- Should we also mention that resharding streams while using the Kinesis Producer will also result in duplicates? I experienced this while testing exactly-once with resharding before. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2228: [FLINK-4170][kinesis-connector] Simplify Kinesis connecte...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2228 I think this is an improvement over the current state. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372658#comment-15372658 ] ASF GitHub Bot commented on FLINK-4170: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2228 I think this is an improvement over the current state. > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2229: [hotfix][docs] Add note about Kinesis producer lim...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2229#discussion_r70414090 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -240,8 +239,9 @@ consumer when calling this API can also be modified by using the other keys pref ### Kinesis Producer The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in -Flink's checkpointing and doesn't provide exactly-once processing guarantees. In case of a failure, data will be written again -to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. +Flink's checkpointing and doesn't provide exactly-once processing guarantees. +Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details). +In case of a failure, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. --- End diff -- Yes, please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372665#comment-15372665 ] Sebastian Klemke commented on FLINK-4015: - No, synchronous wouldn't be fast enough: We have ~11k records produced per second on each sink node, if this number drops significantly, our replay procedure will take much longer. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 
16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
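A non-synchronous mitigation for the failure above is to let the underlying Kafka producer retry internally when a partition leader changes, instead of surfacing the NotLeaderForPartitionException to the Flink task. The sketch below is only illustrative and is not what this thread settled on; it assumes the connector forwards the standard Kafka producer properties ({{retries}}, {{retry.backoff.ms}}) straight through to the Kafka client, and the broker list and topic name are placeholders.

{code}
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class RetryingKafkaSink {

    public static void addSink(DataStream<String> stream) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        // Standard Kafka producer settings: retry internally instead of surfacing
        // NotLeaderForPartitionException to the Flink task during a leader election.
        producerProps.setProperty("retries", "10");
        producerProps.setProperty("retry.backoff.ms", "500");

        stream.addSink(new FlinkKafkaProducer08<>(
                "output-topic",              // hypothetical topic name
                new SimpleStringSchema(),
                producerProps));
    }
}
{code}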
[GitHub] flink pull request #2229: [hotfix][docs] Add note about Kinesis producer lim...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2229#discussion_r70423762 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -240,8 +239,9 @@ consumer when calling this API can also be modified by using the other keys pref ### Kinesis Producer The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in -Flink's checkpointing and doesn't provide exactly-once processing guarantees. In case of a failure, data will be written again -to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. +Flink's checkpointing and doesn't provide exactly-once processing guarantees. +Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details). +In case of a failure, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. --- End diff -- Good point. I added it. I'll now merge the change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2229: [hotfix][docs] Add note about Kinesis producer lim...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2229 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2183: [FLINK-4123] Cassandra sink checks for exceptions in ack ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2183 Changes look good to me. Thanks for your contribution @zentol. Will be merging this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4123) CassandraWriteAheadSink can hang on cassandra failure
[ https://issues.apache.org/jira/browse/FLINK-4123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372825#comment-15372825 ] ASF GitHub Bot commented on FLINK-4123: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2183 Changes look good to me. Thanks for your contribution @zentol. Will be merging this PR. > CassandraWriteAheadSink can hang on cassandra failure > - > > Key: FLINK-4123 > URL: https://issues.apache.org/jira/browse/FLINK-4123 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.1.0 > > > The CassandraWriteAheadSink verifies that all writes send to cassandra have > been applied by counting how many were sent and how many callbacks were > activated. Once all writes were sent the sink enters into a loop that is only > exited once both counts are equal. > Thus, should cassandra crash after all writes were sent, without having > acknowledged all writes, the sink will deadlock in the loop. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
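The deadlock described above comes from a wait loop that only compares the number of sent writes with the number of acknowledged writes. The sketch below shows the general pattern implied by the fix, recording callback failures so the wait can fail fast; class and method names are invented for illustration and do not mirror the actual CassandraWriteAheadSink internals.

{code}
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: count acks, but also surface failures instead of spinning forever. */
class AckTracker {
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    void onWriteSent() {
        pending.incrementAndGet();
    }

    // Invoked from the driver's async success callback.
    void onSuccess() {
        pending.decrementAndGet();
    }

    // Invoked from the driver's async failure callback.
    void onFailure(Throwable t) {
        failure.compareAndSet(null, t);
        pending.decrementAndGet();
    }

    /** Waits for all acknowledgements, but fails fast if any write failed. */
    void awaitCompletion() throws Exception {
        while (pending.get() > 0) {
            Throwable t = failure.get();
            if (t != null) {
                throw new Exception("Failed to write record batch to Cassandra", t);
            }
            Thread.sleep(10);
        }
    }
}
{code}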
[jira] [Updated] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
[ https://issues.apache.org/jira/browse/FLINK-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4199: -- Component/s: Client > Wrong client behavior when submitting job to non-existing cluster > - > > Key: FLINK-4199 > URL: https://issues.apache.org/jira/browse/FLINK-4199 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Kostas Kloudas > > Trying to submit a job jar from the client to a non-existing cluster gives > the following messages. In particular the first and last lines: "Cluster > retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" > and "Job has been submitted with" are totally misleading. > Cluster retrieved: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job > completion. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Communication with JobManager failed: Lost connection to > the JobManager. > Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
[ https://issues.apache.org/jira/browse/FLINK-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4199: -- Description: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. {code} Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d {code} was: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. {code} Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d > Wrong client behavior when submitting job to non-existing cluster > - > > Key: FLINK-4199 > URL: https://issues.apache.org/jira/browse/FLINK-4199 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Kostas Kloudas > > Trying to submit a job jar from the client to a non-existing cluster gives > the following messages. In particular the first and last lines: "Cluster > retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" > and "Job has been submitted with" are totally misleading. > {code} > Cluster retrieved: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job > completion. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Communication with JobManager failed: Lost connection to > the JobManager. > Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
[ https://issues.apache.org/jira/browse/FLINK-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4199: -- Description: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. {code} Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d was: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d > Wrong client behavior when submitting job to non-existing cluster > - > > Key: FLINK-4199 > URL: https://issues.apache.org/jira/browse/FLINK-4199 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Kostas Kloudas > > Trying to submit a job jar from the client to a non-existing cluster gives > the following messages. In particular the first and last lines: "Cluster > retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" > and "Job has been submitted with" are totally misleading. > {code} > Cluster retrieved: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job > completion. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Communication with JobManager failed: Lost connection to > the JobManager. > Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2227: [FLINK-4197] Allow Kinesis endpoint to be overridden via ...
Github user skidder commented on the issue: https://github.com/apache/flink/pull/2227 Thanks for the comment, @tzulitai ! I have not had any problems with testing Flink against a local Kinesalite instance where the Region (required by Flink) is set to `us-east-1` and the endpoint is set to an address on localhost. The `setRegion` function on the AWS Kinesis client calculates the endpoint URL from the region name and sets it on the client. The `setEndpoint` function is meant to allow for testing against non-AWS endpoints, or even new AWS endpoints in Regions unrecognized by an older version of the library. I think that requiring the AWS region, and optionally using the AWS endpoint if provided, will work well; this is consistent with the AWS client's own behavior. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
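For readers following this thread, the setup being discussed (mandatory region, optional endpoint override pointing at a local Kinesalite) looks roughly like the sketch below. The endpoint key is given as a plain string because the constant name is still under review in this PR; the region and basic-credentials constants are the existing ones quoted elsewhere in this digest, and the stream name, credentials, and port are placeholders.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesaliteConsumerExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties config = new Properties();
        // Region stays mandatory; Kinesalite ignores it, but the AWS client needs one.
        config.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
        // Dummy credentials for local testing; further credential settings may be
        // required depending on the connector's defaults.
        config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "fakeAccessKey");
        config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "fakeSecretKey");
        // Hypothetical key: the endpoint property added by this PR; the final
        // constant name is still under discussion, so a plain string is used here.
        config.put("aws.endpoint", "http://localhost:4567");

        env.addSource(new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), config))
            .print();

        env.execute("Local Kinesalite consumer");
    }
}
```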
[jira] [Commented] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config
[ https://issues.apache.org/jira/browse/FLINK-4197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372835#comment-15372835 ] ASF GitHub Bot commented on FLINK-4197: --- Github user skidder commented on the issue: https://github.com/apache/flink/pull/2227 Thanks for the comment, @tzulitai ! I have not had any problems with testing Flink against a local Kinesalite instance where the Region (required by Flink) is set to `us-east-1` and the endpoint is set to an address on localhost. The `setRegion` function on the AWS Kinesis client calculates the endpoint URL from the region name and sets it on the client. The `setEndpoint` function is meant to allow for testing against non-AWS endpoints, or even new AWS endpoints in Regions unrecognized by an older version of the library. I think that requiring the AWS region, and optionally using the AWS endpoint if provided, will work well; this is consistent with the AWS client's own behavior. > Allow Kinesis Endpoint to be Overridden via Config > -- > > Key: FLINK-4197 > URL: https://issues.apache.org/jira/browse/FLINK-4197 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.0.3 >Reporter: Scott Kidder >Priority: Minor > Labels: easyfix > Fix For: 1.0.4 > > Original Estimate: 1h > Remaining Estimate: 1h > > I perform local testing of my application stack with Flink configured as a > consumer on a Kinesis stream provided by Kinesalite, an implementation of > Kinesis built on LevelDB. This requires me to override the AWS endpoint to > refer to my local Kinesalite server rather than reference the real AWS > endpoint. I'd like to add a configuration property to the Kinesis streaming > connector that allows the AWS endpoint to be specified explicitly. > This should be a fairly small change and provide a lot of flexibility to > people looking to integrate Flink with Kinesis in a non-production setup. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4200) Kafka consumers should log the offset from which they restore
Stefan Richter created FLINK-4200: - Summary: Kafka consumers should log the offset from which they restore Key: FLINK-4200 URL: https://issues.apache.org/jira/browse/FLINK-4200 Project: Flink Issue Type: Improvement Components: Kafka Connector Reporter: Stefan Richter Assignee: Stefan Richter Priority: Trivial Kafka consumers should log the offset from which they restore so that it is easier to investigate problems with recovery. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
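A minimal sketch of the kind of logging this issue asks for is shown below; the class, field, and method names are illustrative only and do not reproduce the actual FlinkKafkaConsumerBase restore path.

{code}
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative only: make the restore point visible in the logs. */
class OffsetRestoreLogging {

    private static final Logger LOG = LoggerFactory.getLogger(OffsetRestoreLogging.class);

    // Hypothetical restored state: topic-partition -> offset.
    private HashMap<String, Long> restoredOffsets;

    void restoreState(HashMap<String, Long> state) {
        this.restoredOffsets = state;
        // The gist of FLINK-4200: log the offsets the consumer resumes from,
        // so recovery problems are easier to investigate.
        for (Map.Entry<String, Long> entry : state.entrySet()) {
            LOG.info("Restoring offset {} for partition {}", entry.getValue(), entry.getKey());
        }
    }
}
{code}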
[jira] [Commented] (FLINK-4034) Dependency convergence on com.101tec:zkclient and com.esotericsoftware.kryo:kryo
[ https://issues.apache.org/jira/browse/FLINK-4034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372843#comment-15372843 ] Stephan Ewen commented on FLINK-4034: - Okay, I see the issue. It may make sense then to converge the dependencies all in one pull request. I would like to postpone this to the 1.2 release (1.1 is already in the final phase), because in my experience these dependency management issues often have subtle implications that show up only over time and across use cases. I feel a bit uneasy merging something like this so shortly before a release. Would it work for you if we get back to this in roughly a week? I would start giving some comments on the pull request until then. > Dependency convergence on com.101tec:zkclient and > com.esotericsoftware.kryo:kryo > > > Key: FLINK-4034 > URL: https://issues.apache.org/jira/browse/FLINK-4034 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.0.3 >Reporter: Vladislav Pernin > > If dependency convergence is enabled and asserted on Maven, projects using > Flink do not compile. > Example : > {code} > Dependency convergence error for com.esotericsoftware.kryo:kryo:2.24.0 paths > to dependency are: > +-groupidXXX:artifactidXXX:versionXXX > +-org.apache.flink:flink-java:1.0.3 > +-org.apache.flink:flink-core:1.0.3 > +-com.esotericsoftware.kryo:kryo:2.24.0 > and > +-groupidXXX:artifactidXXX:versionXXX > +-org.apache.flink:flink-streaming-java_2.11:1.0.3 > +-org.apache.flink:flink-runtime_2.11:1.0.3 > +-com.twitter:chill_2.11:0.7.4 > +-com.twitter:chill-java:0.7.4 > +-com.esotericsoftware.kryo:kryo:2.21 > and > +-groupidXXX:artifactidXXX:versionXXX > +-org.apache.flink:flink-streaming-java_2.11:1.0.3 > +-org.apache.flink:flink-runtime_2.11:1.0.3 > +-com.twitter:chill_2.11:0.7.4 > +-com.esotericsoftware.kryo:kryo:2.21 > {code} > > {code} > Dependency convergence error for com.101tec:zkclient:0.7 paths to dependency > are: > +-groupidXXX:artifactidXXX:versionXXX > +-org.apache.flink:flink-connector-kafka-0.8_2.11:1.0.3 > +-org.apache.flink:flink-connector-kafka-base_2.11:1.0.3 > +-com.101tec:zkclient:0.7 > and > +-groupidXXX:artifactidXXX:versionXXX > +-org.apache.flink:flink-connector-kafka-0.8_2.11:1.0.3 > +-org.apache.kafka:kafka_2.11:0.8.2.2 > +-com.101tec:zkclient:0.3 > {code} > I cannot submit a pull request without knowing on which specific versions you > rely. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2192: Flink 4034 Maven dependency convergence
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2192 Good idea in general. I would use variables for all versions used more than once. For example, wherever `io.netty::netty` is used in version 3.x, it should probably refer to a variable which globally defines the version for that artifact. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2230: [FLINK-4200] [Kafka Connector] Kafka consumers log...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2230 [FLINK-4200] [Kafka Connector] Kafka consumers logs the offset from w… You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink 4200-kafka-offset-logging Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2230.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2230 commit 1e0feb292a37645a459bf4a2c9c74ca4be1f6d39 Author: Stefan Richter Date: 2016-07-12T13:14:12Z [FLINK-4200] [Kafka Connector] Kafka consumers logs the offset from which they restore --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4200) Kafka consumers should log the offset from which they restore
[ https://issues.apache.org/jira/browse/FLINK-4200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372860#comment-15372860 ] ASF GitHub Bot commented on FLINK-4200: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2230 [FLINK-4200] [Kafka Connector] Kafka consumers logs the offset from w… You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink 4200-kafka-offset-logging Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2230.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2230 commit 1e0feb292a37645a459bf4a2c9c74ca4be1f6d39 Author: Stefan Richter Date: 2016-07-12T13:14:12Z [FLINK-4200] [Kafka Connector] Kafka consumers logs the offset from which they restore > Kafka consumers should log the offset from which they restore > - > > Key: FLINK-4200 > URL: https://issues.apache.org/jira/browse/FLINK-4200 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Trivial > > Kafka consumers should log the offset from which they restore so that it is > easier to investigate problems with recovery. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2228: [FLINK-4170][kinesis-connector] Simplify Kinesis c...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2228#discussion_r70437664 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -17,89 +17,89 @@ package org.apache.flink.streaming.connectors.kinesis.config; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + /** - * Keys and default values used to configure the Kinesis consumer. + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} */ -public class KinesisConfigConstants { +public class ConsumerConfigConstants extends AWSConfigConstants { - // - // Configuration Keys - // + /** +* The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used +* when the consumer tasks retrieve the first shard iterator for each Kinesis shard. +*/ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; --- End diff -- I think it is better to use "position". For Kinesis users, the term "position" is more familiar to be used to refer to where to start reading a stream. Amazon's Kinesis Client Library API uses "position" too, as well as the Kinesis AWS service Web UI. "sequence numbers" refers more specifically to a record's offset in a shard, and since we aren't offering to start reading streams from a specific offset, I think the name change isn't that suitable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
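For reference, this is how the keys under discussion would be used from a consumer's point of view. The sketch assumes the constant names proposed in this PR (including an `AWS_REGION` key inherited from the new `AWSConfigConstants` parent) survive review, and that the property value is the enum name; the stream name is a placeholder.

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class InitialPositionExample {

    public static FlinkKinesisConsumer<String> createConsumer() {
        Properties config = new Properties();
        // Constant names as proposed in this PR; subject to change during review.
        config.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        // "flink.stream.initpos": start from the earliest non-expired record
        // (TRIM_HORIZON) instead of the default LATEST.
        config.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());

        return new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config);
    }
}
```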
[jira] [Commented] (FLINK-4006) ExecutionGraph.restartStrategy field can't be serialized
[ https://issues.apache.org/jira/browse/FLINK-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372874#comment-15372874 ] Till Rohrmann commented on FLINK-4006: -- Thanks for spotting this issue [~bowen.zheng]. I think you're right that we should null the restart strategy in the {{prepareForArchiving}} method. > ExecutionGraph.restartStrategy field can't be serialized > > > Key: FLINK-4006 > URL: https://issues.apache.org/jira/browse/FLINK-4006 > Project: Flink > Issue Type: Bug >Reporter: ZhengBowen > Attachments: FLINK-4006.patch > > > Exception is following > ``` > java.io.NotSerializableException: > org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) > at com.rds.computing.rc.meta.MetaJobInfo.readFrom(MetaJobInfo.java:94) > at > org.apache.flink.runtime.webmonitor.DatabaseArchivist$DataBaseHashMap.put(DatabaseArchivist.java:209) > at > org.apache.flink.runtime.webmonitor.DatabaseArchivist.handleMessage(DatabaseArchivist.java:64) > at > org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:97) > at > org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > ``` > I think you should set null for restartStrategy in prepareForArchiving() > function. > Following attachments is my patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
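A minimal sketch of the suggested change, assuming the archiving hook keeps its current shape; this is illustrative only and not the actual ExecutionGraph source.

{code}
/** Illustrative excerpt only; field and method names follow the discussion above. */
class ExecutionGraphArchivingSketch {

    // Not serializable (e.g. NoRestartStrategy), so it must not survive archiving.
    private Object restartStrategy;

    /** Called before the graph is handed to the archivist / serialized. */
    public void prepareForArchiving() {
        // ... existing cleanup of execution-time structures ...

        // FLINK-4006: drop the non-serializable restart strategy so that
        // serializing the archived graph no longer throws NotSerializableException.
        restartStrategy = null;
    }
}
{code}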
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372875#comment-15372875 ] ASF GitHub Bot commented on FLINK-4170: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2228#discussion_r70437664 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -17,89 +17,89 @@ package org.apache.flink.streaming.connectors.kinesis.config; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + /** - * Keys and default values used to configure the Kinesis consumer. + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} */ -public class KinesisConfigConstants { +public class ConsumerConfigConstants extends AWSConfigConstants { - // - // Configuration Keys - // + /** +* The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used +* when the consumer tasks retrieve the first shard iterator for each Kinesis shard. +*/ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; --- End diff -- I think it is better to use "position". For Kinesis users, the term "position" is more familiar to be used to refer to where to start reading a stream. Amazon's Kinesis Client Library API uses "position" too, as well as the Kinesis AWS service Web UI. "sequence numbers" refers more specifically to a record's offset in a shard, and since we aren't offering to start reading streams from a specific offset, I think the name change isn't that suitable. > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. 
> --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2197: [FLINK-3916] [table] Allow generic types passing the Tabl...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2197 Merging... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3916) Allow generic types passing the Table API
[ https://issues.apache.org/jira/browse/FLINK-3916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372887#comment-15372887 ] ASF GitHub Bot commented on FLINK-3916: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2197 Merging... > Allow generic types passing the Table API > - > > Key: FLINK-3916 > URL: https://issues.apache.org/jira/browse/FLINK-3916 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > The Table API currently only supports BasicTypes that can pass the Table API. > Other types should also be supported but treated as black boxes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2227: [FLINK-4197] Allow Kinesis endpoint to be overridden via ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2227 @skidder I see, thank you for your explanation. If it's a familiar behavior (with respect to AWS / Kinesis users) to completely override the region when setting the endpoint regardless of the original value, then I think it's ok. Can you also add information about the overriding behaviour in the Kinesis connector documentation (can be found at `flink/docs/api/streaming/connectors/kinesis.md`)? I think it'll be good to have this explained. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config
[ https://issues.apache.org/jira/browse/FLINK-4197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372892#comment-15372892 ] ASF GitHub Bot commented on FLINK-4197: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2227 @skidder I see, thank you for your explanation. If it's a familiar behavior (with respect to AWS / Kinesis users) to completely override the region when setting the endpoint regardless of the original value, then I think it's ok. Can you also add information about the overriding behaviour in the Kinesis connector documentation (can be found at `flink/docs/api/streaming/connectors/kinesis.md`)? I think it'll be good to have this explained. > Allow Kinesis Endpoint to be Overridden via Config > -- > > Key: FLINK-4197 > URL: https://issues.apache.org/jira/browse/FLINK-4197 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.0.3 >Reporter: Scott Kidder >Priority: Minor > Labels: easyfix > Fix For: 1.0.4 > > Original Estimate: 1h > Remaining Estimate: 1h > > I perform local testing of my application stack with Flink configured as a > consumer on a Kinesis stream provided by Kinesalite, an implementation of > Kinesis built on LevelDB. This requires me to override the AWS endpoint to > refer to my local Kinesalite server rather than reference the real AWS > endpoint. I'd like to add a configuration property to the Kinesis streaming > connector that allows the AWS endpoint to be specified explicitly. > This should be a fairly small change and provide a lot of flexibility to > people looking to integrate Flink with Kinesis in a non-production setup. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2197: [FLINK-3916] [table] Allow generic types passing t...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2197 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3916) Allow generic types passing the Table API
[ https://issues.apache.org/jira/browse/FLINK-3916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372895#comment-15372895 ] ASF GitHub Bot commented on FLINK-3916: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2197 > Allow generic types passing the Table API > - > > Key: FLINK-3916 > URL: https://issues.apache.org/jira/browse/FLINK-3916 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > The Table API currently only supports BasicTypes that can pass the Table API. > Other types should also be supported but treated as black boxes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4006) ExecutionGraph.restartStrategy field can't be serialized
[ https://issues.apache.org/jira/browse/FLINK-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-4006: Assignee: Till Rohrmann > ExecutionGraph.restartStrategy field can't be serialized > > > Key: FLINK-4006 > URL: https://issues.apache.org/jira/browse/FLINK-4006 > Project: Flink > Issue Type: Bug >Reporter: ZhengBowen >Assignee: Till Rohrmann > Attachments: FLINK-4006.patch > > > Exception is following > ``` > java.io.NotSerializableException: > org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) > at com.rds.computing.rc.meta.MetaJobInfo.readFrom(MetaJobInfo.java:94) > at > org.apache.flink.runtime.webmonitor.DatabaseArchivist$DataBaseHashMap.put(DatabaseArchivist.java:209) > at > org.apache.flink.runtime.webmonitor.DatabaseArchivist.handleMessage(DatabaseArchivist.java:64) > at > org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:97) > at > org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > ``` > I think you should set null for restartStrategy in prepareForArchiving() > function. > Following attachments is my patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4006) ExecutionGraph.restartStrategy field can't be serialized
[ https://issues.apache.org/jira/browse/FLINK-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-4006. Resolution: Fixed Fixed via cc9ee2cc37ed0c7a7f9b0c50cf5edca63e289e1a > ExecutionGraph.restartStrategy field can't be serialized > > > Key: FLINK-4006 > URL: https://issues.apache.org/jira/browse/FLINK-4006 > Project: Flink > Issue Type: Bug >Reporter: ZhengBowen >Assignee: Till Rohrmann > Attachments: FLINK-4006.patch > > > Exception is following > ``` > java.io.NotSerializableException: > org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) > at com.rds.computing.rc.meta.MetaJobInfo.readFrom(MetaJobInfo.java:94) > at > org.apache.flink.runtime.webmonitor.DatabaseArchivist$DataBaseHashMap.put(DatabaseArchivist.java:209) > at > org.apache.flink.runtime.webmonitor.DatabaseArchivist.handleMessage(DatabaseArchivist.java:64) > at > org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:97) > at > org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > ``` > I think you should set null for restartStrategy in prepareForArchiving() > function. > Following attachments is my patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3916) Allow generic types passing the Table API
[ https://issues.apache.org/jira/browse/FLINK-3916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-3916. - Resolution: Fixed Fix Version/s: 1.1.0 Fixed in 1b327f1ae7d078e22700729524e374b449b0f209. > Allow generic types passing the Table API > - > > Key: FLINK-3916 > URL: https://issues.apache.org/jira/browse/FLINK-3916 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Fix For: 1.1.0 > > > The Table API currently only supports BasicTypes that can pass the Table API. > Other types should also be supported but treated as black boxes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets
[ https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372902#comment-15372902 ] Josh Forman-Gornall commented on FLINK-4190: I've made those changes to my branch - I also added back the {{Bucketer}} interface and its subclasses (all marked as deprecated) and called the new version {{BucketingFunction}} > Generalise RollingSink to work with arbitrary buckets > - > > Key: FLINK-4190 > URL: https://issues.apache.org/jira/browse/FLINK-4190 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Streaming Connectors >Reporter: Josh Forman-Gornall >Assignee: Josh Forman-Gornall >Priority: Minor > > The current RollingSink implementation appears to be intended for writing to > directories that are bucketed by system time (e.g. minutely) and to only be > writing to one file within one bucket at any point in time. When the system > time determines that the current bucket should be changed, the current bucket > and file are closed and a new bucket and file are created. The sink cannot be > used for the more general problem of writing to arbitrary buckets, perhaps > determined by an attribute on the element/tuple being processed. > There are three limitations which prevent the existing sink from being used > for more general problems: > - Only bucketing by the current system time is supported, and not by e.g. an > attribute of the element being processed by the sink. > - Whenever the sink sees a change in the bucket being written to, it flushes > the file and moves on to the new bucket. Therefore the sink cannot have more > than one bucket/file open at a time. Additionally the checkpointing mechanics > only support saving the state of one active bucket and file. > - The sink determines that it should 'close' an active bucket and file when > the bucket path changes. We need another way to determine when a bucket has > become inactive and needs to be closed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
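The new interface is not shown in this thread, so the sketch below assumes a shape for {{BucketingFunction}} purely for illustration: the bucket is derived from an attribute of the element instead of the processing-time clock. The interface signature, the {{Event}} type, and the attribute are all hypothetical; the real definition lives in the linked branch and may differ.

{code}
import org.apache.hadoop.fs.Path;

/** Hypothetical shape of the new interface discussed above. */
interface BucketingFunction<T> extends java.io.Serializable {
    Path getBucketPath(Path basePath, T element);
}

/** Example: bucket by an attribute of the element rather than by system time. */
class UserIdBucketer implements BucketingFunction<Event> {
    @Override
    public Path getBucketPath(Path basePath, Event element) {
        return new Path(basePath, "user=" + element.getUserId());
    }
}

/** Minimal element type for the example. */
class Event {
    private final String userId;

    Event(String userId) {
        this.userId = userId;
    }

    String getUserId() {
        return userId;
    }
}
{code}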
[jira] [Commented] (FLINK-4111) Flink Table & SQL doesn't work in very simple example
[ https://issues.apache.org/jira/browse/FLINK-4111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372903#comment-15372903 ] ASF GitHub Bot commented on FLINK-4111: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2209 Merging... > Flink Table & SQL doesn't work in very simple example > - > > Key: FLINK-4111 > URL: https://issues.apache.org/jira/browse/FLINK-4111 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Jark Wu >Assignee: Timo Walther > Fix For: 1.1.0 > > > I’m trying to use Flink Table 1.1-SNAPSHOT where I want to use Table API and > SQL in my project. But when I run the very simple example WordCountTable, I > encountered the following exception : > {code} > Exception in thread "main" java.lang.NoSuchMethodError: > org.apache.calcite.rel.logical.LogicalAggregate.getGroupSets()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList; > at > org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule.matches(DataSetAggregateRule.scala:47) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:269) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:253) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1542) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1817) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:723) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:331) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:250) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at > org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) > at com.alibaba.flink.examples.WordCountTable$.main(WordCountTable.scala:43) > at com.alibaba.flink.examples.WordCountTable.main(WordCountTable.scala) > {code} > It seems that something wrong with our guava shade. Do you have any ideas? > My pom file and WordCountTable.scala are > [here|https://gist.github.com/wuchong/9c1c0df3cb7453502abc4605f5347289]. > And I found someone have the same problem on stack overflow > [http://stackoverflow.com/questions/37835408/org-apache-flink-api-table-tableexception-alias-on-field-reference-expression-e#comment63160086_37838816] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2209: [FLINK-4111] [table] Flink Table & SQL doesn't work in ve...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2209 Merging... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372920#comment-15372920 ] ASF GitHub Bot commented on FLINK-4170: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2228#discussion_r70443267 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -17,89 +17,89 @@ package org.apache.flink.streaming.connectors.kinesis.config; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + /** - * Keys and default values used to configure the Kinesis consumer. + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} */ -public class KinesisConfigConstants { +public class ConsumerConfigConstants extends AWSConfigConstants { - // - // Configuration Keys - // + /** +* The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used +* when the consumer tasks retrieve the first shard iterator for each Kinesis shard. +*/ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; --- End diff -- OK, that is a very good reason :-) > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2228: [FLINK-4170][kinesis-connector] Simplify Kinesis c...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2228#discussion_r70443267 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -17,89 +17,89 @@ package org.apache.flink.streaming.connectors.kinesis.config; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + /** - * Keys and default values used to configure the Kinesis consumer. + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} */ -public class KinesisConfigConstants { +public class ConsumerConfigConstants extends AWSConfigConstants { - // - // Configuration Keys - // + /** +* The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used +* when the consumer tasks retrieve the first shard iterator for each Kinesis shard. +*/ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; --- End diff -- OK, that is a very good reason :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4150: -- Description: Submitting a job in Yarn with HA can lead to the following exception: {code} org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 ClassLoader info: URL ClassLoader: file: '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' (invalid JAR: zip file is empty) Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) at java.lang.Thread.run(Thread.java:745) {code} Some job information, including the Blob ids, is stored in Zookeeper. The actual Blobs are stored in a dedicated BlobStore, if the recovery mode is set to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the cluster is shut down, the path for the BlobStore is deleted. When the cluster is then restarted, recovering jobs cannot restore because its Blob ids stored in Zookeeper now point to deleted files. was: Submitting a job in Yarn with HA can lead to the following exception: {code} org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 ClassLoader info: URL ClassLoader: file: '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' (invalid JAR: zip file is empty) Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) at java.lang.Thread.run(Thread.java:745) {code} Some job information, including the Blob ids, is stored in Zookeeper. The actual Blobs are stored in a dedicated BlobStore, if the recovery mode is set to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the cluster is shut down, the path for the BlobStore is deleted. When the cluster is then restarted, recovering jobs cannot restore because its Blob ids stored in Zookeeper now point to deleted files. In particular, this problem frequently occurs for HA in combination with -m yarn-cluster. We should discuss to what extent this combination actually makes sense and what the expected behavior should be. > Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown > > > Key: FLINK-4150 > URL: https://issues.apache.org/jira/browse/FLINK-4150 > Project: Flink > Issue Type: Bug > Components: Job-Submission >Reporter: Stefan Richter >Priority: Blocker > Fix For: 1.1.0 > > > Submitting a job in Yarn with HA can lead to the following exception: > {code} > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load > user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 > ClassLoader info: URL ClassLoader: > file: > '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' > (invalid JAR: zip file is empty) > Class not resolvable through given classloader. 
> at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) > at java.lang.Thread.run(Thread.java:745) > {code} > Some job information, including the Blob ids, is stored in Zookeeper. The > actual Blobs are stored in a dedicated BlobStore, if the recovery mode is set > to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the > cluster is shut down, the path for the BlobStore is deleted. When the cluster > is then restarted, recovering jobs cannot restore because its Blob ids > stored in Zookeeper now point to deleted files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
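For context, the failure described above occurs in a ZooKeeper-backed HA deployment whose recovery storage directory lives on a shared file system. A minimal sketch of such a setup follows; the `recovery.*` key names are assumptions based on the 1.x configuration layout, so verify the exact keys against the documentation of your Flink version.

```java
import org.apache.flink.configuration.Configuration;

// Sketch only: key names below are illustrative 1.x-era HA settings.
Configuration config = new Configuration();
config.setString("recovery.mode", "zookeeper");
config.setString("recovery.zookeeper.quorum", "zk-host-1:2181,zk-host-2:2181");
// Blobs and job metadata are materialized under this path; ZooKeeper only keeps
// pointers to them, which is why deleting the path breaks recovery.
config.setString("recovery.zookeeper.storageDir", "hdfs:///flink/recovery");
```

If the cluster shutdown removes that storage directory while the ZooKeeper entries survive, a recovered job references blobs that no longer exist, which is consistent with the empty cached JAR in the stack trace above.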
[jira] [Commented] (FLINK-4192) Move Metrics API to separate module
[ https://issues.apache.org/jira/browse/FLINK-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372930#comment-15372930 ] ASF GitHub Bot commented on FLINK-4192: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2226 Is it important that the metric reporter projects do not depend on "flink-core"? If not, then the entire MetricRegistry could stay in flink-core, and there would be no need for the MetricConfig object. > Move Metrics API to separate module > --- > > Key: FLINK-4192 > URL: https://issues.apache.org/jira/browse/FLINK-4192 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > > All metrics code currently resides in flink-core. If a user implements a > reporter and wants a fat jar it will now have to include the entire > flink-core module. > Instead, we could move several interfaces into a separate module. > These interfaces to move include: > * Counter, Gauge, Histogram(Statistics) > * MetricGroup > * MetricReporter, Scheduled, AbstractReporter > In addition a new MetricRegistry interface will be required as well as a > replacement for the Configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2209: [FLINK-4111] [table] Flink Table & SQL doesn't wor...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2209 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4111) Flink Table & SQL doesn't work in very simple example
[ https://issues.apache.org/jira/browse/FLINK-4111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372932#comment-15372932 ] ASF GitHub Bot commented on FLINK-4111: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2209 > Flink Table & SQL doesn't work in very simple example > - > > Key: FLINK-4111 > URL: https://issues.apache.org/jira/browse/FLINK-4111 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Jark Wu >Assignee: Timo Walther > Fix For: 1.1.0 > > > I’m trying to use Flink Table 1.1-SNAPSHOT where I want to use Table API and > SQL in my project. But when I run the very simple example WordCountTable, I > encountered the following exception : > {code} > Exception in thread "main" java.lang.NoSuchMethodError: > org.apache.calcite.rel.logical.LogicalAggregate.getGroupSets()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList; > at > org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule.matches(DataSetAggregateRule.scala:47) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:269) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:253) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1542) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1817) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:723) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:331) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:250) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at > org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) > at com.alibaba.flink.examples.WordCountTable$.main(WordCountTable.scala:43) > at com.alibaba.flink.examples.WordCountTable.main(WordCountTable.scala) > {code} > It seems that something wrong with our guava shade. Do you have any ideas? > My pom file and WordCountTable.scala are > [here|https://gist.github.com/wuchong/9c1c0df3cb7453502abc4605f5347289]. > And I found someone have the same problem on stack overflow > [http://stackoverflow.com/questions/37835408/org-apache-flink-api-table-tableexception-alias-on-field-reference-expression-e#comment63160086_37838816] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted
Stefan Richter created FLINK-4201: - Summary: Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted Key: FLINK-4201 URL: https://issues.apache.org/jira/browse/FLINK-4201 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Stefan Richter Priority: Blocker For example, when shutting down a Yarn session, according to the logs, checkpoints for jobs that did not terminate are deleted. In the shutdown hook, removeAllCheckpoints is called and removes checkpoints that should still be kept. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
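To make the intended behavior concrete, the issue implies a guard along the following lines. This is only a sketch: the surrounding method and the checkpoint-store calls are illustrative stand-ins (only `removeAllCheckpoints` is taken from the issue text), not the actual shutdown-hook code.

```java
import org.apache.flink.runtime.jobgraph.JobStatus;

// Sketch only: discard persisted checkpoints solely for globally terminal jobs.
void onClusterShutdown(JobStatus status) throws Exception {
    if (status.isGloballyTerminalState()) {
        // FINISHED / CANCELED / FAILED: checkpoints are no longer needed
        checkpointStore.removeAllCheckpoints();   // hypothetical store reference
    } else {
        // SUSPENDED (e.g. Yarn session shutdown): keep the persisted checkpoints
        // so the job can be recovered later; release only local handles here.
    }
}
```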
[GitHub] flink issue #2226: [FLINK-4192] - Move Metrics API to separate module
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2226 Is it important that the metric reporter projects do not depend on "flink-core"? If not, then the entire MetricRegistry could stay in flink-core, and there would be no need for the MetricConfig object. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4202) Add JM metric which shows the restart duration
Till Rohrmann created FLINK-4202: Summary: Add JM metric which shows the restart duration Key: FLINK-4202 URL: https://issues.apache.org/jira/browse/FLINK-4202 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.1.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.1.0 It is convenient for users to have a metric which tells them how long the restart of a job has taken. I propose to introduce a {{Gauge}} which returns the time between {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not restarted (initial run), then this metric will return {{-1}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
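A rough sketch of what such a gauge could look like. The class, and the way status transitions are fed into it, are assumptions for illustration only; they are not the actual JobManager implementation.

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.JobStatus;

// Hypothetical gauge: reports the duration of the last restart in milliseconds,
// or -1 if the job has never been restarted (initial run).
public class RestartTimeGauge implements Gauge<Long> {

    private volatile long restartingTimestamp = -1L;
    private volatile long runningTimestamp = -1L;

    // Assumed to be invoked on every job status transition.
    public void onJobStatusChange(JobStatus newStatus) {
        long now = System.currentTimeMillis();
        if (newStatus == JobStatus.RESTARTING) {
            restartingTimestamp = now;
            runningTimestamp = -1L;
        } else if (newStatus == JobStatus.RUNNING && restartingTimestamp >= 0) {
            runningTimestamp = now;
        }
    }

    @Override
    public Long getValue() {
        if (restartingTimestamp < 0) {
            return -1L;                                               // never restarted
        } else if (runningTimestamp < 0) {
            return System.currentTimeMillis() - restartingTimestamp;  // restart in progress
        } else {
            return runningTimestamp - restartingTimestamp;            // last completed restart
        }
    }
}
```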
[jira] [Commented] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config
[ https://issues.apache.org/jira/browse/FLINK-4197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372941#comment-15372941 ] ASF GitHub Bot commented on FLINK-4197: --- Github user skidder commented on the issue: https://github.com/apache/flink/pull/2227 @tzulitai definitely, I'll add an explanation of the config setting in the docs shortly. > Allow Kinesis Endpoint to be Overridden via Config > -- > > Key: FLINK-4197 > URL: https://issues.apache.org/jira/browse/FLINK-4197 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.0.3 >Reporter: Scott Kidder >Priority: Minor > Labels: easyfix > Fix For: 1.0.4 > > Original Estimate: 1h > Remaining Estimate: 1h > > I perform local testing of my application stack with Flink configured as a > consumer on a Kinesis stream provided by Kinesalite, an implementation of > Kinesis built on LevelDB. This requires me to override the AWS endpoint to > refer to my local Kinesalite server rather than reference the real AWS > endpoint. I'd like to add a configuration property to the Kinesis streaming > connector that allows the AWS endpoint to be specified explicitly. > This should be a fairly small change and provide a lot of flexibility to > people looking to integrate Flink with Kinesis in a non-production setup. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
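For illustration, a local-testing setup along the lines described in this issue might look as follows. The `"aws.endpoint"` key is a placeholder for whatever constant the linked pull request introduces, and the other key/value strings are illustrative as well.

```java
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch only: point the Kinesis consumer at a local Kinesalite server instead of AWS.
Properties consumerConfig = new Properties();
consumerConfig.put("aws.region", "us-east-1");                // illustrative key name
consumerConfig.put("aws.endpoint", "http://localhost:4567");  // placeholder key for the new setting

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), consumerConfig))
   .print();
```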
[GitHub] flink issue #2227: [FLINK-4197] Allow Kinesis endpoint to be overridden via ...
Github user skidder commented on the issue: https://github.com/apache/flink/pull/2227 @tzulitai definitely, I'll add an explanation of the config setting in the docs shortly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-4111) Flink Table & SQL doesn't work in very simple example
[ https://issues.apache.org/jira/browse/FLINK-4111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-4111. - Resolution: Fixed Fixed in 508965e69cfb99724b69f1e5dc59e5f5d5a70315. > Flink Table & SQL doesn't work in very simple example > - > > Key: FLINK-4111 > URL: https://issues.apache.org/jira/browse/FLINK-4111 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Jark Wu >Assignee: Timo Walther > Fix For: 1.1.0 > > > I’m trying to use Flink Table 1.1-SNAPSHOT where I want to use Table API and > SQL in my project. But when I run the very simple example WordCountTable, I > encountered the following exception : > {code} > Exception in thread "main" java.lang.NoSuchMethodError: > org.apache.calcite.rel.logical.LogicalAggregate.getGroupSets()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList; > at > org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule.matches(DataSetAggregateRule.scala:47) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:269) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:253) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1542) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1817) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:723) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:331) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:250) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at > org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) > at com.alibaba.flink.examples.WordCountTable$.main(WordCountTable.scala:43) > at com.alibaba.flink.examples.WordCountTable.main(WordCountTable.scala) > {code} > It seems that something wrong with our guava shade. Do you have any ideas? > My pom file and WordCountTable.scala are > [here|https://gist.github.com/wuchong/9c1c0df3cb7453502abc4605f5347289]. > And I found someone have the same problem on stack overflow > [http://stackoverflow.com/questions/37835408/org-apache-flink-api-table-tableexception-alias-on-field-reference-expression-e#comment63160086_37838816] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2226: [FLINK-4192] - Move Metrics API to separate module
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2226 you are correct that the MetricRegistry could remain in flink-core. However, you would still need the new MetricConfig due to the MetricReporter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4192) Move Metrics API to separate module
[ https://issues.apache.org/jira/browse/FLINK-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372942#comment-15372942 ] ASF GitHub Bot commented on FLINK-4192: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2226 you are correct that the MetricRegistry could remain in flink-core. However, you would still need the new MetricConfig due to the MetricReporter. > Move Metrics API to separate module > --- > > Key: FLINK-4192 > URL: https://issues.apache.org/jira/browse/FLINK-4192 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > > All metrics code currently resides in flink-core. If a user implements a > reporter and wants a fat jar it will now have to include the entire > flink-core module. > Instead, we could move several interfaces into a separate module. > These interfaces to move include: > * Counter, Gauge, Histogram(Statistics) > * MetricGroup > * MetricReporter, Scheduled, AbstractReporter > In addition a new MetricRegistry interface will be required as well as a > replacement for the Configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2202: [FLINK-4149] Fix Serialization of NFA in AbstractK...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70448543 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -19,12 +19,16 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.LinkedHashMultimap; +import org.apache.commons.io.output.ByteArrayOutputStream; --- End diff -- Why do we use once the `org.apache.commons.io.output.BAOS` and then the `java.io.BAIS`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4149) Fix Serialization of NFA in AbstractKeyedCEPPatternOperator
[ https://issues.apache.org/jira/browse/FLINK-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372952#comment-15372952 ] ASF GitHub Bot commented on FLINK-4149: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70448543 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -19,12 +19,16 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.LinkedHashMultimap; +import org.apache.commons.io.output.ByteArrayOutputStream; --- End diff -- Why do we use once the `org.apache.commons.io.output.BAOS` and then the `java.io.BAIS`? > Fix Serialization of NFA in AbstractKeyedCEPPatternOperator > --- > > Key: FLINK-4149 > URL: https://issues.apache.org/jira/browse/FLINK-4149 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.1.0 > > > A job that uses CEP fails upon restore with a {{NullPointerException}} in > {{NFA.process()}}. The reason seems to be that field {{computationStates}} is > {{null}}. This field is transient and read in a custom {{readObject()}} > method. > In {{AbstractKeyedCEPPatternOperator}} this snipped is used to construct a > {{StateDescriptor}} for an {{NFA}} state: > {code} > new ValueStateDescriptor>( > NFA_OPERATOR_STATE_NAME, > new KryoSerializer>((Class>) (Class) NFA.class, > getExecutionConfig()), > null) > {code} > It seems Kryo does not invoke {{readObject}}/{{writeObject}}. We probably > need a custom {{TypeSerializer}} for {{NFA}} to solve the problem. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2226: [FLINK-4192] - Move Metrics API to separate module
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2226 Could the `MetricReporter` stay in `flink-core` as well, or does that void the separation of dependencies? You can probably use the Java `Properties` object as a configuration object. Saves you the extra config object. Kafka, for example, uses the `Properties` class extensively and has no config type of its own at all. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4192) Move Metrics API to separate module
[ https://issues.apache.org/jira/browse/FLINK-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372955#comment-15372955 ] ASF GitHub Bot commented on FLINK-4192: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2226 Could the `MetricReporter` stay in `flink-core` as well, or does that void the separation of dependencies? You can probably use the Java `Properties` object as a configuration object. Saves you the extra config object. Kafka, for example, uses the `Properties` class extensively and has no config type of its own at all. > Move Metrics API to separate module > --- > > Key: FLINK-4192 > URL: https://issues.apache.org/jira/browse/FLINK-4192 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > > All metrics code currently resides in flink-core. If a user implements a > reporter and wants a fat jar it will now have to include the entire > flink-core module. > Instead, we could move several interfaces into a separate module. > These interfaces to move include: > * Counter, Gauge, Histogram(Statistics) > * MetricGroup > * MetricReporter, Scheduled, AbstractReporter > In addition a new MetricRegistry interface will be required as well as a > replacement for the Configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
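To illustrate the Properties-based alternative being discussed, a reporter configured with plain `java.util.Properties` could be as simple as the sketch below. This is purely hypothetical: the class name and `open`/`close` hooks are illustrative, not the actual Flink reporter interface under discussion.

```java
import java.util.Properties;

// Hypothetical reporter skeleton, Kafka-style: configured with plain Properties.
public class ExampleReporter {

    private String host;
    private int port;

    // A Properties object already provides keyed string access with defaults,
    // which covers most of what a reporter needs from a configuration type.
    public void open(Properties config) {
        this.host = config.getProperty("host", "localhost");
        this.port = Integer.parseInt(config.getProperty("port", "8125"));
    }

    public void close() {
        // flush buffers / close sockets
    }
}
```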
[GitHub] flink pull request #2202: [FLINK-4149] Fix Serialization of NFA in AbstractK...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70449634 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -454,4 +458,126 @@ static String generateStateName(final String name, final int index) { return name + "_" + index; } } + + /** +* {@link TypeSerializer} for {@link NFA} that uses Java Serialization. +*/ + public static class Serializer extends TypeSerializer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + return this; + } + + @Override + public NFA createInstance() { + return null; + } + + @Override + public NFA copy(NFA from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(from); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA copy(NFA from, NFA reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NFA record, DataOutputView target) throws IOException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(record); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + target.writeInt(data.length); + target.write(data); --- End diff -- Can't we use the `DataOutputViewStream`? ``` ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target)); oos.writeObject(record); oos.close(); ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2202: [FLINK-4149] Fix Serialization of NFA in AbstractK...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70449753 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -454,4 +458,126 @@ static String generateStateName(final String name, final int index) { return name + "_" + index; } } + + /** +* {@link TypeSerializer} for {@link NFA} that uses Java Serialization. +*/ + public static class Serializer extends TypeSerializer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + return this; + } + + @Override + public NFA createInstance() { + return null; + } + + @Override + public NFA copy(NFA from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(from); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA copy(NFA from, NFA reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NFA record, DataOutputView target) throws IOException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(record); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + target.writeInt(data.length); + target.write(data); + } catch (IOException e) { + throw new RuntimeException("Could not serialize NFA.", e); + } + } + + @Override + public NFA deserialize(DataInputView source) throws IOException { + try { + int size = source.readInt(); + + byte[] data = new byte[size]; + + source.readFully(data); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; --- End diff -- Same here with the `DataInputViewStream`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
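Spelled out, the suggestion in the two review comments above amounts to something like the following fragment, meant to drop into the `Serializer` class from the diff. It is a sketch assuming Flink's `DataOutputViewStream`/`DataInputViewStream` wrappers; generic parameters are restored here for readability.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Sketch: stream Java serialization directly into/out of the views,
// avoiding the intermediate byte[] round trip.
@Override
public void serialize(NFA<T> record, DataOutputView target) throws IOException {
    try (ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target))) {
        oos.writeObject(record);
    }
}

@Override
public NFA<T> deserialize(DataInputView source) throws IOException {
    try (ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source))) {
        @SuppressWarnings("unchecked")
        NFA<T> nfa = (NFA<T>) ois.readObject();
        return nfa;
    } catch (ClassNotFoundException e) {
        throw new IOException("Could not deserialize NFA.", e);
    }
}
```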
[jira] [Commented] (FLINK-4149) Fix Serialization of NFA in AbstractKeyedCEPPatternOperator
[ https://issues.apache.org/jira/browse/FLINK-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372959#comment-15372959 ] ASF GitHub Bot commented on FLINK-4149: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70449634 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -454,4 +458,126 @@ static String generateStateName(final String name, final int index) { return name + "_" + index; } } + + /** +* {@link TypeSerializer} for {@link NFA} that uses Java Serialization. +*/ + public static class Serializer extends TypeSerializer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + return this; + } + + @Override + public NFA createInstance() { + return null; + } + + @Override + public NFA copy(NFA from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(from); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA copy(NFA from, NFA reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NFA record, DataOutputView target) throws IOException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(record); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + target.writeInt(data.length); + target.write(data); --- End diff -- Can't we use the `DataOutputViewStream`? ``` ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target)); oos.writeObject(record); oos.close(); ``` > Fix Serialization of NFA in AbstractKeyedCEPPatternOperator > --- > > Key: FLINK-4149 > URL: https://issues.apache.org/jira/browse/FLINK-4149 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.1.0 > > > A job that uses CEP fails upon restore with a {{NullPointerException}} in > {{NFA.process()}}. The reason seems to be that field {{computationStates}} is > {{null}}. This field is transient and read in a custom {{readObject()}} > method. > In {{AbstractKeyedCEPPatternOperator}} this snipped is used to construct a > {{StateDescriptor}} for an {{NFA}} state: > {code} > new ValueStateDescriptor>( > NFA_OPERATOR_STATE_NAME, > new KryoSerializer>((Class>) (Class) NFA.class, > getExecutionConfig()), > null) > {code} > It seems Kryo does not invoke {{readObject}}/{{writeObject}}. We probably > need a custom {{TypeSerializer}} for {{NFA}} to solve the problem. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4149) Fix Serialization of NFA in AbstractKeyedCEPPatternOperator
[ https://issues.apache.org/jira/browse/FLINK-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372960#comment-15372960 ] ASF GitHub Bot commented on FLINK-4149: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70449753 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -454,4 +458,126 @@ static String generateStateName(final String name, final int index) { return name + "_" + index; } } + + /** +* {@link TypeSerializer} for {@link NFA} that uses Java Serialization. +*/ + public static class Serializer extends TypeSerializer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + return this; + } + + @Override + public NFA createInstance() { + return null; + } + + @Override + public NFA copy(NFA from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(from); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA copy(NFA from, NFA reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NFA record, DataOutputView target) throws IOException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(record); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + target.writeInt(data.length); + target.write(data); + } catch (IOException e) { + throw new RuntimeException("Could not serialize NFA.", e); + } + } + + @Override + public NFA deserialize(DataInputView source) throws IOException { + try { + int size = source.readInt(); + + byte[] data = new byte[size]; + + source.readFully(data); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; --- End diff -- Same here with the `DataInputViewStream`. > Fix Serialization of NFA in AbstractKeyedCEPPatternOperator > --- > > Key: FLINK-4149 > URL: https://issues.apache.org/jira/browse/FLINK-4149 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.1.0 > > > A job that uses CEP fails upon restore with a {{NullPointerException}} in > {{NFA.process()}}. The reason seems to be that field {{computationStates}} is > {{
[jira] [Commented] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
[ https://issues.apache.org/jira/browse/FLINK-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372951#comment-15372951 ] Kostas Kloudas commented on FLINK-4199: --- Another related issue is that a similar misleading message is printed when the source fails or stops in the middle of the execution: {code} Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 477429b836247909dd428a6cba5b923b. Waiting for job completion. 07/12/2016 16:18:52 Job execution switched to status RUNNING. 07/12/2016 16:18:52 Source: Socket Stream -> Sink: Unnamed(1/1) switched to SCHEDULED 07/12/2016 16:18:52 Source: Socket Stream -> Sink: Unnamed(1/1) switched to DEPLOYING 07/12/2016 16:18:52 Source: Socket Stream -> Sink: Unnamed(1/1) switched to RUNNING 07/12/2016 16:18:58 Source: Socket Stream -> Sink: Unnamed(1/1) switched to FINISHED 07/12/2016 16:18:58 Job execution switched to status FINISHED. Job has been submitted with JobID 477429b836247909dd428a6cba5b923b {code} The line "Job has been submitted with JobID 477429b836247909dd428a6cba5b923b" should instead state that job XYZ finished. > Wrong client behavior when submitting job to non-existing cluster > - > > Key: FLINK-4199 > URL: https://issues.apache.org/jira/browse/FLINK-4199 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Kostas Kloudas > > Trying to submit a job jar from the client to a non-existing cluster gives > the following messages. In particular the first and last lines: "Cluster > retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" > and "Job has been submitted with" are totally misleading. > {code} > Cluster retrieved: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job > completion. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Communication with JobManager failed: Lost connection to > the JobManager. > Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4149) Fix Serialization of NFA in AbstractKeyedCEPPatternOperator
[ https://issues.apache.org/jira/browse/FLINK-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372963#comment-15372963 ] ASF GitHub Bot commented on FLINK-4149: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70450213 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -454,4 +458,126 @@ static String generateStateName(final String name, final int index) { return name + "_" + index; } } + + /** +* {@link TypeSerializer} for {@link NFA} that uses Java Serialization. +*/ + public static class Serializer extends TypeSerializer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + return this; + } + + @Override + public NFA createInstance() { + return null; + } + + @Override + public NFA copy(NFA from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(from); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA copy(NFA from, NFA reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NFA record, DataOutputView target) throws IOException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(record); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + target.writeInt(data.length); + target.write(data); + } catch (IOException e) { + throw new RuntimeException("Could not serialize NFA.", e); + } + } + + @Override + public NFA deserialize(DataInputView source) throws IOException { + try { + int size = source.readInt(); + + byte[] data = new byte[size]; + + source.readFully(data); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not deserialize NFA.", e); + } } + + @Override + public NFA deserialize(NFA reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int size = source.readInt(); + target.writeInt(size); + target.write(source, size); + } + + @Override +
[GitHub] flink pull request #2202: [FLINK-4149] Fix Serialization of NFA in AbstractK...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70450213 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -454,4 +458,126 @@ static String generateStateName(final String name, final int index) { return name + "_" + index; } } + + /** +* {@link TypeSerializer} for {@link NFA} that uses Java Serialization. +*/ + public static class Serializer extends TypeSerializer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + return this; + } + + @Override + public NFA createInstance() { + return null; + } + + @Override + public NFA copy(NFA from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(from); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA copy(NFA from, NFA reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NFA record, DataOutputView target) throws IOException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(record); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + target.writeInt(data.length); + target.write(data); + } catch (IOException e) { + throw new RuntimeException("Could not serialize NFA.", e); + } + } + + @Override + public NFA deserialize(DataInputView source) throws IOException { + try { + int size = source.readInt(); + + byte[] data = new byte[size]; + + source.readFully(data); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not deserialize NFA.", e); + } } + + @Override + public NFA deserialize(NFA reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int size = source.readInt(); + target.writeInt(size); + target.write(source, size); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof Serializer; --- End diff -- In order to make the equals relation symmetric, we should `return obj instanceof Serializer && ((Serializer) obj).canEqual(this)
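The symmetric pair the comment above asks for would look roughly like this. It is only a sketch of the suggested pattern; whether `canEqual` is already declared on the `TypeSerializer` base class in this version is not verified here.

```java
// Sketch of the suggested symmetric equals/canEqual pair.
@Override
public boolean equals(Object obj) {
    return obj instanceof Serializer && ((Serializer) obj).canEqual(this);
}

public boolean canEqual(Object obj) {
    return obj instanceof Serializer;
}

@Override
public int hashCode() {
    return getClass().hashCode();
}
```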
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372973#comment-15372973 ] ASF GitHub Bot commented on FLINK-3713: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2083 If there are no objections, I would like to merge this. > DisposeSavepoint message uses system classloader to discard state > - > > Key: FLINK-3713 > URL: https://issues.apache.org/jira/browse/FLINK-3713 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Robert Metzger >Assignee: Ufuk Celebi > > The {{DisposeSavepoint}} message in the JobManager is using the system > classloader to discard the state: > {code} > val savepoint = savepointStore.getState(savepointPath) > log.debug(s"$savepoint") > // Discard the associated checkpoint > savepoint.discard(getClass.getClassLoader) > // Dispose the savepoint > savepointStore.disposeState(savepointPath) > {code} > Which leads to issues when the state contains user classes: > {code} > 2016-04-07 03:02:12,225 INFO org.apache.flink.yarn.YarnJobManager > - Disposing savepoint at > 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'. > 2016-04-07 03:02:12,233 WARN > org.apache.flink.runtime.checkpoint.StateForTask - Failed to > discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : > SerializedValue > java.lang.ClassNotFoundException: > .MetricsProcessor$CombinedKeysFoldFunction > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:270) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at java.util.HashMap.readObject(HashMap.java:1184) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(Ob
[GitHub] flink issue #2083: [FLINK-3713] [clients, runtime] Use user code class loade...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2083 If there are no objections, I would like to merge this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4067) Add version header to savepoints
[ https://issues.apache.org/jira/browse/FLINK-4067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372972#comment-15372972 ] ASF GitHub Bot commented on FLINK-4067: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2194 If there are no objections, I would like to merge this. > Add version header to savepoints > > > Key: FLINK-4067 > URL: https://issues.apache.org/jira/browse/FLINK-4067 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.1.0 > > > Adding a header with version information to savepoints ensures that we can > migrate savepoints between Flink versions in the future (for example when > changing internal serialization formats between versions). > After talking with Till, we propose to add the following meta data: > - Magic number (int): identify data as savepoint > - Version (int): savepoint version (independent of Flink version) > - Data Offset (int): specifies at which point the actual savepoint data > starts. With this, we can allow future Flink versions to add fields to the > header without breaking stuff, e.g. Flink 1.1 could read savepoints of Flink > 2.0. > For Flink 1.0 savepoint support, we have to try reading the savepoints > without a header before failing if we don't find the magic number. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
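A schematic of the proposed header layout. All concrete values are placeholders; the real magic number, version, and fallback behavior are defined by the implementation in the linked pull request.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the header described above: magic number, version, data offset.
static final int MAGIC_NUMBER = 0xCAFEB0BA; // assumption: any fixed marker value
static final int VERSION = 1;

static void writeHeader(DataOutputStream out, int dataOffset) throws IOException {
    out.writeInt(MAGIC_NUMBER); // identifies the data as a savepoint
    out.writeInt(VERSION);      // savepoint format version, independent of the Flink version
    out.writeInt(dataOffset);   // absolute offset at which the actual savepoint data starts
}

static int readDataOffset(DataInputStream in) throws IOException {
    if (in.readInt() != MAGIC_NUMBER) {
        // Flink 1.0 savepoints have no header: caller would fall back to the old format here
        throw new IOException("No savepoint header found");
    }
    int version = in.readInt(); // readers can skip header fields added by newer versions
    return in.readInt();        // seek to this offset to read the savepoint data
}
```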
[GitHub] flink issue #2194: [FLINK-4067] [runtime] Add savepoint headers
Github user uce commented on the issue: https://github.com/apache/flink/pull/2194 If there are no objections, I would like to merge this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---