[jira] [Commented] (FLINK-3026) Publish the flink docker container to the docker registry
[ https://issues.apache.org/jira/browse/FLINK-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903272#comment-15903272 ] Ismaël Mejía commented on FLINK-3026: - Great, taking a look right now, I will rebase my PR adding the fix I see there, can I push directly in the docker-flink repo ? > Publish the flink docker container to the docker registry > - > > Key: FLINK-3026 > URL: https://issues.apache.org/jira/browse/FLINK-3026 > Project: Flink > Issue Type: Task > Components: Build System, Docker >Reporter: Omer Katz >Assignee: Patrick Lucas > Labels: Deployment, Docker > > There's a dockerfile that can be used to build a docker container already in > the repository. It'd be awesome to just be able to pull it instead of > building it ourselves. > The dockerfile can be found at > https://github.com/apache/flink/tree/master/flink-contrib/docker-flink > It also doesn't point to the latest version of Flink which I fixed in > https://github.com/apache/flink/pull/1366 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6007) ConcurrentModificationException in WatermarkCallbackService
[ https://issues.apache.org/jira/browse/FLINK-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903269#comment-15903269 ] Kostas Kloudas edited comment on FLINK-6007 at 3/9/17 4:04 PM: --- True. This way we do not have to checkpoint the {{deletionSet}}. Just as a "future" note. This introduces random accesses to the key set. So if we make the set of keys spillable to disk, then this can be a problem (as with the timers). Having an extra set that potentially fits in memory could be a good solution. But again, this is for the future. was (Author: kkl0u): True. This way we do not have to checkpoint the {{deletionSet}}. > ConcurrentModificationException in WatermarkCallbackService > --- > > Key: FLINK-6007 > URL: https://issues.apache.org/jira/browse/FLINK-6007 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > Currently, if an attempt is made to call > {{InternalWatermarkCallbackService.unregisterKeyFromWatermarkCallback()}} > from within the {{OnWatermarkCallback}}, a > {{ConcurrentModificationException}} is thrown. The reason is that the > {{invokeOnWatermarkCallback}} iterates over the list of keys and calls the > callback for each one of them. > To fix this, the deleted keys are put into a separate list, and the deletion > happens after the iteration over all keys has finished. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6007) ConcurrentModificationException in WatermarkCallbackService
[ https://issues.apache.org/jira/browse/FLINK-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903293#comment-15903293 ] Kostas Kloudas commented on FLINK-6007: --- [~aljoscha] Now that I come to think about it, this solution assumes that 2 x noOfKeys fit in memory. The size of the deletion set is at most equal to the set of the registered keys. Given this, I am not sure if the second solution is the best > ConcurrentModificationException in WatermarkCallbackService > --- > > Key: FLINK-6007 > URL: https://issues.apache.org/jira/browse/FLINK-6007 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > Currently, if an attempt is made to call > {{InternalWatermarkCallbackService.unregisterKeyFromWatermarkCallback()}} > from within the {{OnWatermarkCallback}}, a > {{ConcurrentModificationException}} is thrown. The reason is that the > {{invokeOnWatermarkCallback}} iterates over the list of keys and calls the > callback for each one of them. > To fix this, the deleted keys are put into a separate list, and the deletion > happens after the iteration over all keys has finished. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
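For illustration, the deferred-deletion approach described above — collecting keys to unregister while iterating and removing them only after the iteration has finished — could look like the minimal sketch below. The class and method names are hypothetical and do not reflect the actual InternalWatermarkCallbackService code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal sketch of deferred deletion to avoid a ConcurrentModificationException
// when a callback unregisters keys during iteration. Illustrative names only.
public class WatermarkCallbackSketch<K> {

    private final Set<K> registeredKeys = new HashSet<>();

    public void registerKey(K key) {
        registeredKeys.add(key);
    }

    public void invokeOnWatermark(long watermark, Callback<K> callback) throws Exception {
        // Keys to be unregistered are collected here instead of mutating
        // registeredKeys while it is being iterated ...
        List<K> toDelete = new ArrayList<>();
        for (K key : registeredKeys) {
            if (callback.onWatermark(key, watermark)) {
                toDelete.add(key);
            }
        }
        // ... and removed only after the iteration has finished.
        registeredKeys.removeAll(toDelete);
    }

    public interface Callback<K> {
        // returns true if the key should be unregistered
        boolean onWatermark(K key, long watermark) throws Exception;
    }
}
```

As the comment above notes, the trade-off is that the deletion list can grow to the size of the registered key set, so in the worst case roughly twice the number of keys must fit in memory.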
[jira] [Commented] (FLINK-3026) Publish the flink docker container to the docker registry
[ https://issues.apache.org/jira/browse/FLINK-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903317#comment-15903317 ] Patrick Lucas commented on FLINK-3026: -- Yes, I gave you contributor permissions. This repo is really just a stop-gap until we find a permanent home for these scripts, such as in docker-library. I'm fairly convinced that these should be versioned independently of Flink (in a separate repo), as they are updated as a consequence of Flink releases and are not themselves part of a given release. Moreover, the scripts herein may relate to many different "stable" branches of Flink. And I don't really see how it would be feasible to make the contents of this repo somehow "downstream" from the Docker support included in apache/flink as I think they serve different purposes. In apache/flink, having a Dockerfile to package up the current tree would be useful for development, and as a baseline if a user wanted to create their own images from scratch. The docker-flink repo meanwhile encodes the generation of all Dockerfile variants for all "stable" versions of Flink. Finally, I think a tenet of this work (Docker support) going forward should be that the Dockerfile templates really should not change much over time. Instead, we should try to make any changes in functionality in Flink itself, and keep the Dockerfiles (and docker-entrypoint.sh scripts) as simple as possible. If we're doing things right, this code should very rarely change, except when we need to generate Dockerfiles for a new Flink release. > Publish the flink docker container to the docker registry > - > > Key: FLINK-3026 > URL: https://issues.apache.org/jira/browse/FLINK-3026 > Project: Flink > Issue Type: Task > Components: Build System, Docker >Reporter: Omer Katz >Assignee: Patrick Lucas > Labels: Deployment, Docker > > There's a dockerfile that can be used to build a docker container already in > the repository. It'd be awesome to just be able to pull it instead of > building it ourselves. > The dockerfile can be found at > https://github.com/apache/flink/tree/master/flink-contrib/docker-flink > It also doesn't point to the latest version of Flink which I fixed in > https://github.com/apache/flink/pull/1366 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3491: [FLINK-5804] [table] Add support for procTime non-partiti...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3491 merging --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5804) Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903325#comment-15903325 ] ASF GitHub Bot commented on FLINK-5804: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3491 merging > Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING > aggregation to SQL > --- > > Key: FLINK-5804 > URL: https://issues.apache.org/jira/browse/FLINK-5804 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW) AS sumB, > MIN(b) OVER (ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - Since no PARTITION BY clause is specified, the execution will be single > threaded. > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5654) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3431: [FLINK-5910] [gelly] Framework for Gelly examples
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3431 @vasia, I was only thinking of this being used in gelly examples. The documentation for the use of the example drivers will be updated but I am anticipating that users will enclose new and existing algorithms with their own domain-specific code rather than adhere to the limitations of the examples. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5910) Framework for Gelly examples
[ https://issues.apache.org/jira/browse/FLINK-5910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903344#comment-15903344 ] ASF GitHub Bot commented on FLINK-5910: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3431 @vasia, I was only thinking of this being used in gelly examples. The documentation for the use of the example drivers will be updated but I am anticipating that users will enclose new and existing algorithms with their own domain-specific code rather than adhere to the limitations of the examples. > Framework for Gelly examples > > > Key: FLINK-5910 > URL: https://issues.apache.org/jira/browse/FLINK-5910 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.3.0 > > > Driver jobs are composed of an input, an algorithm, and an output. Create the > interfaces for inputs, algorithms, and outputs. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
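As a rough illustration of the input / algorithm / output decomposition described in the issue, the interfaces might be shaped like the sketch below. All names here are hypothetical and are not the actual Gelly driver API.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical sketch: a driver job is composed of an input, an algorithm, and an output.
public class DriverFrameworkSketch {

    interface Input<T> {
        DataSet<T> create(ExecutionEnvironment env) throws Exception;
    }

    interface Algorithm<IN, OUT> {
        DataSet<OUT> run(DataSet<IN> input) throws Exception;
    }

    interface Output<T> {
        void write(DataSet<T> result) throws Exception;
    }

    // A driver simply wires the three stages together.
    static <IN, OUT> void runDriver(
            ExecutionEnvironment env,
            Input<IN> input,
            Algorithm<IN, OUT> algorithm,
            Output<OUT> output) throws Exception {
        output.write(algorithm.run(input.create(env)));
    }
}
```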
[GitHub] flink issue #3500: [FLINK-6003] Add docker image based on the openjdk one (d...
Github user iemejia commented on the issue: https://github.com/apache/flink/pull/3500 Hey I probably should be more patient, I will wait until #3494 is merged and rebase from it. So closing for the moment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6003) Add docker image based on the openjdk one (debian+alpine)
[ https://issues.apache.org/jira/browse/FLINK-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903383#comment-15903383 ] ASF GitHub Bot commented on FLINK-6003: --- Github user iemejia commented on the issue: https://github.com/apache/flink/pull/3500 Hey I probably should be more patient, I will wait until #3494 is merged and rebase from it. So closing for the moment. > Add docker image based on the openjdk one (debian+alpine) > - > > Key: FLINK-6003 > URL: https://issues.apache.org/jira/browse/FLINK-6003 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > > The base docker image 'java' is deprecated since the end of the last year. > This issue will also re-introduce a Dockerfile for the debian-based openjdk > one in addition to the current one based on alpine. > Additional refactorings included: > - Move default version to 1.2.0 > - Refactor Dockerfile for consistency reasons (between both versions) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6003) Add docker image based on the openjdk one (debian+alpine)
[ https://issues.apache.org/jira/browse/FLINK-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903384#comment-15903384 ] ASF GitHub Bot commented on FLINK-6003: --- Github user iemejia closed the pull request at: https://github.com/apache/flink/pull/3500 > Add docker image based on the openjdk one (debian+alpine) > - > > Key: FLINK-6003 > URL: https://issues.apache.org/jira/browse/FLINK-6003 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > > The base docker image 'java' is deprecated since the end of the last year. > This issue will also re-introduce a Dockerfile for the debian-based openjdk > one in addition to the current one based on alpine. > Additional refactorings included: > - Move default version to 1.2.0 > - Refactor Dockerfile for consistency reasons (between both versions) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3500: [FLINK-6003] Add docker image based on the openjdk...
Github user iemejia closed the pull request at: https://github.com/apache/flink/pull/3500 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903423#comment-15903423 ] ASF GitHub Bot commented on FLINK-5874: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/3501 [FLINK-5874] Restrict key types in the DataStream API. Rejects a type from being a key in `DataStream.keyBy()` if it is: 1. it is a POJO type but does not override the `hashCode()` and relies on the `Object.hashCode()` implementation. 2. it is an array of any type. This was also discussed with @fhueske R @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink array-keys Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3501.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3501 commit bd00f9d37d5e3e1960b8bf1fff506293784b3039 Author: kl0u Date: 2017-03-08T11:11:07Z [FLINK-5874] Restrict key types in the DataStream API. Reject a type from being a key in keyBy() if it is: 1. it is a POJO type but does not override the hashCode() and relies on the Object.hashCode() implementation. 2. it is an array of any type. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/3501 [FLINK-5874] Restrict key types in the DataStream API. Rejects a type from being a key in `DataStream.keyBy()` if it is: 1. it is a POJO type but does not override the `hashCode()` and relies on the `Object.hashCode()` implementation. 2. it is an array of any type. This was also discussed with @fhueske R @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink array-keys Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3501.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3501 commit bd00f9d37d5e3e1960b8bf1fff506293784b3039 Author: kl0u Date: 2017-03-08T11:11:07Z [FLINK-5874] Restrict key types in the DataStream API. Reject a type from being a key in keyBy() if it is: 1. it is a POJO type but does not override the hashCode() and relies on the Object.hashCode() implementation. 2. it is an array of any type. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
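The inconsistent-hashing problem behind this restriction can be demonstrated with plain Java, independent of Flink: the default Object.hashCode() of an array ignores its contents, so two arrays with identical contents (for example, the key serialized on the sender and deserialized on the receiver) almost certainly produce different hash codes. A small self-contained demonstration:

```java
import java.util.Arrays;

// Shows why arrays make unreliable keys: identical contents, different hash codes.
public class ArrayHashCodeDemo {
    public static void main(String[] args) {
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};

        // Object.hashCode() is identity-based, so two equal-content arrays hash differently
        // (a collision is theoretically possible but extremely unlikely).
        System.out.println(a.hashCode() == b.hashCode());              // almost certainly false

        // Arrays.hashCode() is content-based and therefore consistent across copies.
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b));  // true
    }
}
```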
[jira] [Created] (FLINK-6008) collection of BlobServer improvements
Nico Kruber created FLINK-6008: -- Summary: collection of BlobServer improvements Key: FLINK-6008 URL: https://issues.apache.org/jira/browse/FLINK-6008 Project: Flink Issue Type: Improvement Components: Network Affects Versions: 1.3.0 Reporter: Nico Kruber Assignee: Nico Kruber The following things should be improved around the BlobServer/BlobCache: * update config options with non-deprecated ones, e.g. {{high-availability.cluster-id}} and {{high-availability.storageDir}} * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}} * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs (prepares FLINK-4399) * remove {{NAME_ADDRESSABLE}} blobs after job/task termination * do not fail the {{BlobServer}} when a delete operation fails * code style, like using {{Preconditions.checkArgument}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105225561 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- @chenqin and @aljoscha I am starting to review the PR and I was wondering when is this new `isLate()` check needed? At least for the out-of-box window assigners, this seems to be a redundant check. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903460#comment-15903460 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105225561 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- @chenqin and @aljoscha I am starting to review the PR and I was wondering when is this new `isLate()` check needed? At least for the out-of-box window assigners, this seems to be a redundant check. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
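For context, a sketch of how the late-data side output discussed here is meant to be used from the DataStream API. This assumes the OutputTag / sideOutputLateData() / getSideOutput() API shape introduced by this PR; exact class and package locations may vary by Flink version, and this is an illustration rather than code from the PR.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // anonymous subclass so the element type is captured by the tag
        final OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("late-data") {};

        DataStream<Tuple2<String, Long>> input = env
                .fromElements(Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 3000L))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> summed = input
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(lateTag) // elements skipped by the window operator as late end up here
                .sum(1);

        summed.print();
        // the late elements are then available as a separate stream
        summed.getSideOutput(lateTag).print();

        env.execute("late data side output sketch");
    }
}
```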
[GitHub] flink pull request #3352: [FLINK-5838] [scripts] Print shell script usage
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3352 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5838) Print shell script usage
[ https://issues.apache.org/jira/browse/FLINK-5838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903509#comment-15903509 ] ASF GitHub Bot commented on FLINK-5838: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3352 > Print shell script usage > > > Key: FLINK-5838 > URL: https://issues.apache.org/jira/browse/FLINK-5838 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.3.0 > > > {code} > $ ./bin/jobmanager.sh > Unknown daemon ''. Usage: flink-daemon.sh (start|stop|stop-all) > (jobmanager|taskmanager|zookeeper) [args]. > {code} > The arguments in {{jobmanager.sh}}'s call to {{flink-daemon.sh}} are > misaligned when {{$STARTSTOP}} is the null string. > {code} > "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP jobmanager "${args[@]}" > {code} > Same issue in {{taskmanager.sh}} and {{zookeeper.sh}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-5838) Print shell script usage
[ https://issues.apache.org/jira/browse/FLINK-5838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-5838. - Resolution: Fixed Fixed in 3fcc4e37c282a07e09c19934e71c141beefd8073 > Print shell script usage > > > Key: FLINK-5838 > URL: https://issues.apache.org/jira/browse/FLINK-5838 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.3.0 > > > {code} > $ ./bin/jobmanager.sh > Unknown daemon ''. Usage: flink-daemon.sh (start|stop|stop-all) > (jobmanager|taskmanager|zookeeper) [args]. > {code} > The arguments in {{jobmanager.sh}}'s call to {{flink-daemon.sh}} are > misaligned when {{$STARTSTOP}} is the null string. > {code} > "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP jobmanager "${args[@]}" > {code} > Same issue in {{taskmanager.sh}} and {{zookeeper.sh}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3491: [FLINK-5804] [table] Add support for procTime non-...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3491 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3402: [FLINK-5890] [gelly] GatherSumApply broken when object re...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3402 @StephanEwen, I updated the test to include the original test plus a new test with object reuse enabled. @vasia, would you also be able to review this change? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903529#comment-15903529 ] ASF GitHub Bot commented on FLINK-5890: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3402 @StephanEwen, I updated the test to include the original test plus a new test with object reuse enabled. @vasia, would you also be able to review this change? > GatherSumApply broken when object reuse enabled > --- > > Key: FLINK-5890 > URL: https://issues.apache.org/jira/browse/FLINK-5890 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.3.0 > > > {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in > the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to > swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in > {{ReduceDriver}} for the returned results). > {code} > @Override > public Tuple2 reduce(Tuple2 arg0, Tuple2 arg1) throws > Exception { > K key = arg0.f0; > M result = this.sumFunction.sum(arg0.f1, arg1.f1); > return new Tuple2<>(key, result); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
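As a purely illustrative sketch, the swap described in the issue (with the generics restored, since they were stripped in the quote above) could look roughly like the following. This is not the actual committed fix.

```java
@Override
public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
    K key = arg0.f0;
    M result = this.sumFunction.sum(arg0.f1, arg1.f1);

    // If the summed value aliases arg1's payload, swap the payloads: the runtime
    // reuses arg1 for the next input record, so it must not share an object with
    // the value that is about to be returned.
    if (result == arg1.f1) {
        arg1.f1 = arg0.f1;
        arg0.f1 = result;
    }

    return new Tuple2<>(key, result);
}
```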
[jira] [Commented] (FLINK-5804) Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903527#comment-15903527 ] ASF GitHub Bot commented on FLINK-5804: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3491 > Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING > aggregation to SQL > --- > > Key: FLINK-5804 > URL: https://issues.apache.org/jira/browse/FLINK-5804 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW) AS sumB, > MIN(b) OVER (ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - Since no PARTITION BY clause is specified, the execution will be single > threaded. > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5654) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-5804) Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-5804. Resolution: Implemented Fix Version/s: 1.3.0 Implemented with 7456d78d271b217c80d46e24029c55741807e51d > Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING > aggregation to SQL > --- > > Key: FLINK-5804 > URL: https://issues.apache.org/jira/browse/FLINK-5804 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW) AS sumB, > MIN(b) OVER (ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - Since no PARTITION BY clause is specified, the execution will be single > threaded. > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - bounded PRECEDING is not supported (see FLINK-5654) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3389: [FLINK-5881] [table] ScalarFunction(UDF) should support v...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3389 Hi @twalthr . I've updated the patch as your comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments
[ https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903546#comment-15903546 ] ASF GitHub Bot commented on FLINK-5881: --- Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3389 Hi @twalthr . I've updated the patch as your comments. > ScalarFunction(UDF) should support variable types and variable arguments > - > > Key: FLINK-5881 > URL: https://issues.apache.org/jira/browse/FLINK-5881 > Project: Flink > Issue Type: Sub-task >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > As a sub-task of FLINK-5826. We would like to support the ScalarFunction > first and make the review a little bit easier. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
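For illustration, the kind of user-defined scalar function this issue aims to support could look like the sketch below. The exact package of ScalarFunction and whether a varargs eval() is accepted depend on the Flink version and on this change being merged; the class and function names here are illustrative only.

```java
// Package path shown as in later Flink versions; it may differ at the time of this change.
import org.apache.flink.table.functions.ScalarFunction;

public class ConcatAll extends ScalarFunction {

    // A varargs eval() lets the UDF accept a variable number of arguments,
    // e.g. SELECT concatAll(a, b, c) FROM myTable after registering it as "concatAll".
    public String eval(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String p : parts) {
            if (p != null) {
                sb.append(p);
            }
        }
        return sb.toString();
    }
}
```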
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105235710 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- Thanks @kl0u Good catch! I put `isLate` there with the intention to filter out `dropped events with other reasons` which I may not be aware of. lateArrivingEvents is really `late arriving` and `dropped` events. @aljoscha If that is a redundant check, we might just remove `isLate`. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903561#comment-15903561 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105235710 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- Thanks @kl0u Good catch! I put `isLate` there with the intention to filter out `dropped events with other reasons` which I may not be aware of. lateArrivingEvents is really `late arriving` and `dropped` events. @aljoscha If that is a redundant check, we might just remove `isLate`. What do you think? > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105238764 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala --- @@ -870,6 +870,16 @@ class CodeGenerator( requireTimeInterval(operand) generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand) + case IN => +val left = operands.head +val right = operands.tail +val addReusableCodeCallback = (declaration: String, initialization: String) => { --- End diff -- @twalthr `generateIn` can executed long enough. This solution works fine --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903586#comment-15903586 ] ASF GitHub Bot commented on FLINK-4565: --- Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105238764 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala --- @@ -870,6 +870,16 @@ class CodeGenerator( requireTimeInterval(operand) generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand) + case IN => +val left = operands.head +val right = operands.tail +val addReusableCodeCallback = (declaration: String, initialization: String) => { --- End diff -- @twalthr `generateIn` can executed long enough. This solution works fine > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105241841 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -150,7 +150,35 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { -new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + +predicate match { --- End diff -- @twalthr I have added tests for this case in new PR --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903615#comment-15903615 ] ASF GitHub Bot commented on FLINK-4565: --- Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105241841 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -150,7 +150,35 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { -new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + +predicate match { --- End diff -- @twalthr I have added tests for this case in new PR > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239223 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java --- @@ -736,7 +736,7 @@ public void restoreState(List state) throws Exception { static final ValueStateDescriptor descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false); private static final long serialVersionUID = 1L; - private ValueState operatorState; + private transient ValueState operatorState; --- End diff -- unrelated change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241140 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = --- End diff -- Do we need a separate class here or could we just throw in an Object[]? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240510 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? --- End diff -- we should also simply break this if statement down into multiple blocks. i.e. ``` if (type isntanceof PojoTypeInfo) { return //find hashCode } if (tpye instance off XArrayTypeInfo ... ) { return false; } return true; ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240608 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? + !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class) : --- End diff -- this method would be more readable by not inverting here, and returning true as the default. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
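Putting the two review suggestions together (separate if blocks instead of a ternary, no inversion, and true as the default), the method could be restructured roughly as follows. This is only a sketch of the reviewer's suggestion, not the code that was eventually merged.

```java
private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
    if (type instanceof PojoTypeInfo) {
        try {
            // a POJO key must provide its own hashCode(), not inherit Object's
            return !type.getTypeClass()
                    .getMethod("hashCode")
                    .getDeclaringClass()
                    .equals(Object.class);
        } catch (NoSuchMethodException ignored) {
            // hashCode() always exists; if reflection fails for some reason, be permissive
            return true;
        }
    }
    if (type instanceof PrimitiveArrayTypeInfo
            || type instanceof BasicArrayTypeInfo
            || type instanceof ObjectArrayTypeInfo) {
        // arrays hash by identity, so they are rejected as keys
        return false;
    }
    return true;
}
```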
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241562 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector() { + @Override + public POJOwithHashCode getKey(POJOwithHashCode value) throws Exce
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105242176 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( --- End diff -- The focus of this test appears to be to verify that you can use an array as a key by wrapping it in a Pojo that implements HashCode. (based on the naming). We should probably focus more on the Pojo-with-hashCode i
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241450 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector() { + @Override + public POJOwithHashCode getKey(POJOwithHashCode value) throws Exce
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239060 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); --- End diff -- First time I'm seeing this pattern and I can't help but feel that a simple try-catch block would be more readable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
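For comparison, the try/catch variant alluded to here could look roughly like this; a sketch only, reusing the Tuple2<Integer[], String> setup from the tests above, not code from the PR:

```java
@Test
public void testArrayKeyRejectionWithTryCatch() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<Integer[], String>> input = env.fromElements(
            new Tuple2<>(new Integer[] {1, 2}, "barfoo"));

    try {
        // keyBy() is expected to reject the array-typed key at graph construction time
        input.keyBy(new KeySelector<Tuple2<Integer[], String>, Integer[]>() {
            @Override
            public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
                return value.f0;
            }
        });
        Assert.fail("keyBy() should have thrown InvalidProgramException");
    } catch (InvalidProgramException expected) {
        Assert.assertTrue(expected.getMessage().contains("cannot be used as key"));
    }
}
```

The trade-off is the usual one: the rule keeps the happy path linear, while the try/catch makes the expected failure point explicit in the test body.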
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239799 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? --- End diff -- ? and : are typically at the start of a new line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
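The layout being requested puts the operators at the start of their lines, roughly as below; the non-POJO branch is reconstructed from the javadoc in the diff and may not match the PR exactly:

```java
// Sketch of the requested formatting: '?' and ':' lead their lines.
private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
    try {
        return (type instanceof PojoTypeInfo)
                ? !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class)
                : !(type instanceof PrimitiveArrayTypeInfo
                        || type instanceof BasicArrayTypeInfo
                        || type instanceof ObjectArrayTypeInfo);
    } catch (NoSuchMethodException ignored) {
        // hashCode() is declared on Object, so this should not happen in practice
        return false;
    }
}
```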
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240936 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector() { + @Override + public POJOwithHashCode getKey(POJOwithHashCode value) throws Exce
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240820 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: --- End diff -- I would shorten this to read ```returns true if the type overrides the hashcode implementation```. The details can be container in the general javadoc of the method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105238197 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { --- End diff -- type: Arround -> Around --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105238457 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector() { + @Override + public POJOwithHashCode getKey(POJOwithHashCode value) throws Exce
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241262 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector() { + @Override + public POJOwithHashCode getKey(POJOwithHashCode value) throws Exce
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241358 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector() { + @Override + public POJOwithHashCode getKey(POJOwithHashCode value) throws Exce
[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239687 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); --- End diff -- Let's include ```typeInfo``` in this exception as well to narrow it down for the user. We may even want to delay the exception until we scanned the entire type and report all invalid keys at once. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
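The "report all invalid keys at once" idea could be sketched along these lines, assuming the same validateKeyTypeIsHashable helper; the exception wording and whether to delay at all were left open in the review, so this is only an illustration:

```java
private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
    Stack<TypeInformation<?>> stack = new Stack<>();
    stack.push(keyType);

    List<TypeInformation<?>> unsupportedTypes = new ArrayList<>();

    while (!stack.isEmpty()) {
        TypeInformation<?> typeInfo = stack.pop();

        if (!validateKeyTypeIsHashable(typeInfo)) {
            // remember the offending component instead of failing immediately
            unsupportedTypes.add(typeInfo);
        }

        if (typeInfo instanceof TupleTypeInfoBase) {
            for (int i = 0; i < typeInfo.getArity(); i++) {
                stack.push(((TupleTypeInfoBase<?>) typeInfo).getTypeAt(i));
            }
        }
    }

    if (!unsupportedTypes.isEmpty()) {
        // name both the composite key type and every invalid component
        throw new InvalidProgramException("Type " + keyType + " cannot be used as key. "
                + "Offending components: " + unsupportedTypes);
    }
    return keyType;
}
```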
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903627#comment-15903627 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240820 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: --- End diff -- I would shorten this to read ```returns true if the type overrides the hashcode implementation```. The details can be container in the general javadoc of the method. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903639#comment-15903639 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105238197 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { --- End diff -- type: Arround -> Around > Reject arrays as keys in DataStream API to avoid inconsistent hashing > --
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903625#comment-15903625 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240608 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? + !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class) : --- End diff -- this method would be more readable by not inverting here, and returning true as the default. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
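Read that way, the check from the diff above could spell out the rejected cases and default to true instead of negating a ternary branch; a sketch, with the array handling reconstructed from the javadoc and not necessarily matching the merged code:

```java
// Sketch of the non-inverted form: name the rejected cases, return true by default.
private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
    try {
        if (type instanceof PojoTypeInfo
                && type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class)) {
            // POJO that still relies on Object.hashCode(): reject
            return false;
        }
        if (type instanceof PrimitiveArrayTypeInfo
                || type instanceof BasicArrayTypeInfo
                || type instanceof ObjectArrayTypeInfo) {
            // arrays hash by identity: reject
            return false;
        }
        // everything else is assumed to have a usable, content-based hashCode()
        return true;
    } catch (NoSuchMethodException ignored) {
        return false;
    }
}
```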
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903637#comment-15903637 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105242176 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903631#comment-15903631 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241140 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = --- End diff -- Do we need a separate class here or could we just throw in an Object[]? > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
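If the dedicated TestClass were dropped, the selector in that test could simply hand the tuple's array back as an Object[], for example as below; this is hypothetical, and whether the extracted TypeInformation would still match the assertion in testKeyRejection is exactly the open question raised here:

```java
// Hypothetical simplification: no TestClass wrapper, the key is just an Object[].
KeySelector<Tuple2<Integer[], String>, Object[]> keySelector =
        new KeySelector<Tuple2<Integer[], String>, Object[]>() {
            @Override
            public Object[] getKey(Tuple2<Integer[], String> value) throws Exception {
                return value.f0;
            }
        };
```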
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903635#comment-15903635 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241450 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903628#comment-15903628 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239060 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); --- End diff -- First time I'm seeing this pattern and i can't help but feel that a simple try-catch block would be more readable. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903633#comment-15903633 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239223 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java --- @@ -736,7 +736,7 @@ public void restoreState(List state) throws Exception { static final ValueStateDescriptor descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false); private static final long serialVersionUID = 1L; - private ValueState operatorState; + private transient ValueState operatorState; --- End diff -- unrelated change. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903638#comment-15903638 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239799 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? --- End diff -- ? and : are typically at the start of a new line. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903634#comment-15903634 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241262 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903630#comment-15903630 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241358 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903626#comment-15903626 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105238457 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903629#comment-15903629 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241562 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903632#comment-15903632 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240510 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + return keyType; + } + + /** +* Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be +* used as a key in the {@code DataStream.keyBy()} operation. +* +* @return {@code false} if: +* +* it is a POJO type but does not override the {@link #hashCode()} method and relies on +* the {@link Object#hashCode()} implementation. +* it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo}, +* {@link ObjectArrayTypeInfo}). +* , +* {@code true} otherwise. +*/ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) ? --- End diff -- we should also simply break this if statement down into multiple blocks. i.e. ``` if (type isntanceof PojoTypeInfo) { return //find hashCode } if (tpye instance off XArrayTypeInfo ... ) { return false; } return true; ``` > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. 
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
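For readers following the review thread: fleshed out, the restructuring suggested in the comment above could look roughly like the sketch below. It reuses the names visible in the diff (validateKeyTypeIsHashable, PojoTypeInfo and the three array TypeInfo classes from flink-core) and is meant to slot into KeyedStream next to validateKeyType; it illustrates the comment, it is not the change that was actually merged.

```java
// Sketch of the reviewer's "break the expression into blocks" suggestion.
private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
	// POJO keys are only usable if they override Object.hashCode()
	if (type instanceof PojoTypeInfo) {
		try {
			return !type.getTypeClass()
				.getMethod("hashCode")
				.getDeclaringClass()
				.equals(Object.class);
		} catch (NoSuchMethodException ignored) {
			// every class inherits hashCode(), so this should not happen
			return false;
		}
	}

	// arrays of any kind fall back to identity hashing and are rejected
	if (type instanceof PrimitiveArrayTypeInfo
			|| type instanceof BasicArrayTypeInfo
			|| type instanceof ObjectArrayTypeInfo) {
		return false;
	}

	// everything else is assumed to hash consistently
	return true;
}
```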
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903624#comment-15903624 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105239687 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -114,9 +121,53 @@ public KeyedStream(DataStream dataStream, KeySelector keySelector, Ty dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key."); --- End diff -- Let's include ```typeInfo``` in this exception as well to narrow it down for the user. We may even want to delay the exception until we scanned the entire type and report all invalid keys at once. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
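The suggestion above (report the offending component type, possibly after scanning the whole key type) could be sketched as follows. KEY stands for the key type parameter of KeyedStream, and the exception message format is only an example; the merged fix may differ.

```java
// Sketch only: collect all invalid key components before failing, and name them in the message.
private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
	Stack<TypeInformation<?>> stack = new Stack<>();
	stack.push(keyType);

	List<TypeInformation<?>> unsupported = new ArrayList<>();

	while (!stack.isEmpty()) {
		TypeInformation<?> typeInfo = stack.pop();

		if (!validateKeyTypeIsHashable(typeInfo)) {
			// remember the offending component instead of failing immediately
			unsupported.add(typeInfo);
		}

		if (typeInfo instanceof TupleTypeInfoBase) {
			for (int i = 0; i < typeInfo.getArity(); i++) {
				stack.push(((TupleTypeInfoBase<?>) typeInfo).getTypeAt(i));
			}
		}
	}

	if (!unsupported.isEmpty()) {
		throw new InvalidProgramException(
			"This type (" + keyType + ") cannot be used as key. Invalid components: " + unsupported);
	}
	return keyType;
}
```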
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903636#comment-15903636 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105240936 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } / + // KeyBy testing + / + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElemen
[GitHub] flink issue #3501: [FLINK-5874] Restrict key types in the DataStream API.
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3501 This should work and is well tested, good job. Had a bunch of minor comments, but nothing critical. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903645#comment-15903645 ] ASF GitHub Bot commented on FLINK-5874: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3501 This should work and is well tested, good job. Had a bunch of minor comments, but nothing critical. > Reject arrays as keys in DataStream API to avoid inconsistent hashing > - > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4 >Reporter: Robert Metzger >Assignee: Kostas Kloudas >Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the arrays contents into account, but the memory address. > This leads to different hash code on the sender and receiver side. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detect a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903654#comment-15903654 ] ASF GitHub Bot commented on FLINK-4565: --- Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105244663 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- @twalthr Unfortunately, add support for POJO and tuples as a standard types of data in table API is too complicated task and out of scope this task. Adding of new composite types to API more wide than add IN operator, it will impact all process of validation and execution of SQL statements. It would be better create epic for introduce new types. I have researched it and I think I would be better commit IN operator functionality without support tuples and POJOs. Otherwise, It would become infinite task. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105244663 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- @twalthr Unfortunately, adding support for POJOs and tuples as standard data types in the Table API is too complicated a task and out of scope for this one. Adding new composite types to the API is much broader than adding the IN operator; it would affect the whole validation and execution process of SQL statements. It would be better to create an epic for introducing new types. I have researched it and think it would be better to commit the IN operator functionality without support for tuples and POJOs. Otherwise it would become an endless task. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903759#comment-15903759 ] ASF GitHub Bot commented on FLINK-4565: --- GitHub user DmytroShkvyra opened a pull request: https://github.com/apache/flink/pull/3502 [FLINK-4565] Support for SQL IN operator [FLINK-4565] Support for SQL IN operator This PR is a part of work on SQL IN operator in Table API, which implements IN for literals. Two cases are covered: less and great then 20 literals. Also I have some questions: - converting all numeric types to BigDecimal isn't ok? I decided to make so to simplify use of hashset. - validation isn't really good. It forces to use operator with same type literals. Should I rework it or maybe just add more cases? expressionDsl.scala: entry point for IN operator in scala API ScalarOperators.scala: 1) All numeric types are upcasting to BigDecimal for using in hashset, other types are unchanged in castNumeric 2) valuesInitialization used for 2 cases: when we have more then 20 operands (then we use hashset, initialized in constructor, descibed below) and less then 20 operands (then we initialize operands in method's body and use them in conjunction) 3) comparison also covers described above cases. In first case we use callback to declare and initialize hashset with all operands. Otherwise we just put all operands in conjunction. 4) Final code is built up with these code snippets. CodeGenerator.scala: passes arguments and callback to declare and init hashet FunctionCatalog.scala: registers "in" as method InITCase: some use cases You can merge this pull request into a Git repository by running: $ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3502.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3502 commit f7960d66a0f885a5b032345427c5380f268cc60e Author: DmytroShkvyra Date: 2017-03-09T19:37:46Z [FLINK-4565] Support for SQL IN operator > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dmytro Shkvyra > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3502: [FLINK-4565] Support for SQL IN operator
GitHub user DmytroShkvyra opened a pull request: https://github.com/apache/flink/pull/3502 [FLINK-4565] Support for SQL IN operator [FLINK-4565] Support for SQL IN operator This PR is part of the work on the SQL IN operator in the Table API and implements IN for literals. Two cases are covered: fewer than and more than 20 literals. I also have some questions: - Is converting all numeric types to BigDecimal OK? I decided to do so to simplify the use of a hash set. - The validation isn't really good. It forces the operator to be used with literals of the same type. Should I rework it or maybe just add more cases? expressionDsl.scala: entry point for the IN operator in the Scala API ScalarOperators.scala: 1) All numeric types are upcast to BigDecimal for use in the hash set; other types are left unchanged in castNumeric 2) valuesInitialization covers 2 cases: more than 20 operands (then we use a hash set, initialized in the constructor, as described below) and fewer than 20 operands (then we initialize the operands in the method's body and use them in the conjunction) 3) The comparison also covers the cases described above. In the first case we use a callback to declare and initialize a hash set with all operands. Otherwise we just put all operands into the conjunction. 4) The final code is built up from these code snippets. CodeGenerator.scala: passes arguments and the callback to declare and initialize the hash set FunctionCatalog.scala: registers "in" as a method InITCase: some use cases You can merge this pull request into a Git repository by running: $ git pull https://github.com/DmytroShkvyra/flink FLINK-4565-NV Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3502.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3502 commit f7960d66a0f885a5b032345427c5380f268cc60e Author: DmytroShkvyra Date: 2017-03-09T19:37:46Z [FLINK-4565] Support for SQL IN operator --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
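To make the two code paths described in the PR text concrete, here is a hand-written Java illustration of the idea: with few literals the membership test stays a chain of equality checks, with many literals the values are probed against a BigDecimal hash set that is built once. This is not the output of Flink's code generator, just a sketch of the strategy; the threshold of 20 and the scale normalization are assumptions.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustration of the evaluation strategy described in the PR, not the generated code itself.
// Values are assumed to be normalized to the same scale, since BigDecimal.equals is scale-sensitive.
public class InOperatorSketch {

	private static final int INLINE_THRESHOLD = 20;

	private final Set<BigDecimal> largeSet;   // used when there are many literals
	private final BigDecimal[] fewValues;     // used when inline checks are cheap enough

	public InOperatorSketch(BigDecimal... values) {
		if (values.length > INLINE_THRESHOLD) {
			largeSet = new HashSet<>(Arrays.asList(values)); // built once, like a constructor-initialized member
			fewValues = null;
		} else {
			largeSet = null;
			fewValues = values;
		}
	}

	public boolean contains(BigDecimal operand) {
		if (largeSet != null) {
			return largeSet.contains(operand);
		}
		// small case: equivalent to a chain of OR-ed equality checks in the generated expression
		for (BigDecimal v : fewValues) {
			if (v.equals(operand)) {
				return true;
			}
		}
		return false;
	}
}
```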
[jira] [Created] (FLINK-6009) Deprecate DataSetUtils#checksumHashCode
Greg Hogan created FLINK-6009: - Summary: Deprecate DataSetUtils#checksumHashCode Key: FLINK-6009 URL: https://issues.apache.org/jira/browse/FLINK-6009 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 1.3.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Fix For: 1.3.0 This is likely only used by Gelly and we have a more featureful implementation allowing for multiple outputs and setting the job name. Deprecation will allow this to be removed in Flink 2.0. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6010) Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section
Bowen Li created FLINK-6010: --- Summary: Documentation: correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section Key: FLINK-6010 URL: https://issues.apache.org/jira/browse/FLINK-6010 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.2.0 Reporter: Bowen Li Fix For: 1.2.1 In https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/ide_setup.html#installing-the-scala-plugin, how you should get to 'plugins' page in IntelliJ IDEA is wrong. This seems to be describing a much older version of IntelliJ IDE. The correct path now is: IntelliJ IDEA -> Preferences -> Plugins -> Install JetBrains plugin I'll submit a PR to fix this -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haohui Mai reassigned FLINK-5829: - Assignee: Haohui Mai > Bump Calcite version to 1.12 once available > --- > > Key: FLINK-5829 > URL: https://issues.apache.org/jira/browse/FLINK-5829 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Haohui Mai > > Once Calcite 1.12 is release we should update to remove some copied classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL
Haohui Mai created FLINK-6011: - Summary: Support TUMBLE, HOP, SESSION window in streaming SQL Key: FLINK-6011 URL: https://issues.apache.org/jira/browse/FLINK-6011 Project: Flink Issue Type: Sub-task Reporter: Haohui Mai Assignee: Haohui Mai CALCITE-1603 and CALCITE-1615 introduces the support of the {{TUMBLE}} / {{HOP}} / {{SESSION}} windows in the parser. This jira tracks the efforts of adding the corresponding supports on the planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6012) Support WindowStart / WindowEnd functions in stream SQL
Haohui Mai created FLINK-6012: - Summary: Support WindowStart / WindowEnd functions in stream SQL Key: FLINK-6012 URL: https://issues.apache.org/jira/browse/FLINK-6012 Project: Flink Issue Type: Sub-task Reporter: Haohui Mai Assignee: Haohui Mai This jira proposes to add support for {{TUMBLE_START()}} / {{TUMBLE_END()}} / {{HOP_START()}} / {{HOP_END()}} / {{SESSUIB_START()}} / {{SESSION_END()}} in the planner in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3461: [FLINK-5954] Always assign names to the window in ...
Github user haohui closed the pull request at: https://github.com/apache/flink/pull/3461 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904042#comment-15904042 ] ASF GitHub Bot commented on FLINK-5954: --- Github user haohui closed the pull request at: https://github.com/apache/flink/pull/3461 > Always assign names to the window in the Stream SQL API > --- > > Key: FLINK-5954 > URL: https://issues.apache.org/jira/browse/FLINK-5954 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 brings in supports for {{TUMBLE}}, {{HOP}}, > {{SESSION}} grouped windows, as well as the corresponding auxiliary functions > that allow uses to query the start and the end of the windows (e.g., > {{TUMBLE_START()}} and {{TUMBLE_END()}} see > http://calcite.apache.org/docs/stream.html for more details). > The goal of this jira is to add support for these auxiliary functions in > Flink. Flink already has runtime supports for them, as these functions are > essential mapped to the {{WindowStart}} and {{WindowEnd}} classes. > To implement this feature in transformation, the transformation needs to > recognize these functions and map them to the {{WindowStart}} and > {{WindowEnd}} classes. > The problem is that both classes can only refer to the windows using alias. > Therefore this jira proposes to assign a unique name for each window to > enable the transformation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3461: [FLINK-5954] Always assign names to the window in the Str...
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3461 Sorry for the late response. This is a prerequisite step for FLINK-6012 -- translating the group auxiliary functions (e.g., `TUMBLE_START`) to the corresponding Flink expressions (e.g., `WindowStart`). A more detailed description is on the FLINK-5954. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904048#comment-15904048 ] ASF GitHub Bot commented on FLINK-5954: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3461 Sorry for the late response. This is a prerequisite step for FLINK-6012 -- translating the group auxiliary functions (e.g., `TUMBLE_START`) to the corresponding Flink expressions (e.g., `WindowStart`). A more detailed description is on the FLINK-5954. > Always assign names to the window in the Stream SQL API > --- > > Key: FLINK-5954 > URL: https://issues.apache.org/jira/browse/FLINK-5954 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 brings in supports for {{TUMBLE}}, {{HOP}}, > {{SESSION}} grouped windows, as well as the corresponding auxiliary functions > that allow uses to query the start and the end of the windows (e.g., > {{TUMBLE_START()}} and {{TUMBLE_END()}} see > http://calcite.apache.org/docs/stream.html for more details). > The goal of this jira is to add support for these auxiliary functions in > Flink. Flink already has runtime supports for them, as these functions are > essential mapped to the {{WindowStart}} and {{WindowEnd}} classes. > To implement this feature in transformation, the transformation needs to > recognize these functions and map them to the {{WindowStart}} and > {{WindowEnd}} classes. > The problem is that both classes can only refer to the windows using alias. > Therefore this jira proposes to assign a unique name for each window to > enable the transformation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904050#comment-15904050 ] ASF GitHub Bot commented on FLINK-5954: --- GitHub user haohui reopened a pull request: https://github.com/apache/flink/pull/3461 [FLINK-5954] Always assign names to the window in the Stream SQL API. Please see jira for more details. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5954 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3461.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3461 commit 064b827127b15a1397c216aae6611d575a75556b Author: Haohui Mai Date: 2017-03-09T21:57:49Z [FLINK-5954] Always assign names to the window in the Stream SQL API. > Always assign names to the window in the Stream SQL API > --- > > Key: FLINK-5954 > URL: https://issues.apache.org/jira/browse/FLINK-5954 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 brings in supports for {{TUMBLE}}, {{HOP}}, > {{SESSION}} grouped windows, as well as the corresponding auxiliary functions > that allow uses to query the start and the end of the windows (e.g., > {{TUMBLE_START()}} and {{TUMBLE_END()}} see > http://calcite.apache.org/docs/stream.html for more details). > The goal of this jira is to add support for these auxiliary functions in > Flink. Flink already has runtime supports for them, as these functions are > essential mapped to the {{WindowStart}} and {{WindowEnd}} classes. > To implement this feature in transformation, the transformation needs to > recognize these functions and map them to the {{WindowStart}} and > {{WindowEnd}} classes. > The problem is that both classes can only refer to the windows using alias. > Therefore this jira proposes to assign a unique name for each window to > enable the transformation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3461: [FLINK-5954] Always assign names to the window in ...
GitHub user haohui reopened a pull request: https://github.com/apache/flink/pull/3461 [FLINK-5954] Always assign names to the window in the Stream SQL API. Please see jira for more details. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5954 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3461.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3461 commit 064b827127b15a1397c216aae6611d575a75556b Author: Haohui Mai Date: 2017-03-09T21:57:49Z [FLINK-5954] Always assign names to the window in the Stream SQL API. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user addisonj commented on the issue: https://github.com/apache/flink/pull/1668 Very interested in this work. It sounds like there are few loose ends and then some cleanup before it might be ready for merge, @senorcarbone or @StephanEwen anything that can be supported by someone else? Would love to help wherever possible --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904089#comment-15904089 ] ASF GitHub Bot commented on FLINK-3257: --- Github user addisonj commented on the issue: https://github.com/apache/flink/pull/1668 Very interested in this work. It sounds like there are few loose ends and then some cleanup before it might be ready for merge, @senorcarbone or @StephanEwen anything that can be supported by someone else? Would love to help wherever possible > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
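As a rough illustration of the three steps in the issue description, the following toy class logs feedback records between the two barriers and hands the log to the checkpoint before replaying it. All names are invented for the example; this is not the actual IterationHead/IterationSink code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy sketch of ABS with upstream backup of in-flight feedback records.
class LoopSnapshotSketch<T> {

	private final Queue<T> inFlightLog = new ArrayDeque<>();
	private boolean logging = false;

	/** Step 1: barrier arrives from the regular input, start backing up feedback records. */
	void onBarrierFromInput() {
		logging = true;
	}

	/** Records coming back over the feedback edge are logged while a snapshot is in progress. */
	void onFeedbackRecord(T record) {
		if (logging) {
			inFlightLog.add(record);
		}
		// ... forward the record downstream as usual ...
	}

	/** Steps 2 and 3: the barrier returns over the feedback edge, finalize the snapshot and replay. */
	List<T> onBarrierFromFeedbackEdge() {
		logging = false;
		List<T> snapshot = new ArrayList<>(inFlightLog); // goes into the checkpoint
		inFlightLog.clear();
		return snapshot; // records are re-emitted in FIFO order after the snapshot
	}
}
```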
[GitHub] flink issue #3461: [FLINK-5954] Always assign names to the window in the Str...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3461 Hi, @haohui Thanks for your explanation, and I'll see the detail in the FLINK-5954. Thanks, SunJincheng --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904189#comment-15904189 ] ASF GitHub Bot commented on FLINK-5954: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3461 Hi, @haohui Thanks for your explanation, and I'll see the detail in the FLINK-5954. Thanks, SunJincheng > Always assign names to the window in the Stream SQL API > --- > > Key: FLINK-5954 > URL: https://issues.apache.org/jira/browse/FLINK-5954 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 brings in supports for {{TUMBLE}}, {{HOP}}, > {{SESSION}} grouped windows, as well as the corresponding auxiliary functions > that allow uses to query the start and the end of the windows (e.g., > {{TUMBLE_START()}} and {{TUMBLE_END()}} see > http://calcite.apache.org/docs/stream.html for more details). > The goal of this jira is to add support for these auxiliary functions in > Flink. Flink already has runtime supports for them, as these functions are > essential mapped to the {{WindowStart}} and {{WindowEnd}} classes. > To implement this feature in transformation, the transformation needs to > recognize these functions and map them to the {{WindowStart}} and > {{WindowEnd}} classes. > The problem is that both classes can only refer to the windows using alias. > Therefore this jira proposes to assign a unique name for each window to > enable the transformation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6013) Add Datadog HTTP metrics reporter
Bowen Li created FLINK-6013: --- Summary: Add Datadog HTTP metrics reporter Key: FLINK-6013 URL: https://issues.apache.org/jira/browse/FLINK-6013 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.2.0 Reporter: Bowen Li Priority: Critical Fix For: 1.2.1 We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a lot of other companies do too. Flink currently only has a StatsD metrics reporter, so users have to set up the Datadog Agent to receive metrics from StatsD and forward them to Datadog. We don't like this approach. We prefer a Datadog metrics reporter that contacts the Datadog HTTP endpoint directly. I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
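For context, reporting directly against the HTTP endpoint could look roughly like the sketch below. The api/v1/series path and the JSON payload shape are written from memory and should be checked against Datadog's documentation; an actual Flink reporter would wrap something like this in the MetricReporter/Scheduled interfaces.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Bare-bones sketch: push a gauge straight to Datadog's HTTP API instead of going through a local agent.
public class DatadogHttpClientSketch {

	private final String apiKey;

	public DatadogHttpClientSketch(String apiKey) {
		this.apiKey = apiKey;
	}

	public void sendGauge(String metric, double value, String host) throws Exception {
		long ts = System.currentTimeMillis() / 1000;
		String payload = "{\"series\":[{\"metric\":\"" + metric + "\","
				+ "\"points\":[[" + ts + "," + value + "]],"
				+ "\"type\":\"gauge\",\"host\":\"" + host + "\"}]}";

		URL url = new URL("https://app.datadoghq.com/api/v1/series?api_key=" + apiKey);
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.setRequestMethod("POST");
		conn.setRequestProperty("Content-Type", "application/json");
		conn.setDoOutput(true);
		try (OutputStream out = conn.getOutputStream()) {
			out.write(payload.getBytes(StandardCharsets.UTF_8));
		}
		int code = conn.getResponseCode();
		conn.disconnect();
		if (code / 100 != 2) {
			throw new IllegalStateException("Datadog rejected the payload: HTTP " + code);
		}
	}
}
```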
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3406 Hi @fhueske , i like your propose about moving the annotation from `TableSource` to `TableSourceConverter`. Lets do it this way. BTW, i noticed that you offered three possible methods to the `TableSourceConverter`, i can only imagine `def requiredProperties: Array[String] ` is necessary for now. It can help validating the converter and to decide which converter we should use when multiple converters have the same `TableType`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
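A rough Java rendering of the converter shape being discussed, with requiredProperties() as the only mandatory hook besides the actual conversion; the real proposal is in Scala and still under review, so these names and signatures are assumptions, not the final API.

```java
import java.util.Map;

// Illustrative converter interface: requiredProperties() both validates the converter and lets the
// catalog pick among several converters registered for the same table type.
public interface TableSourceConverterSketch<T> {

	/** Property keys that must exist in the external catalog table for this converter to apply. */
	String[] requiredProperties();

	/** Builds the table source of type T from the external catalog table's properties. */
	T fromExternalCatalogTable(Map<String, String> properties);
}
```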
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r105316858 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.annotation; + +import org.apache.flink.annotation.Public; +import org.apache.flink.table.catalog.TableSourceConverter; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * A tableSource with this annotation represents it is compatible with external catalog, that is, + * an instance of this tableSource can be converted to or converted from external catalog table + * instance. + * The annotation contains the following information: + * + * external catalog table type name for this kind of tableSource + * external catalog table <-> tableSource converter class + * + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Public +public @interface ExternalCatalogCompatible { --- End diff -- This suggestion is pretty good, thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3485: FLINK-5976 [tests] Deduplicate Tokenizer in tests
Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3485 OK, got it. So how about just doing it now in flink-tests, as in this pull request? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5976) Refactoring duplicate Tokenizer in flink-test
[ https://issues.apache.org/jira/browse/FLINK-5976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904258#comment-15904258 ] ASF GitHub Bot commented on FLINK-5976: --- Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3485 OK, get it. So how about just do it now in flink-tests like this pull request? > Refactoring duplicate Tokenizer in flink-test > - > > Key: FLINK-5976 > URL: https://issues.apache.org/jira/browse/FLINK-5976 > Project: Flink > Issue Type: Improvement > Components: Examples >Affects Versions: 1.2.0 >Reporter: liuyuzhong7 >Priority: Minor > Labels: test > Fix For: 1.2.0 > > > There are some duplicate code like this in flink-test, I think refactor this > will be better. > ``` > public final class Tokenizer implements FlatMapFunction Tuple2> { > @Override > public void flatMap(String value, Collector> > out) { > // normalize and split the line > String[] tokens = value.toLowerCase().split("\\W+"); > // emit the pairs > for (String token : tokens) { > if (token.length() > 0) { > out.collect(new Tuple2(token, > 1)); > } > } > } > } > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6014) Allow the registration of state objects in checkpoints
Xiaogang Shi created FLINK-6014: --- Summary: Allow the registration of state objects in checkpoints Key: FLINK-6014 URL: https://issues.apache.org/jira/browse/FLINK-6014 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Xiaogang Shi Assignee: Xiaogang Shi This issue is the very first step towards incremental checkpointing. We introduce a new state handle named {{CompositeStateHandle}} to be the base of the snapshots taken by task components. Known implementation may include {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s). Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}s. It should register all its state objects in {{StateRegistry}} when its checkpoint is added into {{CompletedCheckpointStore}} (i.e., a pending checkpoint completes or a complete checkpoint is reloaded in the recovery). When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, we should not simply discard all state objects in the checkpoint. With the introduction of incremental checkpointing, a {{StateObject}} may be referenced by different checkpoints. We should unregister all the state objects contained in the {{StateRegistry}} first. Only those state objects that are not referenced by any checkpoint can be deleted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
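The registration/unregistration rule described above amounts to reference counting over shared state objects. A minimal sketch, with invented names, could look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative reference-counting registry: a state object may be shared by several checkpoints,
// so it may only be discarded once the last referencing checkpoint has been unregistered.
// Not the actual Flink class proposed in the issue.
class StateRegistrySketch<H> {

	private final Map<H, Integer> referenceCounts = new HashMap<>();

	/** Called when a completed checkpoint is added to the CompletedCheckpointStore. */
	void register(H stateObject) {
		referenceCounts.merge(stateObject, 1, Integer::sum);
	}

	/** Called when a checkpoint is removed; returns true if the object is no longer referenced. */
	boolean unregister(H stateObject) {
		Integer count = referenceCounts.get(stateObject);
		if (count == null) {
			return false; // unknown handle, nothing to delete here
		}
		if (count == 1) {
			referenceCounts.remove(stateObject);
			return true;  // no checkpoint references it anymore, safe to delete
		}
		referenceCounts.put(stateObject, count - 1);
		return false;
	}
}
```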
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904271#comment-15904271 ] Kurt Young commented on FLINK-5859: --- Hi [~fhueske], i can continue working on {{FilterableTableSource}}. Should i open another PR based on the former changes? > support partition pruning on Table API & SQL > > > Key: FLINK-5859 > URL: https://issues.apache.org/jira/browse/FLINK-5859 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > Many data sources are partitionable storage, e.g. HDFS, Druid. And many > queries just need to read a small subset of the total data. We can use > partition information to prune or skip over files irrelevant to the user’s > queries. Both query optimization time and execution time can be reduced > obviously, especially for a large partitioned table. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5976) Refactoring duplicate Tokenizer in flink-test
[ https://issues.apache.org/jira/browse/FLINK-5976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904280#comment-15904280 ] ASF GitHub Bot commented on FLINK-5976: --- Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3485 @StephanEwen FlatMapFunction "Tokenizer" has move to org.apache.flink.test.testfunctions. Help me to review this. Thanks. And collect other reusable functions to org.apache.flink.test.testfunctions later. > Refactoring duplicate Tokenizer in flink-test > - > > Key: FLINK-5976 > URL: https://issues.apache.org/jira/browse/FLINK-5976 > Project: Flink > Issue Type: Improvement > Components: Examples >Affects Versions: 1.2.0 >Reporter: liuyuzhong7 >Priority: Minor > Labels: test > Fix For: 1.2.0 > > > There are some duplicate code like this in flink-test, I think refactor this > will be better. > ``` > public final class Tokenizer implements FlatMapFunction Tuple2> { > @Override > public void flatMap(String value, Collector> > out) { > // normalize and split the line > String[] tokens = value.toLowerCase().split("\\W+"); > // emit the pairs > for (String token : tokens) { > if (token.length() > 0) { > out.collect(new Tuple2(token, > 1)); > } > } > } > } > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3485: FLINK-5976 [tests] Deduplicate Tokenizer in tests
Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3485 @StephanEwen The FlatMapFunction "Tokenizer" has been moved to org.apache.flink.test.testfunctions. Please help review this, thanks. I will collect other reusable functions into org.apache.flink.test.testfunctions later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
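The Tokenizer quoted in the ticket description lost its generic parameters in the mail rendering; a compilable version of the shared test function, assuming the org.apache.flink.test.testfunctions package mentioned above, would look roughly like this:

```java
package org.apache.flink.test.testfunctions;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * The shared word-count tokenizer the ticket wants to reuse across tests.
 * Generics are restored here for readability; the exact class name and placement in the PR may differ.
 */
public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

	@Override
	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
		// normalize and split the line
		String[] tokens = value.toLowerCase().split("\\W+");

		// emit the pairs
		for (String token : tokens) {
			if (token.length() > 0) {
				out.collect(new Tuple2<>(token, 1));
			}
		}
	}
}
```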
[jira] [Assigned] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reassigned FLINK-3849: - Assignee: Kurt Young > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
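A rough Java rendering of the trait sketched in the issue description, purely for illustration (the proposal itself is Scala and uses the Table API's Expression type, stood in for here by the type parameter E):

```java
// Illustrative only: the planner offers the scan's predicate to the source, the source keeps what
// it can push down and hands back the unsupported remainder for a regular Filter operator.
public interface FilterableTableSourceSketch<E> {

	/** Returns the part of the predicate that the source could not apply itself. */
	E setPredicate(E predicate);
}
```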
[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...
Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/3395 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3395: [FLINK-5861] Components of TaskManager support updating J...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3395 Hi Till, I almost missed these comments! I didn't see them until a few minutes ago. I fully understand your concern. Let me explain a bit more regarding your comments. 1. I agree with most of your suggestions, such as the null check, the formatting problem, and TestLogger. 2. Currently a synchronization problem will not happen. I think replacing the field value is safe; that is an atomic operation. Correct me if I'm wrong. 3. On its own, this PR does not do anything yet, since nobody notifies these listeners. We have actually implemented the reconnection between TM and JM, and this PR works together with that code. The reason I filed this PR separately from the other reconciliation PRs is that I think it is independent of the other parts, and filing one huge PR is terrible for both the reviewer and the writer. However, this single PR turned out to be confusing. Sorry about that. 4. Actually I'm not sure the listener pattern is the best way to do this, but I think it is the simplest way and requires the fewest modifications to the current implementation. If the TM reconnects to a new JM, how else can we update the JobMasterGateway held by the components? I can't see a better way short of reimplementing these components. Anyway, thank you for reviewing and commenting in such detail! I agree with you that we should close this PR for the moment. Once we agree on the main reconciliation PRs, we can discuss what this PR tries to implement. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5861) TaskManager's components support updating JobManagerConnection
[ https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904311#comment-15904311 ] ASF GitHub Bot commented on FLINK-5861: --- Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/3395 > TaskManager's components support updating JobManagerConnection > -- > > Key: FLINK-5861 > URL: https://issues.apache.org/jira/browse/FLINK-5861 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager >Reporter: Biao Liu >Assignee: Biao Liu > Fix For: 1.3.0 > > > Some components in the TaskManager, such as TaskManagerActions, > CheckpointResponder, ResultPartitionConsumableNotifier, and > PartitionProducerStateChecker, need to support updating the JobManagerConnection. > That way, when the JobManager fails and recovers, tasks that hold the old > JobManagerConnection can be notified to update it and can continue > their work without failing. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5861) TaskManager's components support updating JobManagerConnection
[ https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904310#comment-15904310 ] ASF GitHub Bot commented on FLINK-5861: --- Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3395 Hi Till, I almost missed these comments! I didn't see them until a few minutes ago. I fully understand your concerns; let me explain a bit more in response. 1. I agree with most of your suggestions, such as the null checks, the formatting issues, and using TestLogger. 2. A synchronization problem will not currently happen. I think replacing the field value is safe, since a reference assignment is an atomic operation. Correct me if I'm wrong. 3. This PR will not work without the other reconciliation PRs, because nothing would notify these listeners yet. We have actually implemented the reconnection between TM and JM, and this PR is meant to work together with that code. The reason I opened this PR separately from the other reconciliation PRs is that I think it is independent of the other parts, and filing one huge PR is painful for both the reviewer and the author. However, this single PR was confusing on its own; sorry about that. 4. I'm actually not sure the listener pattern is the best way to do this, but I think it is the simplest way and requires the fewest modifications to the current implementation. If the TM reconnects to a new JM, how else can we update the JobMasterGateway held by the components? I can't see a better way short of reimplementing these components. Anyway, thank you for reviewing and commenting so thoroughly! I agree that we should close this PR for now. Once we reach agreement on the main reconciliation PRs, we can revisit what this PR tries to implement. > TaskManager's components support updating JobManagerConnection > -- > > Key: FLINK-5861 > URL: https://issues.apache.org/jira/browse/FLINK-5861 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager >Reporter: Biao Liu >Assignee: Biao Liu > Fix For: 1.3.0 > > > Some components in the TaskManager, such as TaskManagerActions, > CheckpointResponder, ResultPartitionConsumableNotifier, and > PartitionProducerStateChecker, need to support updating the JobManagerConnection. > That way, when the JobManager fails and recovers, tasks that hold the old > JobManagerConnection can be notified to update it and can continue > their work without failing. -- This message was sent by Atlassian JIRA (v6.3.15#6346)