[GitHub] jparkie opened a new pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-02 Thread GitBox
jparkie opened a new pull request #6782: [FLINK-9083][Cassandra Connector] Add 
async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782
 
 
   ## What is the purpose of the change
   
   This pull request rewrites `CassandraSinkBase` to use a `Phaser` and 
`Semaphore` to provide proper synchronization and to support 
`maxConcurrentRequests` as a new configuration option. This improves the reliability 
of the Cassandra Connector, which can currently overwhelm a weak Cassandra 
cluster when the upstream source has very high throughput.
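   A minimal sketch of the idea, not the actual `CassandraSinkBase` code (class and method names below are illustrative only): a `Semaphore` caps the number of in-flight async writes, so a slow cluster blocks the sink's `invoke()`; the real change additionally uses a `Phaser` to wait for all outstanding writes when flushing or closing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative sketch of Semaphore-based backpressure for an async sink. */
public class BoundedAsyncSink<IN> {

    private final Semaphore permits;
    private final long timeout;
    private final TimeUnit unit;

    public BoundedAsyncSink(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
        this.unit = unit;
    }

    /** Called once per record; blocks while maxConcurrentRequests writes are still in flight. */
    public void invoke(IN value) throws Exception {
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException("could not acquire a write permit in time");
        }
        send(value).whenComplete((result, error) -> {
            // Release the permit whether the write succeeded or failed,
            // otherwise the sink would eventually block forever.
            permits.release();
        });
    }

    /** Stand-in for the asynchronous Cassandra write, e.g. Session.executeAsync(...). */
    protected CompletableFuture<Void> send(IN value) {
        return CompletableFuture.completedFuture(null);
    }
}
```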
   
   ## Brief change log
   
 - Rewrote `CassandraSinkBase` to use a `Phaser` and `Semaphore`.
 - Exposed `public void setMaxConcurrentRequests(int maxConcurrentRequests, 
long timeout, TimeUnit unit)` on `CassandraSinkBase`.
 - Modified `CassandraSink` to accept the new configuration; the WAL variant does 
not currently support it.
 - Updated the documentation for the new configuration.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`CassandraSinkBaseTest` and `CassandraConnectorITCase`.
   
   This change added tests and can be verified as follows:
   
 - Added tests for acquiring permits from the `Semaphore` and releasing 
them when a write succeeds or fails (a test sketch follows below).
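   A sketch of what such a test could look like, reusing the hypothetical `BoundedAsyncSink` from the sketch above (this is not the actual `CassandraSinkBaseTest`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class BoundedAsyncSinkTest {

    @Test
    public void permitIsReleasedWhenTheWriteCompletes() throws Exception {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        BoundedAsyncSink<String> sink = new BoundedAsyncSink<String>(1, 100, TimeUnit.MILLISECONDS) {
            @Override
            protected CompletableFuture<Void> send(String value) {
                return pending; // the write stays in flight until the test completes it
            }
        };

        sink.invoke("a");       // takes the single permit
        pending.complete(null); // the "cluster" acknowledges the write, releasing the permit
        sink.invoke("b");       // would time out after 100 ms if the permit were not released
    }
}
```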
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs / JavaDocs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9083:
--
Labels: pull-request-available  (was: )

> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> Because the CassandraSinkBase derivatives use async writes, they do not block 
> the task and therefore introduce no backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> once a maximum number of concurrent requests is reached, similar to how 
> DataStax's Spark Cassandra Connector works: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This change has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635087#comment-16635087
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie opened a new pull request #6782: [FLINK-9083][Cassandra Connector] Add 
async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782
 
 
   ## What is the purpose of the change
   
   This pull request rewrites `CassandraSinkBase` to use a `Phaser` and 
`Semaphore` to provide proper synchronization and to support 
`maxConcurrentRequests` as a new configuration option. This improves the reliability 
of the Cassandra Connector, which can currently overwhelm a weak Cassandra 
cluster when the upstream source has very high throughput.
   
   ## Brief change log
   
 - Rewrote `CassandraSinkBase` to use a `Phaser` and `Semaphore`.
 - Exposed `public void setMaxConcurrentRequests(int maxConcurrentRequests, 
long timeout, TimeUnit unit)` on `CassandraSinkBase`.
 - Modified `CassandraSink` to accept the new configuration; the WAL variant does 
not currently support it.
 - Updated the documentation for the new configuration.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`CassandraSinkBaseTest` and `CassandraConnectorITCase`.
   
   This change added tests and can be verified as follows:
   
 - Added tests for acquiring permits from the `Semaphore` and releasing 
them when a write succeeds or fails.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs / JavaDocs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> Because the CassandraSinkBase derivatives use async writes, they do not block 
> the task and therefore introduce no backpressure.
> I am currently using a semaphore to provide backpressure support by blocking 
> once a maximum number of concurrent requests is reached, similar to how 
> DataStax's Spark Cassandra Connector works: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This change has greatly improved the fault-tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10450) Broken links in the documentation

2018-10-02 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-10450:


Assignee: Chesnay Schepler

> Broken links in the documentation
> -
>
> Key: FLINK-10450
> URL: https://issues.apache.org/jira/browse/FLINK-10450
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Project Website
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.7.0
>
>
> {code}
> [2018-09-27 09:57:51] ERROR `/flinkdev/building.html' not found.
> [2018-09-27 09:57:51] ERROR `/dev/stream/dataset_transformations.html' not 
> found.
> [2018-09-27 09:57:51] ERROR `/dev/stream/windows.html' not found.
> ---
> Found 3 broken links.
> Search for page containing broken link using 'grep -R BROKEN_PATH DOCS_DIR'
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10474) Don't translate IN to JOIN with VALUES for streaming queries

2018-10-02 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635096#comment-16635096
 ] 

Fabian Hueske commented on FLINK-10474:
---

[~walterddr] Evaluating an IN clause with many elements as a JOIN can be more 
efficient than a cascade of many disjunctive predicates. However, if both relations 
are treated as dynamic (which is the case for the regular stream join), this is 
obviously not a good idea.

> Don't translate IN to JOIN with VALUES for streaming queries
> 
>
> Key: FLINK-10474
> URL: https://issues.apache.org/jira/browse/FLINK-10474
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Major
>
> IN clauses are translated to JOIN with VALUES if the number of elements in 
> the IN clause exceeds a certain threshold. This should not be done, because a 
> streaming join is very heavy and materializes both inputs (which is fine for 
> the VALUES input but not for the other).
> There are two ways to solve this:
>  # don't translate IN to a JOIN at all
>  # translate it to a JOIN but use a special join strategy if one input is 
> bounded and final (non-updating)
> Option 1 should be easy to do; option 2 requires much more effort.
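To make the translation described above concrete, here is an illustrative sketch (table and column names are made up; the actual rewritten plan is produced by Calcite and is not shown verbatim):

{code:java}
public class InRewriteExample {
	public static void main(String[] args) {
		// A large IN list (above the translation threshold) ...
		String inForm =
			"SELECT * FROM Orders WHERE productId IN (1, 2, 3, /* ... many more ... */ 500)";

		// ... is currently rewritten into a join against a VALUES relation, roughly:
		String joinForm =
			"SELECT o.* FROM Orders o JOIN (VALUES (1), (2), (3), (500)) AS v (productId) "
				+ "ON o.productId = v.productId";

		// On a stream, the join operator materializes BOTH inputs as state; the VALUES side
		// is small and bounded, but keeping the Orders side in state is what makes it heavy.
		System.out.println(inForm);
		System.out.println(joinForm);
	}
}
{code}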



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aai95 opened a new pull request #6783: [FLINK-10472] [table] Add CBRT math function supported in Table API and SQL

2018-10-02 Thread GitBox
aai95 opened a new pull request #6783: [FLINK-10472] [table] Add CBRT math 
function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6783
 
 
   ## What is the purpose of the change
   
   *This pull request adds the `Cbrt` math function to the Table API and SQL.*
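   As a point of reference for the expected semantics (illustrative only; this assumes the new function behaves like `java.lang.Math.cbrt`, which the PR's implementation is not reproduced here to confirm):

```java
public class CbrtSemantics {
    public static void main(String[] args) {
        System.out.println(Math.cbrt(27.0));  // 3.0
        System.out.println(Math.cbrt(-8.0));  // -2.0 (Math.pow(-8.0, 1.0 / 3.0) would be NaN)
        System.out.println(Math.cbrt(0.0));   // 0.0
    }
}
```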
   
   ## Brief change log
   
 - *Added the `Cbrt` math function to the Table API and SQL.*
 - *Added `ScalarFunctionsTest#testCbrt`.*
   
   ## Verifying this change
   *This change added test `ScalarFunctionsTest#testCbrt`.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10472) Add CBRT math function supported in Table API and SQL

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635107#comment-16635107
 ] 

ASF GitHub Bot commented on FLINK-10472:


aai95 opened a new pull request #6783: [FLINK-10472] [table] Add CBRT math 
function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6783
 
 
   ## What is the purpose of the change
   
   *This pull request adds the `Cbrt` math function to the Table API and SQL.*
   
   ## Brief change log
   
 - *Added the `Cbrt` math function to the Table API and SQL.*
 - *Added `ScalarFunctionsTest#testCbrt`.*
   
   ## Verifying this change
   *This change added test `ScalarFunctionsTest#testCbrt`.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add CBRT math function supported in Table API and SQL
> -
>
> Key: FLINK-10472
> URL: https://issues.apache.org/jira/browse/FLINK-10472
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Aleksei Izmalkin
>Priority: Minor
>  Labels: pull-request-available
>
> Implement the function to calculate the cube root.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0

2018-10-02 Thread GitBox
aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 
2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-426186194
 
 
   Excellent! 👌 
   
   I'm currently working on adding support for Scala 2.12, and this PR is a 
prerequisite for that, since it makes it easier to remove older Kafka 
connectors from the Scala 2.12 build. The reason for this is that there are no 
Scala 2.12 dependencies for Kafka versions < 0.10.2.x.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635126#comment-16635126
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 
2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-426186194
 
 
   Excellent! 👌 
   
   I'm currently working on adding support for Scala 2.12, and this PR is a 
prerequisite for that, since it makes it easier to remove older Kafka 
connectors from the Scala 2.12 build. The reason for this is that there are no 
Scala 2.12 dependencies for Kafka versions < 0.10.2.x.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for Kafka 2.0.0
> -
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10477) Add rescale button to web interface

2018-10-02 Thread Sander Ploegsma (JIRA)
Sander Ploegsma created FLINK-10477:
---

 Summary: Add rescale button to web interface
 Key: FLINK-10477
 URL: https://issues.apache.org/jira/browse/FLINK-10477
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: Sander Ploegsma


Instead of having to use the REST API to rescale a running job, it would be 
much easier if we were able to rescale a job from the web interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha opened a new pull request #6784: [FLINK-7811] Add support for Scala 2.12

2018-10-02 Thread GitBox
aljoscha opened a new pull request #6784: [FLINK-7811] Add support for Scala 
2.12
URL: https://github.com/apache/flink/pull/6784
 
 
   This is the first batch of changes on the way to Scala 2.12 support. All 
these changes are necessary to make the Scala 2.12 compiler happy and I also 
updated the `ClosureCleaner` to be able to work with Scala 2.12.
   
   This does not yet change the Scala version to 2.12, but I have a 
work-in-progress branch that does. These changes are the first batch: they are 
valid for Scala 2.11 and necessary for Scala 2.12.
   
   This adds a dependency:
   ```
   <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm6-shaded</artifactId>
       <version>4.8</version>
   </dependency>
   ```
   
   It's required to make the ClosureCleaner work with Scala 2.12 lambdas. This 
dependency is convenient because it's already available, but we could also make 
our own `flink-shaded` release of ASM 6.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-7811) Add support for Scala 2.12

2018-10-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-7811:
--
Labels: pull-request-available  (was: )

> Add support for Scala 2.12
> --
>
> Key: FLINK-7811
> URL: https://issues.apache.org/jira/browse/FLINK-7811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7811) Add support for Scala 2.12

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635153#comment-16635153
 ] 

ASF GitHub Bot commented on FLINK-7811:
---

aljoscha opened a new pull request #6784: [FLINK-7811] Add support for Scala 
2.12
URL: https://github.com/apache/flink/pull/6784
 
 
   This is the first batch of changes on the way to Scala 2.12 support. All 
these changes are necessary to make the Scala 2.12 compiler happy and I also 
updated the `ClosureCleaner` to be able to work with Scala 2.12.
   
   This does not yet change the Scala version to 2.12, but I have a 
work-in-progress branch that does. These changes are the first batch: they are 
valid for Scala 2.11 and necessary for Scala 2.12.
   
   This adds a dependency:
   ```
   <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm6-shaded</artifactId>
       <version>4.8</version>
   </dependency>
   ```
   
   It's required to make the ClosureCleaner work with Scala 2.12 lambdas. This 
dependency is convenient because it's already available, but we could also make 
our own `flink-shaded` release of ASM 6.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for Scala 2.12
> --
>
> Key: FLINK-7811
> URL: https://issues.apache.org/jira/browse/FLINK-7811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10478) Kafka Producer wrongly formats "%" for transaction ID

2018-10-02 Thread Obi Tetsuya (JIRA)
Obi Tetsuya created FLINK-10478:
---

 Summary: Kafka Producer wrongly formats "%" for transaction ID
 Key: FLINK-10478
 URL: https://issues.apache.org/jira/browse/FLINK-10478
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2
 Environment: Flink 1.4.2

Scala 2.11.12

jdk1.8.0_162

Running on a local embedded Flink mini cluster (this also happened on a standalone 
cluster with different code)
Reporter: Obi Tetsuya


The Kafka Producer with the exactly-once feature uses its task name to build the 
transaction ID. Because the Producer uses the name directly as a format string, 
the job fails if the name contains "%". (A minimal standalone illustration follows 
the stack trace below.)

Code to reproduce:
{code:scala}
object ExampleRunner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.enableCheckpointing(1000)
    env.getConfig.disableSysoutLogging()
    env.setRestartStrategy(RestartStrategies.noRestart)

    val p = new java.util.Properties
    Map(
      "bootstrap.servers" -> "192.168.1.100:9092",
      "transaction.timeout.ms" -> (10 * 60 * 1000).toString
    ).foreach { case (k, v) => p.setProperty(k, v) }

    env
      .fromCollection(100 to 200)
      .map(_.toString)
      .addSink(new FlinkKafkaProducer011(
        "test",
        new KeyedSerializationSchemaWrapper(new SimpleStringSchema),
        p,
        Semantic.EXACTLY_ONCE)).name("100%")
    env.execute()
  }
}
{code}

Raised exception:
{code}
2018-10-02 17:00:12.918 [Map -> Sink: 100% (1/8)] INFO  
o.a.flink.runtime.taskmanager.Task  - Map -> Sink: 100% (1/8) 
(25190aeccdce738afdd00e9320903d7b) switched from RUNNING to FAILED.
java.util.MissingFormatWidthException: %-%
at java.util.Formatter$FormatSpecifier.checkText(Formatter.java:3040)
at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2733)
at java.util.Formatter.parse(Formatter.java:2560)
at java.util.Formatter.format(Formatter.java:2501)
at java.util.Formatter.format(Formatter.java:2455)
at java.lang.String.format(String.java:2940)
at 
org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateTransactionalId(TransactionalIdsGenerator.java:91)
at 
org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToUse(TransactionalIdsGenerator.java:72)
at 
org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToAbort(TransactionalIdsGenerator.java:85)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:850)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
{code}
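The formatting failure itself can be reproduced outside Flink; a minimal sketch (the task name and the "-%d" suffix here are illustrative, not the exact format string used by TransactionalIdsGenerator):

{code:java}
public class FormatBugDemo {
	public static void main(String[] args) {
		String taskName = "Map -> Sink: 100%"; // an operator name containing '%'
		try {
			// Concatenating the name into a format string turns "%-%" into a format directive:
			String.format(taskName + "-%d", 3);
		} catch (java.util.MissingFormatWidthException e) {
			System.out.println("fails with: " + e.getMessage()); // prints "fails with: %-%"
		}
		// Escaping '%' before formatting avoids the problem:
		String safe = String.format(taskName.replace("%", "%%") + "-%d", 3);
		System.out.println(safe); // Map -> Sink: 100%-3
	}
}
{code}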



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-1003) Spread out scheduling strategy

2018-10-02 Thread Gyula Fora (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-1003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-1003.
-
Resolution: Won't Fix

> Spread out scheduling strategy
> --
>
> Key: FLINK-1003
> URL: https://issues.apache.org/jira/browse/FLINK-1003
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Gyula Fora
>Priority: Major
>
> Currently the Flink scheduler tries to fill one instance completely before 
> the tasks are deployed to another instance. This is a good behaviour in 
> multi-user and multi-job scenarios but it wastes resources if one wants to 
> use the complete cluster. Therefore, another scheduling strategy where the 
> load among the different instances is kept balanced might be useful. This 
> spread out strategy will deploy the tasks such that the overall work is 
> equally distributed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-02 Thread Gyula Fora (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635168#comment-16635168
 ] 

Gyula Fora commented on FLINK-9635:
---

Should we consider this issue a blocker? I know the proper fix is very hard and 
requires a lot of effort, but the current state is very unsafe as well.

> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that a task tries to be rescheduled to its previous location. In order to not 
> occupy slots which have state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new and already-used slots. This behaviour can 
> cause every task to get deployed to its own slot if, for example, the 
> {{SlotPool}} has released all slots in the meantime. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)

2018-10-02 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-10469:

Priority: Critical  (was: Major)

> FileChannel may not write the whole buffer in a single call to 
> FileChannel.write(Buffer buffer)
> ---
>
> Key: FLINK-10469
> URL: https://issues.apache.org/jira/browse/FLINK-10469
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Critical
>
> Currently, all the calls to _FileChannel.write(ByteBuffer src)_ assume that 
> this method will not return before the whole buffer is written, like the one 
> in _AsynchronousFileIOChannel.write()._
>  
> However, this assumption may not hold in all environments. We have 
> encountered the case that only part of a buffer was written on a cluster with 
> a high IO load, and the target file ended up corrupted. 
>  
> To fix this issue, I think we should add a utility method in 
> org.apache.flink.util.IOUtils that ensures the whole buffer is written with a 
> loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this 
> new method. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)

2018-10-02 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635176#comment-16635176
 ] 

Nico Kruber commented on FLINK-10469:
-

That's actually a very critical bug you discovered; it may explain one or two 
things I saw in the field. How did you discover that it is related to 
{{FileChannel.write(ByteBuffer)}} and that adding a loop helps?
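For reference, a minimal sketch of the kind of loop the issue proposes (the helper name and its placement in IOUtils are assumptions, not the final API):

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public final class WriteFully {

	/** Keeps calling write() until the buffer has no remaining bytes. */
	public static void writeCompletely(FileChannel channel, ByteBuffer buffer) throws IOException {
		// A single write() may consume only part of the buffer (e.g. under heavy IO load),
		// so loop until everything has been written.
		while (buffer.hasRemaining()) {
			channel.write(buffer);
		}
	}
}
{code}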

> FileChannel may not write the whole buffer in a single call to 
> FileChannel.write(Buffer buffer)
> ---
>
> Key: FLINK-10469
> URL: https://issues.apache.org/jira/browse/FLINK-10469
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Critical
>
> Currently, all the calls to _FileChannel.write(ByteBuffer src)_ assume that 
> this method will not return before the whole buffer is written, like the one 
> in _AsynchronousFileIOChannel.write()._
>  
> However, this assumption may not hold in all environments. We have 
> encountered the case that only part of a buffer was written on a cluster with 
> a high IO load, and the target file ended up corrupted. 
>  
> To fix this issue, I think we should add a utility method in 
> org.apache.flink.util.IOUtils that ensures the whole buffer is written with a 
> loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this 
> new method. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)

2018-10-02 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-10469:

Fix Version/s: 1.5.5
   1.6.2
   1.7.0

> FileChannel may not write the whole buffer in a single call to 
> FileChannel.write(Buffer buffer)
> ---
>
> Key: FLINK-10469
> URL: https://issues.apache.org/jira/browse/FLINK-10469
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Critical
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Currently, all the calls to _FileChannel.write(ByteBuffer src)_ assume that 
> this method will not return before the whole buffer is written, like the one 
> in _AsynchronousFileIOChannel.write()._
>  
> However, this assumption may not hold in all environments. We have 
> encountered the case that only part of a buffer was written on a cluster with 
> a high IO load, and the target file ended up corrupted. 
>  
> To fix this issue, I think we should add a utility method in 
> org.apache.flink.util.IOUtils that ensures the whole buffer is written with a 
> loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this 
> new method. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)

2018-10-02 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-10469:

Component/s: Network

> FileChannel may not write the whole buffer in a single call to 
> FileChannel.write(Buffer buffer)
> ---
>
> Key: FLINK-10469
> URL: https://issues.apache.org/jira/browse/FLINK-10469
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Network
>Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Critical
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Currently, all the calls to _FileChannel.write(ByteBuffer src)_ assume that 
> this method will not return before the whole buffer is written, like the one 
> in _AsynchronousFileIOChannel.write()._
>  
> However, this assumption may not hold in all environments. We have 
> encountered the case that only part of a buffer was written on a cluster with 
> a high IO load, and the target file ended up corrupted. 
>  
> To fix this issue, I think we should add a utility method in 
> org.apache.flink.util.IOUtils that ensures the whole buffer is written with a 
> loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this 
> new method. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on issue #6430: [FLINK-8058][Queryable State]Queryable state should check types

2018-10-02 Thread GitBox
azagrebin commented on issue #6430: [FLINK-8058][Queryable State]Queryable 
state should check types
URL: https://github.com/apache/flink/pull/6430#issuecomment-426203590
 
 
   Hi @klion26, do I understand correctly that the idea now is that the state 
descriptor is serialised and sent with every request? If so, it is probably too 
expensive to do that every time; the check could happen only once.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8058) Queryable state should check types

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635178#comment-16635178
 ] 

ASF GitHub Bot commented on FLINK-8058:
---

azagrebin commented on issue #6430: [FLINK-8058][Queryable State]Queryable 
state should check types
URL: https://github.com/apache/flink/pull/6430#issuecomment-426203590
 
 
   Hi @klion26, do I understand correctly that the idea now is that the state 
descriptor is serialised and sent with every request? If so, it is probably too 
expensive to do that every time; the check could happen only once.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Queryable state should check types
> --
>
> Key: FLINK-8058
> URL: https://issues.apache.org/jira/browse/FLINK-8058
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> The queryable state currently doesn't do any type checks on the client or 
> server and generally relies on serializers to catch errors.
> Neither the type of state (ValueState, ListState, etc.) nor the type of the 
> contained values is checked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)

2018-10-02 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-10469:
---
Priority: Blocker  (was: Critical)

> FileChannel may not write the whole buffer in a single call to 
> FileChannel.write(Buffer buffer)
> ---
>
> Key: FLINK-10469
> URL: https://issues.apache.org/jira/browse/FLINK-10469
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Network
>Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Currently, all the calls to _FileChannel.write(ByteBuffer src)_ assume that 
> this method will not return before the whole buffer is written, like the one 
> in _AsynchronousFileIOChannel.write()._
>  
> However, this assumption may not hold in all environments. We have 
> encountered the case that only part of a buffer was written on a cluster with 
> a high IO load, and the target file ended up corrupted. 
>  
> To fix this issue, I think we should add a utility method in 
> org.apache.flink.util.IOUtils that ensures the whole buffer is written with a 
> loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this 
> new method. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)

2018-10-02 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635180#comment-16635180
 ] 

Piotr Nowojski commented on FLINK-10469:


Good find [~gaoyunhaii]. Can you fix it within the next 2 weeks so that this can 
make it into 1.7.0?

> FileChannel may not write the whole buffer in a single call to 
> FileChannel.write(Buffer buffer)
> ---
>
> Key: FLINK-10469
> URL: https://issues.apache.org/jira/browse/FLINK-10469
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Network
>Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Currently, all the calls to _FileChannel.write(ByteBuffer src)_ assume that 
> this method will not return before the whole buffer is written, like the one 
> in _AsynchronousFileIOChannel.write()._
>  
> However, this assumption may not hold in all environments. We have 
> encountered the case that only part of a buffer was written on a cluster with 
> a high IO load, and the target file ended up corrupted. 
>  
> To fix this issue, I think we should add a utility method in 
> org.apache.flink.util.IOUtils that ensures the whole buffer is written with a 
> loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this 
> new method. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10450) Broken links in the documentation

2018-10-02 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10450.

Resolution: Fixed

master: 168a6a1fd25cf25c7bef5d1c60cef24bfb867cb5

> Broken links in the documentation
> -
>
> Key: FLINK-10450
> URL: https://issues.apache.org/jira/browse/FLINK-10450
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Project Website
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.7.0
>
>
> {code}
> [2018-09-27 09:57:51] ERROR `/flinkdev/building.html' not found.
> [2018-09-27 09:57:51] ERROR `/dev/stream/dataset_transformations.html' not 
> found.
> [2018-09-27 09:57:51] ERROR `/dev/stream/windows.html' not found.
> ---
> Found 3 broken links.
> Search for page containing broken link using 'grep -R BROKEN_PATH DOCS_DIR'
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221880693
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +477,41 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"));
+
+   CassandraPojoSink<CustomCassandraAnnotatedPojo> sink = new CassandraPojoSink<>(CustomCassandraAnnotatedPojo.class, builder);
+   sink.open(new Configuration());
+
+   List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = IntStream.range(0, 20)
+   .mapToObj(x -> new CustomCassandraAnnotatedPojo(UUID.randomUUID().toString(), x, 0))
+   .collect(Collectors.toList());
+
+   customCassandraAnnotatedPojos.forEach(sink::send);
+   sink.close();
+
+   InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = new CassandraPojoInputFormat<>(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "batches"), builder, CustomCassandraAnnotatedPojo.class);
 
 Review comment:
   We've had enough issues in the past with writes not showing up immediately 
to warrant such safeguards.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635192#comment-16635192
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221880693
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +477,41 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"));
+
+   CassandraPojoSink<CustomCassandraAnnotatedPojo> sink = new CassandraPojoSink<>(CustomCassandraAnnotatedPojo.class, builder);
+   sink.open(new Configuration());
+
+   List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = IntStream.range(0, 20)
+   .mapToObj(x -> new CustomCassandraAnnotatedPojo(UUID.randomUUID().toString(), x, 0))
+   .collect(Collectors.toList());
+
+   customCassandraAnnotatedPojos.forEach(sink::send);
+   sink.close();
+
+   InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = new CassandraPojoInputFormat<>(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "batches"), builder, CustomCassandraAnnotatedPojo.class);
 
 Review comment:
   We've had enough issues in the past with writes not showing up immediately 
to warrant such safeguards.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This change would allow the data to be 
> output as a custom POJO that the user has created and annotated 
> using the Datastax API. This would remove the need for very long Tuples to be 
> created by the DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221881385
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +477,41 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"));
+
+   CassandraPojoSink<CustomCassandraAnnotatedPojo> sink = new CassandraPojoSink<>(CustomCassandraAnnotatedPojo.class, builder);
+   sink.open(new Configuration());
+
+   List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = IntStream.range(0, 20)
+   .mapToObj(x -> new CustomCassandraAnnotatedPojo(UUID.randomUUID().toString(), x, 0))
+   .collect(Collectors.toList());
+
+   customCassandraAnnotatedPojos.forEach(sink::send);
+   sink.close();
+
+   InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = new CassandraPojoInputFormat<>(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "batches"), builder, CustomCassandraAnnotatedPojo.class);
+   source.configure(new Configuration());
+   source.open(null);
+
+   List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
+   while (!source.reachedEnd()) {
+   CustomCassandraAnnotatedPojo temp = source.nextRecord(null);
+   result.add(temp);
+   }
+
+   source.close();
+   Assert.assertEquals(20, result.size());
+   result.sort(Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter));
+   customCassandraAnnotatedPojos.sort(Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter));
+
+   for (int i = 0; i < result.size(); i++) {
+   assertThat(result.get(i), samePropertyValuesAs(customCassandraAnnotatedPojos.get(i)));
 
 Review comment:
   It's fine to sort them, but you don't need to iterate over the lists.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221882439
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java
 ##
 @@ -41,7 +41,7 @@
  * @param <OUT> Type of the elements to write.
  */
 public abstract class CassandraOutputFormatBase<OUT> extends 
RichOutputFormat<OUT> {
-   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraOutputFormatBase.class);
+   private final Logger logger = LoggerFactory.getLogger(getClass());
 
 Review comment:
   Please revert changes to this class; this PR is for the InputFormat and 
shouldn't touch other classes unless absolutely necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221881036
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +477,41 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"));
 
 Review comment:
   👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221883320
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +477,41 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"));
+
+   CassandraPojoSink<CustomCassandraAnnotatedPojo> sink = new 
CassandraPojoSink<>(CustomCassandraAnnotatedPojo.class, builder);
 
 Review comment:
   Let's keep it for now but update it once the OutputFormat is in.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221882020
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -382,31 +383,31 @@ public void testCassandraCommitter() throws Exception {
@Test
public void testCassandraTupleAtLeastOnceSink() throws Exception {
CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new 
CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builder);
-
-   sink.open(new Configuration());
-
-   for (Tuple3<String, Integer, Integer> value : collection) {
-   sink.send(value);
+   try {
 
 Review comment:
   please limit your changes to your sink, or move these into a separate commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635202#comment-16635202
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221882020
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -382,31 +383,31 @@ public void testCassandraCommitter() throws Exception {
@Test
public void testCassandraTupleAtLeastOnceSink() throws Exception {
CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new 
CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builder);
-
-   sink.open(new Configuration());
-
-   for (Tuple3<String, Integer, Integer> value : collection) {
-   sink.send(value);
+   try {
 
 Review comment:
   please limit your changes to your sink, or move these into a separate commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This change would allow the data to be 
> output as a custom POJO that the user has created and annotated 
> using the Datastax API. This would remove the need for very long Tuples to be 
> created by the DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635200#comment-16635200
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221881036
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +477,41 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"));
 
 Review comment:
   👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> a Cassandra source as a Tuple. This would allow the data to be output as a 
> custom POJO that the user has created and annotated using the Datastax API. 
> This would remove the need for very long Tuples to be created by the DataSet 
> and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635201#comment-16635201
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221881385
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +477,41 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"));
+
+   CassandraPojoSink<CustomCassandraAnnotatedPojo> sink = new 
CassandraPojoSink<>(CustomCassandraAnnotatedPojo.class, builder);
+   sink.open(new Configuration());
+
+   List<CustomCassandraAnnotatedPojo> 
customCassandraAnnotatedPojos = IntStream.range(0, 20)
+   .mapToObj(x -> new 
CustomCassandraAnnotatedPojo(UUID.randomUUID().toString(), x, 0))
+   .collect(Collectors.toList());
+
+   customCassandraAnnotatedPojos.forEach(sink::send);
+   sink.close();
+
+   InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = 
new CassandraPojoInputFormat<>(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"), builder, CustomCassandraAnnotatedPojo.class);
+   source.configure(new Configuration());
+   source.open(null);
+
+   List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
+   while (!source.reachedEnd()) {
+   CustomCassandraAnnotatedPojo temp = 
source.nextRecord(null);
+   result.add(temp);
+   }
+
+   source.close();
+   Assert.assertEquals(20, result.size());
+   
result.sort(Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter));
+   
customCassandraAnnotatedPojos.sort(Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter));
+
+   for (int i = 0; i < result.size(); i++) {
+   assertThat(result.get(i), 
samePropertyValuesAs(customCassandraAnnotatedPojos.get(i)));
 
 Review comment:
  It's fine to sort them, but you don't need to iterate over the lists.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> a Cassandra source as a Tuple. This would allow the data to be output as a 
> custom POJO that the user has created and annotated using the Datastax API. 
> This would remove the need for very long Tuples to be created by the DataSet 
> and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635203#comment-16635203
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221883320
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +477,41 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"batches"));
+
+   CassandraPojoSink<CustomCassandraAnnotatedPojo> sink = new 
CassandraPojoSink<>(CustomCassandraAnnotatedPojo.class, builder);
 
 Review comment:
   Let's keep it for now but update it once the outputformat is in.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> a Cassandra source as a Tuple. This would allow the data to be output as a 
> custom POJO that the user has created and annotated using the Datastax API. 
> This would remove the need for very long Tuples to be created by the DataSet 
> and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635199#comment-16635199
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221882439
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java
 ##
 @@ -41,7 +41,7 @@
  * @param  Type of the elements to write.
  */
 public abstract class CassandraOutputFormatBase extends 
RichOutputFormat {
-   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraOutputFormatBase.class);
+   private final Logger logger = LoggerFactory.getLogger(getClass());
 
 Review comment:
   Please revert changes to this class; this PR is for the InputFormat and 
shouldn't touch other classes unless absolutely necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> a Cassandra source as a Tuple. This would allow the data to be output as a 
> custom POJO that the user has created and annotated using the Datastax API. 
> This would remove the need for very long Tuples to be created by the DataSet 
> and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221887770
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 ##
 @@ -57,17 +61,27 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.fail;
+
 /**
  * Tests for the parameter handling of the {@link JarRunHandler}.
  */
 public class JarRunHandlerParameterTest extends TestLogger {
 
 Review comment:
   A similar test is required for the `JarPlanHandler`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221888363
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 ##
 @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 
@Override
public String getDescription() {
-   return "String value that specifies the arguments for the 
program or plan.";
+   return "Deprecated, please, use 'programArgsList' instead. " +
 
 Review comment:
   Let's also add a `@Deprecated` annotation to the class.
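
   A rough sketch of what the deprecated class would then look like; the parameter key and constructor body are assumptions for illustration, not the actual code in the PR:
   
   ```java
   // Sketch only: deprecate the single-string parameter in favour of the list-based one.
   @Deprecated
   public class ProgramArgsQueryParameter extends StringQueryParameter {
       public ProgramArgsQueryParameter() {
           super("program-args", MessageParameter.MessageParameterRequisiteness.OPTIONAL);
       }
       // getDescription() keeps the note pointing users at the programArgsList field.
   }
   ```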


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221888435
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 ##
 @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 
@Override
public String getDescription() {
-   return "String value that specifies the arguments for the 
program or plan.";
+   return "Deprecated, please, use 'programArgsList' instead. " +
+   "String value that specifies the arguments for the 
program or plan.";
 
 Review comment:
   Remove comma after please.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221888186
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsListQueryParameter.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+
+import java.io.File;
+
+/**
+ * Query parameter specifying the arguments list for the program.
+ * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, 
String, String...)
+ */
+public class ProgramArgsListQueryParameter extends StringQueryParameter {
+   public ProgramArgsListQueryParameter() {
+   super("program-args-list", 
MessageParameter.MessageParameterRequisiteness.OPTIONAL);
 
 Review comment:
   Rename to `programArgsList` to adhere to the new naming scheme, and use 
`FIELD_NAME_PROGRAM_ARGUMENTS_LIST`.
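
   A possible shape of the constructor after the rename, assuming the constant is referenced from `JarRequestBody` as suggested in the sibling comment (the rest of the snippet is illustrative):
   
   ```java
   // Sketch only: reuse the request-body field name instead of a hard-coded string.
   public ProgramArgsListQueryParameter() {
       super(JarRequestBody.FIELD_NAME_PROGRAM_ARGUMENTS_LIST,
           MessageParameter.MessageParameterRequisiteness.OPTIONAL);
   }
   ```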


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r22197
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 ##
 @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 
@Override
public String getDescription() {
-   return "String value that specifies the arguments for the 
program or plan.";
 
 Review comment:
   Replace `programArgsList` with 
`JarRequestBody#FIELD_NAME_PROGRAM_ARGUMENTS_LIST`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635234#comment-16635234
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221888435
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 ##
 @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 
@Override
public String getDescription() {
-   return "String value that specifies the arguments for the 
program or plan.";
+   return "Deprecated, please, use 'programArgsList' instead. " +
+   "String value that specifies the arguments for the 
program or plan.";
 
 Review comment:
   Remove comma after please.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including the SQL 
> query and Kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.
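To make the failure mode concrete, the sketch below shows how a quote-consuming tokeniser mangles a single program-args string, whereas a list of separate strings needs no re-tokenisation; the tokeniser is purely illustrative and is not Flink's implementation:

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArgsTokenisationSketch {

    // Illustrative quote-aware tokeniser (NOT Flink's code): quoted groups are kept
    // together, but the quote characters themselves are consumed.
    static List<String> tokenise(String args) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0;
        for (char c : args.toCharArray()) {
            if (quote == 0 && (c == '\'' || c == '"')) {
                quote = c;                                   // opening quote dropped
            } else if (quote != 0 && c == quote) {
                quote = 0;                                   // closing quote dropped
            } else if (quote == 0 && Character.isWhitespace(c)) {
                if (current.length() > 0) {
                    tokens.add(current.toString());
                    current.setLength(0);
                }
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            tokens.add(current.toString());
        }
        return tokens;
    }

    public static void main(String[] args) {
        // Single-string programArgs: the SQL is split on whitespace and loses its quotes.
        System.out.println(tokenise("--sql SELECT * FROM t WHERE name = 'foo'"));
        // List-based programArgsList: each argument is already a separate element,
        // so nothing is re-tokenised and the quotes survive untouched.
        System.out.println(Arrays.asList("--sql", "SELECT * FROM t WHERE name = 'foo'"));
    }
}
{code}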



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635233#comment-16635233
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221887770
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 ##
 @@ -57,17 +61,27 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.fail;
+
 /**
  * Tests for the parameter handling of the {@link JarRunHandler}.
  */
 public class JarRunHandlerParameterTest extends TestLogger {
 
 Review comment:
   A similar test is required for the `JarPlanHandler`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including the SQL 
> query and Kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635232#comment-16635232
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221888186
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsListQueryParameter.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+
+import java.io.File;
+
+/**
+ * Query parameter specifying the arguments list for the program.
+ * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, 
String, String...)
+ */
+public class ProgramArgsListQueryParameter extends StringQueryParameter {
+   public ProgramArgsListQueryParameter() {
+   super("program-args-list", 
MessageParameter.MessageParameterRequisiteness.OPTIONAL);
 
 Review comment:
   Rename to `programArgsList` to adhere to the new naming scheme, and use 
`FIELD_NAME_PROGRAM_ARGUMENTS_LIST`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including the SQL 
> query and Kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-10-02 Thread GitBox
fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 
support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221889992
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
 ##
 @@ -55,20 +55,20 @@
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be 
implemented.
  * Additionally, one may override {@link #open(FileInputSplit)} and {@link 
#close()} to
  * change the life cycle behavior.
- * 
 
 Review comment:
   Can you move the whitespace and checkstyle changes into a separate commit? 
This would make it much easier to review the PR. Thank you!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635231#comment-16635231
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r22197
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 ##
 @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 
@Override
public String getDescription() {
-   return "String value that specifies the arguments for the 
program or plan.";
 
 Review comment:
   Replace `programArgsList` with 
`JarRequestBody#FIELD_NAME_PROGRAM_ARGUMENTS_LIST`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including the SQL 
> query and Kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635230#comment-16635230
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221888363
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 ##
 @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 
@Override
public String getDescription() {
-   return "String value that specifies the arguments for the 
program or plan.";
+   return "Deprecated, please, use 'programArgsList' instead. " +
 
 Review comment:
   Let's also add a `@Deprecated` annotation to the class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including the SQL 
> query and Kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635235#comment-16635235
 ] 

ASF GitHub Bot commented on FLINK-10134:


fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 
support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221889992
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
 ##
 @@ -55,20 +55,20 @@
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be 
implemented.
  * Additionally, one may override {@link #open(FileInputSplit)} and {@link 
#close()} to
  * change the life cycle behavior.
- * 
 
 Review comment:
   Can you move the whitespace and checkstyle changes into a separate commit? 
This would make it much easier to review the PR. Thank you!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Blocker
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened and 
> then using that BOM to modify the specified encoding to a BOM-specific one 
> when the caller doesn't specify one, and to overwrite the caller's 
> specification if the BOM is in conflict with the caller's specification. That 
> is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, 
> Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> misreading the code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on 
> all versions. 
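As a rough illustration of the suggested approach (not Flink's code), the charset could be resolved from the BOM once when a split is opened, and the same resolved charset would then be used both to encode the delimiter and to decode records:

{code:java}
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class BomCharsetSketch {

    /**
     * Illustrative only: derive the effective charset from the first bytes of a split.
     * If the caller asked for plain "UTF-16", the BOM decides between UTF-16BE and
     * UTF-16LE; without a BOM the caller's charset is kept.
     */
    static Charset resolveCharset(byte[] firstBytes, Charset requested) {
        if (requested.name().startsWith("UTF-16") && firstBytes.length >= 2) {
            if ((firstBytes[0] & 0xFF) == 0xFE && (firstBytes[1] & 0xFF) == 0xFF) {
                return StandardCharsets.UTF_16BE; // big-endian BOM
            }
            if ((firstBytes[0] & 0xFF) == 0xFF && (firstBytes[1] & 0xFF) == 0xFE) {
                return StandardCharsets.UTF_16LE; // little-endian BOM
            }
        }
        return requested;
    }

    public static void main(String[] args) {
        byte[] littleEndianBom = {(byte) 0xFF, (byte) 0xFE, 'a', 0};
        // prints UTF-16LE, overriding the endianness-agnostic "UTF-16" request
        System.out.println(resolveCharset(littleEndianBom, Charset.forName("UTF-16")));
    }
}
{code}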



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation.

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6761: [FLINK-10279] 
[documentation] Make jython limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761#discussion_r221894735
 
 

 ##
 File path: docs/dev/stream/python.md
 ##
 @@ -50,7 +53,12 @@ existing Java streaming APIs.
 There are two main constraints for using Jython:
 
 * The latest Python supported version is 2.7
-* It is not straightforward to use Python C extensions
+* It is not straightforward to use Python C extensions, which may prevent 
reuse of some libraries
+
+(For more information please see 
.)
+
+One possible alternative for streaming that allows for native Python execution 
would be the Apache Beam
 
 Review comment:
   add a link to the Apache Beam project, or maybe even a more specific 
python-related page.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation.

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6761: [FLINK-10279] 
[documentation] Make jython limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761#discussion_r221894414
 
 

 ##
 File path: docs/dev/stream/python.md
 ##
 @@ -24,6 +24,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+*Note: This API is based on Jython, which is not a full Python replacement and 
may restrict
 
 Review comment:
   You can actually create neat looking `attention` boxes, like so `Attention`, as seen here: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#assigning-timestamps


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10279) Make jython limitations more obvious in documentation

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635245#comment-16635245
 ] 

ASF GitHub Bot commented on FLINK-10279:


zentol commented on a change in pull request #6761: [FLINK-10279] 
[documentation] Make jython limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761#discussion_r221894414
 
 

 ##
 File path: docs/dev/stream/python.md
 ##
 @@ -24,6 +24,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+*Note: This API is based on Jython, which is not a full Python replacement and 
may restrict
 
 Review comment:
   You can actually create neat looking `attention` boxes, like so `Attention`, as seen here: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#assigning-timestamps


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make jython limitations more obvious in documentation
> -
>
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Python API
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html]
>  does not sufficiently highlight limitations of the API. It should probably 
> have a prominent disclaimer right at the top stating that this actually isn't 
> a "Python" API but Jython, which likely means that the user looking for a 
> solution to run native Python code won't be able to use many important 
> libraries, which is often the reason to look for Python support in the first 
> place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10279) Make jython limitations more obvious in documentation

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635246#comment-16635246
 ] 

ASF GitHub Bot commented on FLINK-10279:


zentol commented on a change in pull request #6761: [FLINK-10279] 
[documentation] Make jython limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761#discussion_r221894735
 
 

 ##
 File path: docs/dev/stream/python.md
 ##
 @@ -50,7 +53,12 @@ existing Java streaming APIs.
 There are two main constraints for using Jython:
 
 * The latest Python supported version is 2.7
-* It is not straightforward to use Python C extensions
+* It is not straightforward to use Python C extensions, which may prevent 
reuse of some libraries
+
+(For more information please see 
.)
+
+One possible alternative for streaming that allows for native Python execution 
would be the Apache Beam
 
 Review comment:
   add a link to the Apache Beam project, or maybe even a more specific 
python-related page.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make jython limitations more obvious in documentation
> -
>
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Python API
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html]
>  does not sufficiently highlight limitations of the API. It should probably 
> have a prominent disclaimer right at the top stating that this actually isn't 
> a "Python" API but Jython, which likely means that the user looking for a 
> solution to run native Python code won't be able to use many important 
> libraries, which is often the reason to look for Python support in the first 
> place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10451) TableFunctionCollector should handle the life cycle of ScalarFunction

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635248#comment-16635248
 ] 

ASF GitHub Bot commented on FLINK-10451:


asfgit closed pull request #6771: [FLINK-10451] [table] TableFunctionCollector 
should handle the life cycle of ScalarFunction
URL: https://github.com/apache/flink/pull/6771
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
index 9fc76e32983..85d858fb75b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.codegen
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, 
newName}
 import org.apache.flink.table.codegen.Indenter.toISC
@@ -63,7 +64,8 @@ class CollectorCodeGenerator(
   def generateTableFunctionCollector(
   name: String,
   bodyCode: String,
-  collectedType: TypeInformation[Any])
+  collectedType: TypeInformation[Any],
+  codeGenerator: CodeGenerator)
 : GeneratedCollector = {
 
 val className = newName(name)
@@ -95,6 +97,11 @@ class CollectorCodeGenerator(
   |  }
   |
   |  @Override
+  |  public void open(${classOf[Configuration].getCanonicalName} 
parameters) throws Exception {
+  |${codeGenerator.reuseOpenCode()}
+  |  }
+  |
+  |  @Override
   |  public void collect(Object record) throws Exception {
   |super.collect(record);
   |$input1TypeClass $input1Term = ($input1TypeClass) getInput();
@@ -105,7 +112,8 @@ class CollectorCodeGenerator(
   |  }
   |
   |  @Override
-  |  public void close() {
+  |  public void close() throws Exception {
+  |${codeGenerator.reuseCloseCode()}
   |  }
   |}
   |""".stripMargin
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 43314577ab8..3475e1901e9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -136,6 +136,13 @@ trait CommonCorrelate {
   returnSchema.typeInfo,
   returnSchema.fieldNames)
 
+val filterGenerator = new FunctionCodeGenerator(
+  config,
+  false,
+  udtfTypeInfo,
+  None,
+  pojoFieldMapping)
+
 val collectorCode = if (condition.isEmpty) {
   s"""
  |${crossResultExpr.code}
@@ -153,13 +160,6 @@ trait CommonCorrelate {
   //   The generated expression is discarded.
   
generator.generateExpression(condition.get.accept(changeInputRefIndexShuttle))
 
-  val filterGenerator = new FunctionCodeGenerator(
-config,
-false,
-udtfTypeInfo,
-None,
-pojoFieldMapping)
-
   filterGenerator.input1Term = filterGenerator.input2Term
   val filterCondition = filterGenerator.generateExpression(condition.get)
   s"""
@@ -175,7 +175,8 @@ trait CommonCorrelate {
 generator.generateTableFunctionCollector(
   "TableFunctionCollector",
   collectorCode,
-  udtfTypeInfo)
+  udtfTypeInfo,
+  filterGenerator)
   }
 
   private[flink] def selectToString(rowType: RelDataType): String = {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
index 2553d9cd67b..747828cedbd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
@@ -59,7 +59,9 @@ class CRowCorrelateProcessRunner(
 val constructor = 
processClazz.getConstructor(classOf[TableFunctionCollector[_]])
 LOG.debug("Instantiating ProcessFunction.")
 function = 
constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
+FunctionUtils.set

[GitHub] asfgit closed pull request #6771: [FLINK-10451] [table] TableFunctionCollector should handle the life cycle of ScalarFunction

2018-10-02 Thread GitBox
asfgit closed pull request #6771: [FLINK-10451] [table] TableFunctionCollector 
should handle the life cycle of ScalarFunction
URL: https://github.com/apache/flink/pull/6771
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
index 9fc76e32983..85d858fb75b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.codegen
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, 
newName}
 import org.apache.flink.table.codegen.Indenter.toISC
@@ -63,7 +64,8 @@ class CollectorCodeGenerator(
   def generateTableFunctionCollector(
   name: String,
   bodyCode: String,
-  collectedType: TypeInformation[Any])
+  collectedType: TypeInformation[Any],
+  codeGenerator: CodeGenerator)
 : GeneratedCollector = {
 
 val className = newName(name)
@@ -95,6 +97,11 @@ class CollectorCodeGenerator(
   |  }
   |
   |  @Override
+  |  public void open(${classOf[Configuration].getCanonicalName} 
parameters) throws Exception {
+  |${codeGenerator.reuseOpenCode()}
+  |  }
+  |
+  |  @Override
   |  public void collect(Object record) throws Exception {
   |super.collect(record);
   |$input1TypeClass $input1Term = ($input1TypeClass) getInput();
@@ -105,7 +112,8 @@ class CollectorCodeGenerator(
   |  }
   |
   |  @Override
-  |  public void close() {
+  |  public void close() throws Exception {
+  |${codeGenerator.reuseCloseCode()}
   |  }
   |}
   |""".stripMargin
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 43314577ab8..3475e1901e9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -136,6 +136,13 @@ trait CommonCorrelate {
   returnSchema.typeInfo,
   returnSchema.fieldNames)
 
+val filterGenerator = new FunctionCodeGenerator(
+  config,
+  false,
+  udtfTypeInfo,
+  None,
+  pojoFieldMapping)
+
 val collectorCode = if (condition.isEmpty) {
   s"""
  |${crossResultExpr.code}
@@ -153,13 +160,6 @@ trait CommonCorrelate {
   //   The generated expression is discarded.
   
generator.generateExpression(condition.get.accept(changeInputRefIndexShuttle))
 
-  val filterGenerator = new FunctionCodeGenerator(
-config,
-false,
-udtfTypeInfo,
-None,
-pojoFieldMapping)
-
   filterGenerator.input1Term = filterGenerator.input2Term
   val filterCondition = filterGenerator.generateExpression(condition.get)
   s"""
@@ -175,7 +175,8 @@ trait CommonCorrelate {
 generator.generateTableFunctionCollector(
   "TableFunctionCollector",
   collectorCode,
-  udtfTypeInfo)
+  udtfTypeInfo,
+  filterGenerator)
   }
 
   private[flink] def selectToString(rowType: RelDataType): String = {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
index 2553d9cd67b..747828cedbd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
@@ -59,7 +59,9 @@ class CRowCorrelateProcessRunner(
 val constructor = 
processClazz.getConstructor(classOf[TableFunctionCollector[_]])
 LOG.debug("Instantiating ProcessFunction.")
 function = 
constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
+FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
 FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+FunctionUtils.openFunction(collector, parameters)
 FunctionUtils.openFunction(function, parameters)
   }
 
@@ -85,6 +87,7 @@

[jira] [Resolved] (FLINK-10451) TableFunctionCollector should handle the life cycle of ScalarFunction

2018-10-02 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-10451.
--
   Resolution: Fixed
Fix Version/s: 1.5.5
   1.6.2
   1.7.0

Fixed in 1.7.0: 22613b7392f6f9b344291bb3a5cec84c9aa40926
Fixed in 1.6.2: 7fb980ec74b95d7e6e27d6414af54deb9010e134
Fixed in 1.5.5: 9a1f0c5e05ef50f115c8f92aac9fc156cdf54249

> TableFunctionCollector should handle the life cycle of ScalarFunction
> -
>
> Key: FLINK-10451
> URL: https://issues.apache.org/jira/browse/FLINK-10451
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Considering the following query:
> table.join(udtf('a)).where(udf('b))
> the filter will be pushed into DataSetCorrelate/DataStreamCorrelate without 
> triggering open() and close()
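As a hypothetical example of why the missing life-cycle calls matter (not taken from the reporter's code), consider a filter UDF whose eval() relies on state prepared in open(); if the generated collector never calls open(), the pushed-down filter operates on uninitialised state:

{code:java}
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Illustrative UDF: eval() divides by a value that is only set in open().
public class ParityFilter extends ScalarFunction {

    private transient int modulus; // prepared in open(), read in eval()

    @Override
    public void open(FunctionContext context) throws Exception {
        modulus = 2; // a real function might read this from the job parameters
    }

    public boolean eval(int value) {
        // throws ArithmeticException (division by zero) if open() was never invoked
        return value % modulus == 0;
    }
}
{code}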



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL opened a new pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

2018-10-02 Thread GitBox
GJL opened a new pull request #6785: [FLINK-10309][rest] Before shutting down 
cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785
 
 
   ## What is the purpose of the change
   
   *Wait for the result of asynchronous operations to be served before shutting 
down the cluster. This is necessary for the _"cancel with savepoint"_ 
operation. If we do not wait for the result to be accessed by the client, we 
may shut down the cluster, and the client gets a `ConnectionException`.*
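   
   Conceptually, the behaviour can be sketched as follows; this is a simplified stand-in for illustration and not the actual `CompletedOperationCache` API:
   
   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Results stay cached until the client fetches them, and shutdown only
   // proceeds once every cached result has been served.
   class OperationResultCache<K, R> {
   
       private final Map<K, R> unserved = new ConcurrentHashMap<>();
       private final CompletableFuture<Void> allServed = new CompletableFuture<>();
       private volatile boolean closing;
   
       void registerResult(K key, R result) {
           unserved.put(key, result);
       }
   
       R serveResult(K key) {
           R result = unserved.remove(key);
           maybeComplete();
           return result;
       }
   
       /** Completes once all registered results have been handed to a client. */
       CompletableFuture<Void> closeAsync() {
           closing = true;
           maybeComplete();
           return allServed;
       }
   
       private void maybeComplete() {
           if (closing && unserved.isEmpty()) {
               allServed.complete(null);
           }
       }
   }
   ```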
   
   cc: @zentol @tillrohrmann 
   
   ## Brief change log
   
 - *Before shutting down cluster, wait for asynchronous operations.*
 - *Log stacktrace if checkpoint cannot be ack'ed.*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added test to `RestServerEndpointITCase` to verify that handlers are 
closed first.*
 - *Added unit tests for `CompletedOperationCache`.*
 - *Verified the changes by submitting a job and cancelling it with a 
savepoint in a loop.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10309:
---
Labels: pull-request-available  (was: )

> Cancel flink job occurs java.net.ConnectException
> -
>
> Key: FLINK-10309
> URL: https://issues.apache.org/jira/browse/FLINK-10309
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: vinoyang
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The problem occurs when using the Yarn per-job detached mode. Trying to 
> cancel with savepoint fails with the following exception before being able to 
> retrieve the savepoint path:
> exception stack trace : 
> {code:java}
> org.apache.flink.util.FlinkException: Could not cancel job .
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at 
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at 
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Number of retries has been exhausted.
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>         ... 1 more
> Caused by: java.util.concurrent.CompletionException: 
> java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx
>         at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>         at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>         at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>         at 
> java.util.concurrent.CompletableFuture$Un

[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635282#comment-16635282
 ] 

ASF GitHub Bot commented on FLINK-10309:


GJL opened a new pull request #6785: [FLINK-10309][rest] Before shutting down 
cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785
 
 
   ## What is the purpose of the change
   
   *Wait for the result of asynchronous operations to be served before shutting 
down the cluster. This is necessary for the _"cancel with savepoint"_ 
operation. If we do not wait for the result to be accessed by the client, we 
may shut down the cluster too early, and the client gets a `ConnectionException`.*
   
   cc: @zentol @tillrohrmann 
   
   ## Brief change log
   
 - *Before shutting down cluster, wait for asynchronous operations.*
 - *Log stacktrace if checkpoint cannot be ack'ed.*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added test to `RestServerEndpointITCase` to verify that handlers are 
closed first.*
 - *Added unit tests for `CompletedOperationCache`.*
 - *Verified the changes by submitting a job and cancelling it with a 
savepoint in a loop.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cancel flink job occurs java.net.ConnectException
> -
>
> Key: FLINK-10309
> URL: https://issues.apache.org/jira/browse/FLINK-10309
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: vinoyang
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The problem occurs when using the Yarn per-job detached mode. Trying to 
> cancel with savepoint fails with the following exception before being able to 
> retrieve the savepoint path:
> exception stack trace : 
> {code:java}
> org.apache.flink.util.FlinkException: Could not cancel job .
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at 
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at 
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Number of retries has been exhausted.
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionall

[jira] [Created] (FLINK-10479) Add Firebase Firestore Connector

2018-10-02 Thread JIRA
Niklas Gögge created FLINK-10479:


 Summary: Add Firebase Firestore Connector
 Key: FLINK-10479
 URL: https://issues.apache.org/jira/browse/FLINK-10479
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Reporter: Niklas Gögge


This feature would add a new connector (sink/source) for Cloud Firestore, which 
is part of Firebase.

Firebase is Google's backend-as-a-service platform for building apps. You can 
read more about it [here|https://firebase.google.com/].
Firestore is a NoSQL document database service which could act as a data sink 
and source because it provides realtime listeners for data changes.

I would like to implement this myself if this proposal is accepted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10480) Add Firebase Firestore Connector

2018-10-02 Thread JIRA
Niklas Gögge created FLINK-10480:


 Summary: Add Firebase Firestore Connector
 Key: FLINK-10480
 URL: https://issues.apache.org/jira/browse/FLINK-10480
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Reporter: Niklas Gögge


This feature would add a new connector (sink/source) for Cloud Firestore, which 
is part of Firebase.

Firebase is Google's backend-as-a-service platform for building apps. You can 
read more about it [here|https://firebase.google.com/].
Firestore is a NoSQL document database service which could act as a data sink 
and source because it provides realtime listeners for data changes.

I would like to implement this myself if this proposal is accepted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10480) Add Firebase Firestore Connector

2018-10-02 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-10480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niklas Gögge closed FLINK-10480.

Resolution: Duplicate

> Add Firebase Firestore Connector
> 
>
> Key: FLINK-10480
> URL: https://issues.apache.org/jira/browse/FLINK-10480
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Niklas Gögge
>Priority: Major
>  Labels: proposal
>
> This feature would add a new connector (sink/source) for Cloud Firestore, which 
> is part of Firebase.
> Firebase is Google's backend-as-a-service platform for building apps. You can 
> read more about it [here|https://firebase.google.com/].
> Firestore is a NoSQL document database service which could act as a data sink 
> and source because it provides realtime listeners for data changes.
> I would like to implement this myself if this proposal is accepted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12

2018-10-02 Thread GitBox
zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12
URL: https://github.com/apache/flink/pull/6784#issuecomment-426240532
 
 
   I'm currently preparing an asm6 version for `flink-shaded` since we also 
need it for java 9 compatibility.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12

2018-10-02 Thread GitBox
zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12
URL: https://github.com/apache/flink/pull/6784#issuecomment-426240677
 
 
   Note that the GitHub link for the version you're using results in a 404.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7811) Add support for Scala 2.12

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635328#comment-16635328
 ] 

ASF GitHub Bot commented on FLINK-7811:
---

zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12
URL: https://github.com/apache/flink/pull/6784#issuecomment-426240532
 
 
   I'm currently preparing an asm6 version for `flink-shaded` since we also 
need it for java 9 compatibility.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for Scala 2.12
> --
>
> Key: FLINK-7811
> URL: https://issues.apache.org/jira/browse/FLINK-7811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7811) Add support for Scala 2.12

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635329#comment-16635329
 ] 

ASF GitHub Bot commented on FLINK-7811:
---

zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12
URL: https://github.com/apache/flink/pull/6784#issuecomment-426240677
 
 
   Note that the GitHub link for the version you're using results in a 404.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for Scala 2.12
> --
>
> Key: FLINK-7811
> URL: https://issues.apache.org/jira/browse/FLINK-7811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12

2018-10-02 Thread GitBox
zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12
URL: https://github.com/apache/flink/pull/6784#issuecomment-426241033
 
 
   For `flink-shaded-asm` it would be useful to know which components of asm 
are actually required.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7811) Add support for Scala 2.12

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635331#comment-16635331
 ] 

ASF GitHub Bot commented on FLINK-7811:
---

zentol commented on issue #6784: [FLINK-7811] Add support for Scala 2.12
URL: https://github.com/apache/flink/pull/6784#issuecomment-426241033
 
 
   For `flink-shaded-asm` it would be useful to know which components of asm 
are actually required.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for Scala 2.12
> --
>
> Key: FLINK-7811
> URL: https://issues.apache.org/jira/browse/FLINK-7811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol opened a new pull request #6786: [FLINK-10292][rest] Separate REST and Dispatcher RPC thread pools

2018-10-02 Thread GitBox
zentol opened a new pull request #6786: [FLINK-10292][rest] Separate REST and 
Dispatcher RPC thread pools
URL: https://github.com/apache/flink/pull/6786
 
 
   
   
   ## What is the purpose of the change
   
   This PR separates the REST and Dispatcher RPC thread pools. Basically the 
`WebMonitorEndpoint` creates its own `ExecutorService` based on the 
`rest.server.numThreads` option, instead of being passed the Dispatcher RPC's 
`Executor`.
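
   In essence the change boils down to something like the following sketch (class, 
method, and thread names are illustrative, not the actual Flink code; only 
`rest.server.numThreads` comes from this PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: build a dedicated, bounded executor for the REST handlers
// instead of reusing the dispatcher's RPC executor.
public final class RestExecutorFactory {

    public static ExecutorService createRestExecutor(int numThreads) {
        return Executors.newFixedThreadPool(
            numThreads,
            runnable -> {
                Thread thread = new Thread(runnable, "flink-rest-server-executor");
                thread.setDaemon(true);
                return thread;
            });
    }

    private RestExecutorFactory() {
    }
}
```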
   
   ## Brief change log
   
   * add `RestOptions#SERVER_NUM_THREADS`
   *  extend `RestServerEndpointConfiguration` to process new option (this 
change is questionable since the `RestServerEndpoint` never uses this option)
   * create `ExecutorService` in `WebMonitorEndpoint`.
   
   ## Verifying this change
   
   I've added a basic test to cover the behavior of the 
`RestServerEndpointConfiguration`.
   
   Testing the executor service itself is possible (submit N blocking tasks, 
count how many end up blocking, assert numBlocking==configuredNumThreads) but 
getting it stable is tricky.
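
   A rough sketch of that probe (purely illustrative, not part of the PR) could look 
as follows; the fixed sleep is exactly what makes the test hard to stabilize:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

// Submit more blocking tasks than the executor has threads and count how many
// actually started; the count should equal the configured pool size.
public final class ExecutorSizeProbe {

    public static int countConcurrentlyRunning(ExecutorService executor, int tasksToSubmit)
            throws InterruptedException {
        AtomicInteger running = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasksToSubmit; i++) {
            executor.execute(() -> {
                running.incrementAndGet();
                try {
                    release.await(); // block until the probe has counted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        Thread.sleep(1000); // crude settling time; the source of flakiness
        int observed = running.get();
        release.countDown();
        return observed;
    }

    private ExecutorSizeProbe() {
    }
}
```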
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes )
 - If yes, how is the feature documented? (docs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-10-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10292:
---
Labels: pull-request-available  (was: )

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once and store it in HA 
> storage, from where it can then be retrieved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635339#comment-16635339
 ] 

ASF GitHub Bot commented on FLINK-10292:


zentol opened a new pull request #6786: [FLINK-10292][rest] Separate REST and 
Dispatcher RPC thread pools
URL: https://github.com/apache/flink/pull/6786
 
 
   
   
   ## What is the purpose of the change
   
   This PR separates the REST and Dispatcher RPC thread pools. Basically the 
`WebMonitorEndpoint` creates its own `ExecutorService` based on the 
`rest.server.numThreads` option, instead of being passed the Dispatcher RPC's 
`Executor`.
   
   ## Brief change log
   
   * add `RestOptions#SERVER_NUM_THREADS`
   *  extend `RestServerEndpointConfiguration` to process new option (this 
change is questionable since the `RestServerEndpoint` never uses this option)
   * create `ExecutorService` in `WebMonitorEndpoint`.
   
   ## Verifying this change
   
   I've added a basic test to cover the behavior of the 
`RestServerEndpointConfiguration`.
   
   Testing the executor service itself is possible (submit N blocking tasks, 
count how many end up blocking, assert numBlocking==configuredNumThreads) but 
getting it stable is tricky.
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes )
 - If yes, how is the feature documented? (docs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once and store it in HA 
> storage, from where it can then be retrieved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10475) Standalone HA - Leader election is not triggered on loss of leader

2018-10-02 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635360#comment-16635360
 ] 

Till Rohrmann commented on FLINK-10475:
---

Hi [~Jamalarm], this sounds as if ZooKeeper did not notice the one JM being 
killed. Thus, it could simply be a ZooKeeper setup problem. 

In order to further debug the problem, it would be helpful to get the logs of 
the JobManagers.

> Standalone HA - Leader election is not triggered on loss of leader
> --
>
> Key: FLINK-10475
> URL: https://issues.apache.org/jira/browse/FLINK-10475
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.4
>Reporter: Thomas Wozniakowski
>Priority: Blocker
>
> Hey Guys,
> Just testing the new bugfix release of 1.5.4. Happy to see that the issue of 
> jobgraphs hanging around forever has been resolved in standalone/zookeeper HA 
> mode, but now I'm seeing a different issue.
> It looks like the HA failover is never triggered. I set up a 3/3/3 cluster of 
> zookeeper/jobmanager/taskmanagers. Started my job, all fine with the new 
> version. I then proceeded to kill the leading jobmanager to test the failover.
> The remaining jobmanagers never triggered a leader election, and simply got 
> stuck.
> The logs of the remaining job managers were full of this:
> {quote}
> 2018-10-01 15:35:44,558 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler  - Could not 
> retrieve the redirect address.
> java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: 
> Ask timed out on 
> [Actor[akka.tcp://flink@10.1.3.118:50010/user/dispatcher#-1286445443]] after 
> [1 ms]. Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
>   at akka.dispatch.OnComplete.internal(Future.scala:258)
>   at akka.dispatch.OnComplete.internal(Future.scala:256)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>   at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>   at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>   at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>   at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>   at java.lang.Thread.run(Thread.java:745)
> {quote}
> Please give me a shout if I can provide any more useful information



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10475) Standalone HA - Leader election is not triggered on loss of leader

2018-10-02 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635360#comment-16635360
 ] 

Till Rohrmann edited comment on FLINK-10475 at 10/2/18 12:11 PM:
-

Hi [~Jamalarm], this sounds as if ZooKeeper did not notice the one JM being 
killed. Thus, it could simply be a ZooKeeper setup problem. 

In order to further debug the problem, it would be helpful to get the logs of 
the JobManagers.

The error messages originate from the REST handlers and are not a critical 
problem.


was (Author: till.rohrmann):
Hi [~Jamalarm], this sounds as if ZooKeeper did not notice the one JM being 
killed. Thus, it could simply be a ZooKeeper setup problem. 

In order to further debug the problem, it would be helpful to get the logs of 
the JobManagers.

> Standalone HA - Leader election is not triggered on loss of leader
> --
>
> Key: FLINK-10475
> URL: https://issues.apache.org/jira/browse/FLINK-10475
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.4
>Reporter: Thomas Wozniakowski
>Priority: Blocker
>
> Hey Guys,
> Just testing the new bugfix release of 1.5.4. Happy to see that the issue of 
> jobgraphs hanging around forever has been resolved in standalone/zookeeper HA 
> mode, but now I'm seeing a different issue.
> It looks like the HA failover is never triggered. I set up a 3/3/3 cluster of 
> zookeeper/jobmanager/taskmanagers. Started my job, all fine with the new 
> version. I then proceeded to kill the leading jobmanager to test the failover.
> The remaining jobmanagers never triggered a leader election, and simply got 
> stuck.
> The logs of the remaining job managers were full of this:
> {quote}
> 2018-10-01 15:35:44,558 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler  - Could not 
> retrieve the redirect address.
> java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: 
> Ask timed out on 
> [Actor[akka.tcp://flink@10.1.3.118:50010/user/dispatcher#-1286445443]] after 
> [1 ms]. Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
>   at akka.dispatch.OnComplete.internal(Future.scala:258)
>   at akka.dispatch.OnComplete.internal(Future.scala:256)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>   at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>   at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>   at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>   at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>   at java.lang.Thread.run(Thread.java:745)
> {quote}
> Please give me a shout if I can provide any more useful information



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on a change in pull request #6455: [FLINK-9997] [table] Improve Expression Reduce

2018-10-02 Thread GitBox
twalthr commented on a change in pull request #6455: [FLINK-9997] [table] 
Improve Expression Reduce
URL: https://github.com/apache/flink/pull/6455#discussion_r221923802
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ##
 @@ -480,11 +480,20 @@ abstract class BatchTableEnvironment(
   normalizedPlan
 }
 
-// 4. optimize the physical Flink plan
+// 4. optimize the logical Flink plan
+val flinkLogicalOptRuleSet = getFlinkLogicalOptRuleSet
 
 Review comment:
   I'm not sure about adding another step in the optimization phase. You 
mentioned that "If adding ReduceExpressionRule.CALC to logical opt phase, it 
will increase the search time." Do you have numbers for this? Otherwise I 
would add it to the logical optimization phase because this is actually where it 
belongs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6455: [FLINK-9997] [table] Improve Expression Reduce

2018-10-02 Thread GitBox
twalthr commented on a change in pull request #6455: [FLINK-9997] [table] 
Improve Expression Reduce
URL: https://github.com/apache/flink/pull/6455#discussion_r221917365
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
 ##
 @@ -477,6 +476,22 @@ class ExpressionReductionRulesTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  @Test
+  def testReduceDeterministicUDFInBatch(): Unit = {
+val util = batchTestUtil()
+val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+// if isDeterministic = true, will cause a Calcite NPE, which will be 
fixed in [CALCITE-1860]
 
 Review comment:
   Is this comment still valid?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6455: [FLINK-9997] [table] Improve Expression Reduce

2018-10-02 Thread GitBox
twalthr commented on a change in pull request #6455: [FLINK-9997] [table] 
Improve Expression Reduce
URL: https://github.com/apache/flink/pull/6455#discussion_r221924196
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 ##
 @@ -136,6 +136,11 @@ object FlinkRuleSets {
 FlinkLogicalNativeTableScan.CONVERTER
   )
 
+  val FLINK_LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
 
 Review comment:
   You mentioned "There's no Calc node at normalize plans, so 
ReduceExpressionRule.CALC does not match anything.". So can we remove it from 
`DATASTREAM_NORM_RULES` and `DATASET_NORM_RULES`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9997) Improve Expression Reduce

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635369#comment-16635369
 ] 

ASF GitHub Bot commented on FLINK-9997:
---

twalthr commented on a change in pull request #6455: [FLINK-9997] [table] 
Improve Expression Reduce
URL: https://github.com/apache/flink/pull/6455#discussion_r221924196
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 ##
 @@ -136,6 +136,11 @@ object FlinkRuleSets {
 FlinkLogicalNativeTableScan.CONVERTER
   )
 
+  val FLINK_LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
 
 Review comment:
   You mentioned "There's no Calc node at normalize plans, so 
ReduceExpressionRule.CALC does not match anything.". So can we remove it from 
`DATASTREAM_NORM_RULES` and `DATASET_NORM_RULES`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Expression Reduce
> -
>
> Key: FLINK-9997
> URL: https://issues.apache.org/jira/browse/FLINK-9997
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> There is no Calc node in the normalized plans, so ReduceExpressionRule.CALC does 
> not match anything.
> If calc-related rules are added to the normalize phase, then project-related rules 
> and filter-related rules do not match anything in the logical opt phase. If 
> ReduceExpressionRule.CALC is added to the logical opt phase, it will increase the 
> search time. Therefore, adding a new phase after the logical opt phase may be an 
> option.
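
A dedicated rule set for such an extra phase could, as a sketch, look like the
following (illustrative Java; the actual change lives in the Scala FlinkRuleSets
and may differ):

{code:java}
import org.apache.calcite.rel.rules.ReduceExpressionsRule;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;

// Sketch of a rule set that only reduces expressions on Calc nodes,
// meant to run in a phase after the logical optimization.
public final class FlinkLogicalOptRules {

    public static final RuleSet LOGICAL_OPT_RULES =
        RuleSets.ofList(ReduceExpressionsRule.CALC_INSTANCE);

    private FlinkLogicalOptRules() {
    }
}
{code}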



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on a change in pull request #6455: [FLINK-9997] [table] Improve Expression Reduce

2018-10-02 Thread GitBox
twalthr commented on a change in pull request #6455: [FLINK-9997] [table] 
Improve Expression Reduce
URL: https://github.com/apache/flink/pull/6455#discussion_r221917625
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
 ##
 @@ -477,6 +476,22 @@ class ExpressionReductionRulesTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  @Test
+  def testReduceDeterministicUDFInBatch(): Unit = {
 
 Review comment:
   Remove duplicate code and add if branch for parts that differ in batch and 
streaming.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9997) Improve Expression Reduce

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635370#comment-16635370
 ] 

ASF GitHub Bot commented on FLINK-9997:
---

twalthr commented on a change in pull request #6455: [FLINK-9997] [table] 
Improve Expression Reduce
URL: https://github.com/apache/flink/pull/6455#discussion_r221923802
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ##
 @@ -480,11 +480,20 @@ abstract class BatchTableEnvironment(
   normalizedPlan
 }
 
-// 4. optimize the physical Flink plan
+// 4. optimize the logical Flink plan
+val flinkLogicalOptRuleSet = getFlinkLogicalOptRuleSet
 
 Review comment:
   I'm not sure about adding another step in the optimization phase. You 
mentioned that "If adding ReduceExpressionRule.CALC to logical opt phase, it 
will increase the search time." Do you have numbers for this? Otherwise I 
would add it to the logical optimization phase because this is actually where it 
belongs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Expression Reduce
> -
>
> Key: FLINK-9997
> URL: https://issues.apache.org/jira/browse/FLINK-9997
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> There is no Calc node in the normalized plans, so ReduceExpressionRule.CALC does 
> not match anything.
> If calc-related rules are added to the normalize phase, then project-related rules 
> and filter-related rules do not match anything in the logical opt phase. If 
> ReduceExpressionRule.CALC is added to the logical opt phase, it will increase the 
> search time. Therefore, adding a new phase after the logical opt phase may be an 
> option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9997) Improve Expression Reduce

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635368#comment-16635368
 ] 

ASF GitHub Bot commented on FLINK-9997:
---

twalthr commented on a change in pull request #6455: [FLINK-9997] [table] 
Improve Expression Reduce
URL: https://github.com/apache/flink/pull/6455#discussion_r221917625
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
 ##
 @@ -477,6 +476,22 @@ class ExpressionReductionRulesTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  @Test
+  def testReduceDeterministicUDFInBatch(): Unit = {
 
 Review comment:
   Remove duplicate code and add if branch for parts that differ in batch and 
streaming.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Expression Reduce
> -
>
> Key: FLINK-9997
> URL: https://issues.apache.org/jira/browse/FLINK-9997
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> There is no Calc node in the normalized plans, so ReduceExpressionRule.CALC does 
> not match anything.
> If calc-related rules are added to the normalize phase, then project-related rules 
> and filter-related rules do not match anything in the logical opt phase. If 
> ReduceExpressionRule.CALC is added to the logical opt phase, it will increase the 
> search time. Therefore, adding a new phase after the logical opt phase may be an 
> option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9997) Improve Expression Reduce

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635367#comment-16635367
 ] 

ASF GitHub Bot commented on FLINK-9997:
---

twalthr commented on a change in pull request #6455: [FLINK-9997] [table] 
Improve Expression Reduce
URL: https://github.com/apache/flink/pull/6455#discussion_r221917365
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
 ##
 @@ -477,6 +476,22 @@ class ExpressionReductionRulesTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  @Test
+  def testReduceDeterministicUDFInBatch(): Unit = {
+val util = batchTestUtil()
+val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+// if isDeterministic = true, will cause a Calcite NPE, which will be 
fixed in [CALCITE-1860]
 
 Review comment:
   Is this comment still valid?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Expression Reduce
> -
>
> Key: FLINK-9997
> URL: https://issues.apache.org/jira/browse/FLINK-9997
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> There is no Calc node in the normalized plans, so ReduceExpressionRule.CALC does 
> not match anything.
> If calc-related rules are added to the normalize phase, then project-related rules 
> and filter-related rules do not match anything in the logical opt phase. If 
> ReduceExpressionRule.CALC is added to the logical opt phase, it will increase the 
> search time. Therefore, adding a new phase after the logical opt phase may be an 
> option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221919990
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed 
operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache implements 
AutoCloseableAsync {
 
 Review comment:
   When making extensive changes to a class along with moving it somewhere else, 
please do so in separate commits. Now I have to create my own diff, which 
shouldn't be necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221926520
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed 
operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache implements 
AutoCloseableAsync {
+
+   private static final long 
COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+   /**
+* In-progress asynchronous operations.
+*/
+   private final Map> 
registeredOperationTriggers = new ConcurrentHashMap<>();
+
+   /**
+* Caches the result of completed operations.
+*/
+   private final Cache> completedOperations;
+
+   CompletedOperationCache() {
+   this(Ticker.systemTicker());
+   }
+
+   @VisibleForTesting
+   CompletedOperationCache(final Ticker ticker) {
+   completedOperations = CacheBuilder.newBuilder()
+   
.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .removalListener((RemovalListener>) removalNotification -> {
+   if (removalNotification.wasEvicted()) {
+   
Preconditions.checkState(removalNotification.getValue() != null);
+   
removalNotification.getValue().markAccessed();
 
 Review comment:
   I take it this is done so that the shutdown doesn't block indefinitely, 
since it snapshots the current set of operations? Please add a comment for this 
if that is the case.
   
   Do we log anywhere that this result has expired?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221919360
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ##
 @@ -120,4 +122,8 @@ protected AbstractRestHandler(
 * @throws RestHandlerException if the handling failed
 */
protected abstract CompletableFuture handleRequest(@Nonnull 
HandlerRequest request, @Nonnull T gateway) throws RestHandlerException;
+
+   public CompletableFuture closeAsync() {
 
 Review comment:
   `@Override`
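
   In other words (a simplified sketch with made-up class names, only to show the 
requested annotation):

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-ins for the real interfaces; only the @Override matters here.
interface AsyncCloseable {
    CompletableFuture<Void> closeAsync();
}

class ExampleRestHandler implements AsyncCloseable {

    @Override // the annotation requested in the review
    public CompletableFuture<Void> closeAsync() {
        return CompletableFuture.completedFuture(null);
    }
}
```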


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221920986
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed 
operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache implements 
AutoCloseableAsync {
+
+   private static final long 
COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+   /**
+* In-progress asynchronous operations.
+*/
+   private final Map> 
registeredOperationTriggers = new ConcurrentHashMap<>();
+
+   /**
+* Caches the result of completed operations.
+*/
+   private final Cache> completedOperations;
+
+   CompletedOperationCache() {
+   this(Ticker.systemTicker());
+   }
+
+   @VisibleForTesting
+   CompletedOperationCache(final Ticker ticker) {
+   completedOperations = CacheBuilder.newBuilder()
+   
.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .removalListener((RemovalListener>) removalNotification -> {
+   if (removalNotification.wasEvicted()) {
+   
Preconditions.checkState(removalNotification.getValue() != null);
+   
removalNotification.getValue().markAccessed();
+   }
+   })
+   .ticker(ticker)
+   .build();
+   }
+
+   /**
+* Registers an ongoing operation with the cache.
+*
+* @param operationResultFuture A future containing the operation 
result.
+*/
+   public void registerOngoingOperation(
+   final K operationKey,
+   final CompletableFuture operationResultFuture) {
+   final ResultAccessTracker inProgress = 
ResultAccessTracker.inProgress();
+   registeredOperationTriggers.put(operationKey, inProgress);
+   operationResultFuture.whenComplete((result, error) -> {
+   if (error == null) {
+   completedOperations.put(operationKey, 
inProgress.finishOperation(Either.Right(result)));
+   } else {
+   completedOperations.put(operationKey, 
inProgress.finishOperation(Either.Left(error)));
+   }
+   registeredOperationTriggers.remove(operationKey);
+   });
+   }
+
+   /**
+* Returns the operation result or a {@code Thro

[GitHub] zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221921902
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed 
operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache implements 
AutoCloseableAsync {
+
+   private static final long 
COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+   /**
+* In-progress asynchronous operations.
+*/
+   private final Map> 
registeredOperationTriggers = new ConcurrentHashMap<>();
+
+   /**
+* Caches the result of completed operations.
+*/
+   private final Cache> completedOperations;
+
+   CompletedOperationCache() {
+   this(Ticker.systemTicker());
+   }
+
+   @VisibleForTesting
+   CompletedOperationCache(final Ticker ticker) {
+   completedOperations = CacheBuilder.newBuilder()
+   
.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .removalListener((RemovalListener>) removalNotification -> {
+   if (removalNotification.wasEvicted()) {
+   
Preconditions.checkState(removalNotification.getValue() != null);
+   
removalNotification.getValue().markAccessed();
+   }
+   })
+   .ticker(ticker)
+   .build();
+   }
+
+   /**
+* Registers an ongoing operation with the cache.
+*
+* @param operationResultFuture A future containing the operation 
result.
+*/
+   public void registerOngoingOperation(
+   final K operationKey,
+   final CompletableFuture operationResultFuture) {
+   final ResultAccessTracker inProgress = 
ResultAccessTracker.inProgress();
+   registeredOperationTriggers.put(operationKey, inProgress);
+   operationResultFuture.whenComplete((result, error) -> {
+   if (error == null) {
+   completedOperations.put(operationKey, 
inProgress.finishOperation(Either.Right(result)));
+   } else {
+   completedOperations.put(operationKey, 
inProgress.finishOperation(Either.Left(error)));
+   }
+   registeredOperationTriggers.remove(operationKey);
+   });
+   }
+
+   /**
+* Returns the operation result or a {@code Thro

[GitHub] zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

2018-10-02 Thread GitBox
zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221923131
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
 ##
 @@ -33,7 +33,7 @@
  * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based
  * collection.
  *
- * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache
+ * @see AbstractAsynchronousOperationHandlers
 
 Review comment:
   This reference wasn't updated properly as it no longer points to the cache.
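
   Presumably the intent is for the reference to point at the relocated cache class directly; a purely illustrative sketch (placeholder classes, not the actual Flink sources):

{code:java}
/** Placeholder standing in for the relocated cache class. */
class CompletedOperationCache {
}

/**
 * A pair of job id and trigger id used as a key to a hash based collection.
 *
 * @see CompletedOperationCache
 */
class AsynchronousJobOperationKeyExample {
}
{code}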




[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635378#comment-16635378
 ] 

ASF GitHub Bot commented on FLINK-10309:


zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221919360
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ##
 @@ -120,4 +122,8 @@ protected AbstractRestHandler(
 * @throws RestHandlerException if the handling failed
 */
protected abstract CompletableFuture handleRequest(@Nonnull 
HandlerRequest request, @Nonnull T gateway) throws RestHandlerException;
+
+   public CompletableFuture<Void> closeAsync() {
 
 Review comment:
   `@Override`




> Cancel flink job occurs java.net.ConnectException
> -
>
> Key: FLINK-10309
> URL: https://issues.apache.org/jira/browse/FLINK-10309
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: vinoyang
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The problem occurs when using the Yarn per-job detached mode. Trying to 
> cancel with savepoint fails with the following exception before being able to 
> retrieve the savepoint path:
> exception stack trace : 
> {code:java}
> org.apache.flink.util.FlinkException: Could not cancel job .
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at 
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at 
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Number of retries has been exhausted.
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635380#comment-16635380
 ] 

ASF GitHub Bot commented on FLINK-10309:


zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221921902
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed 
operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache implements 
AutoCloseableAsync {
+
+   private static final long 
COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+   /**
+* In-progress asynchronous operations.
+*/
+   private final Map> 
registeredOperationTriggers = new ConcurrentHashMap<>();
+
+   /**
+* Caches the result of completed operations.
+*/
+   private final Cache> completedOperations;
+
+   CompletedOperationCache() {
+   this(Ticker.systemTicker());
+   }
+
+   @VisibleForTesting
+   CompletedOperationCache(final Ticker ticker) {
+   completedOperations = CacheBuilder.newBuilder()
+   
.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .removalListener((RemovalListener>) removalNotification -> {
+   if (removalNotification.wasEvicted()) {
+   
Preconditions.checkState(removalNotification.getValue() != null);
+   
removalNotification.getValue().markAccessed();
+   }
+   })
+   .ticker(ticker)
+   .build();
+   }
+
+   /**
+* Registers an ongoing operation with the cache.
+*
+* @param operationResultFuture A future containing the operation 
result.
+*/
+   public void registerOngoingOperation(
+   final K operationKey,
+   final CompletableFuture operationResultFuture) {
+   final ResultAccessTracker inProgress = 
ResultAccessTracker.inProgress();
+   registeredOperationTriggers.put(operationKey, inProgress);
+   operationResultFuture.whenComplete((result, error) -> {
+   if (error == null) {
+   completedOperations.put(operationKey, 
inProgress.finishOperation(Either.Right(result)));
+   } else {
+   completedOperations.put(oper

[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635379#comment-16635379
 ] 

ASF GitHub Bot commented on FLINK-10309:


zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221920986
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed 
operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache implements 
AutoCloseableAsync {
+
+   private static final long 
COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+   /**
+* In-progress asynchronous operations.
+*/
+   private final Map> 
registeredOperationTriggers = new ConcurrentHashMap<>();
+
+   /**
+* Caches the result of completed operations.
+*/
+   private final Cache> completedOperations;
+
+   CompletedOperationCache() {
+   this(Ticker.systemTicker());
+   }
+
+   @VisibleForTesting
+   CompletedOperationCache(final Ticker ticker) {
+   completedOperations = CacheBuilder.newBuilder()
+   
.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .removalListener((RemovalListener>) removalNotification -> {
+   if (removalNotification.wasEvicted()) {
+   
Preconditions.checkState(removalNotification.getValue() != null);
+   
removalNotification.getValue().markAccessed();
+   }
+   })
+   .ticker(ticker)
+   .build();
+   }
+
+   /**
+* Registers an ongoing operation with the cache.
+*
+* @param operationResultFuture A future containing the operation 
result.
+*/
+   public void registerOngoingOperation(
+   final K operationKey,
+   final CompletableFuture operationResultFuture) {
+   final ResultAccessTracker inProgress = 
ResultAccessTracker.inProgress();
+   registeredOperationTriggers.put(operationKey, inProgress);
+   operationResultFuture.whenComplete((result, error) -> {
+   if (error == null) {
+   completedOperations.put(operationKey, 
inProgress.finishOperation(Either.Right(result)));
+   } else {
+   completedOperations.put(oper

[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635382#comment-16635382
 ] 

ASF GitHub Bot commented on FLINK-10309:


zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221923131
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
 ##
 @@ -33,7 +33,7 @@
  * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based
  * collection.
  *
- * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache
+ * @see AbstractAsynchronousOperationHandlers
 
 Review comment:
   This reference wasn't updated properly as it no longer points to the cache.




> Cancel flink job occurs java.net.ConnectException
> -
>
> Key: FLINK-10309
> URL: https://issues.apache.org/jira/browse/FLINK-10309
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: vinoyang
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The problem occurs when using the Yarn per-job detached mode. Trying to 
> cancel with savepoint fails with the following exception before being able to 
> retrieve the savepoint path:
> exception stack trace : 
> {code:java}
> org.apache.flink.util.FlinkException: Could not cancel job .
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at 
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at 
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Number of retries has been exhausted.
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     

[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635381#comment-16635381
 ] 

ASF GitHub Bot commented on FLINK-10309:


zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221919990
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed 
operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache implements 
AutoCloseableAsync {
 
 Review comment:
   When making extensive changes to a class along with moving it somewhere else, 
please do so in separate commits. Now I have to create my own diff, which 
shouldn't be necessary.




> Cancel flink job occurs java.net.ConnectException
> -
>
> Key: FLINK-10309
> URL: https://issues.apache.org/jira/browse/FLINK-10309
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: vinoyang
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The problem occurs when using the Yarn per-job detached mode. Trying to 
> cancel with savepoint fails with the following exception before being able to 
> retrieve the savepoint path:
> exception stack trace : 
> {code:java}
> org.apache.flink.util.FlinkException: Could not cancel job .
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at 
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635383#comment-16635383
 ] 

ASF GitHub Bot commented on FLINK-10309:


zentol commented on a change in pull request #6785: [FLINK-10309][rest] Before 
shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#discussion_r221926520
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed 
operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache implements 
AutoCloseableAsync {
+
+   private static final long 
COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+   /**
+* In-progress asynchronous operations.
+*/
+   private final Map> 
registeredOperationTriggers = new ConcurrentHashMap<>();
+
+   /**
+* Caches the result of completed operations.
+*/
+   private final Cache> completedOperations;
+
+   CompletedOperationCache() {
+   this(Ticker.systemTicker());
+   }
+
+   @VisibleForTesting
+   CompletedOperationCache(final Ticker ticker) {
+   completedOperations = CacheBuilder.newBuilder()
+   
.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .removalListener((RemovalListener>) removalNotification -> {
+   if (removalNotification.wasEvicted()) {
+   
Preconditions.checkState(removalNotification.getValue() != null);
+   
removalNotification.getValue().markAccessed();
 
 Review comment:
   I take it this is done so that the shutdown doesn't block indefinitely, 
since it snapshots the current set of operations? Please add a comment for this 
if that is the case.
   
   Do we log anywhere that this result has expired?
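
   For illustration only, a minimal sketch of the behaviour being asked about (this is not the code under review; all names are made up): closeAsync() waits only for the results that exist at the moment it is called, so later completions cannot block shutdown, and an evicted result that was never fetched is logged and unblocks a pending shutdown.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class OperationResultCache<K, R> {

    /** Tracks whether a completed result was ever handed out to a client. */
    static final class Tracker<R> {
        final R result;
        final AtomicBoolean accessed = new AtomicBoolean(false);
        final CompletableFuture<Void> accessedFuture = new CompletableFuture<>();

        Tracker(R result) {
            this.result = result;
        }

        void markAccessed() {
            if (accessed.compareAndSet(false, true)) {
                accessedFuture.complete(null);
            }
        }
    }

    private final ConcurrentHashMap<K, Tracker<R>> completedResults = new ConcurrentHashMap<>();

    void complete(K key, R result) {
        completedResults.put(key, new Tracker<>(result));
    }

    R get(K key) {
        Tracker<R> tracker = completedResults.get(key);
        if (tracker == null) {
            return null;
        }
        tracker.markAccessed();
        return tracker.result;
    }

    /** Eviction hook: log results that expired unread and unblock a pending shutdown. */
    void onEviction(K key, Tracker<R> tracker) {
        if (!tracker.accessed.get()) {
            // A real implementation would use a proper logger here.
            System.out.println("Result of operation " + key + " expired before it was accessed.");
        }
        tracker.markAccessed();
    }

    /**
     * Snapshot semantics: only results registered before this call are awaited,
     * so operations that complete afterwards cannot block the shutdown.
     */
    CompletableFuture<Void> closeAsync() {
        List<CompletableFuture<Void>> snapshot = new ArrayList<>();
        for (Tracker<R> tracker : completedResults.values()) {
            snapshot.add(tracker.accessedFuture);
        }
        return CompletableFuture.allOf(snapshot.toArray(new CompletableFuture<?>[0]));
    }
}
{code}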




> Cancel flink job occurs java.net.ConnectException
> -
>
> Key: FLINK-10309
> URL: https://issues.apache.org/jira/browse/FLINK-10309
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: vinoyang
>As

[jira] [Commented] (FLINK-8819) Rework travis script to use build stages

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635388#comment-16635388
 ] 

ASF GitHub Bot commented on FLINK-8819:
---

zentol closed pull request #6642: [FLINK-8819][travis] Rework travis script to 
use stages
URL: https://github.com/apache/flink/pull/6642
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.travis.yml b/.travis.yml
index 47ccf421555..28b7e4c752e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -20,8 +20,11 @@ sudo: required
 dist: trusty
 
 cache:
+  # default timeout is too low
+  timeout: 600
   directories:
   - $HOME/.m2
+  - $HOME/flink_cache
 
 # do not cache our own artifacts
 before_cache:
@@ -31,62 +34,6 @@ install: true
 
 language: java
 
-# - define unique cache names in case the auto-generated ones are not unique
-#  (see https://docs.travis-ci.com/user/caching/#Caches-and-build-matrices)
-# - See https://issues.apache.org/jira/browse/FLINK-1072
-matrix:
-  include:
-- jdk: "oraclejdk8"
-  env:
-- TEST="core"
-- PROFILE="-Dhadoop.version=2.8.3"
-- CACHE_NAME=JDK8_H280_CO
-- jdk: "oraclejdk8"
-  env:
-- TEST="libraries"
-- PROFILE="-Dhadoop.version=2.8.3"
-- CACHE_NAME=JDK8_H280_L
-- jdk: "oraclejdk8"
-  env:
-- TEST="connectors"
-- PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis"
-- CACHE_NAME=JDK8_H280_CN
-- jdk: "oraclejdk8"
-  env:
-- TEST="tests"
-- PROFILE="-Dhadoop.version=2.8.3"
-- CACHE_NAME=JDK8_H280_T
-- jdk: "oraclejdk8"
-  env:
-- TEST="misc"
-- PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws"
-- CACHE_NAME=JDK8_H280_M
-- jdk: "openjdk8"
-  env:
-- TEST="core"
-- PROFILE="-Dhadoop.version=2.4.1"
-- CACHE_NAME=JDK8_H241_CO
-- jdk: "openjdk8"
-  env:
-- TEST="libraries"
-- PROFILE="-Dhadoop.version=2.4.1"
-- CACHE_NAME=JDK8_H241_L
-- jdk: "openjdk8"
-  env:
-- TEST="connectors"
-- PROFILE="-Dhadoop.version=2.4.1 -Pinclude-kinesis"
-- CACHE_NAME=JDK8_H241_CN
-- jdk: "openjdk8"
-  env:
-- TEST="tests"
-- PROFILE="-Dhadoop.version=2.4.1"
-- CACHE_NAME=JDK8_H241_T
-- jdk: "openjdk8"
-  env:
-- TEST="misc"
-- PROFILE="-Dhadoop.version=2.4.1"
-- CACHE_NAME=JDK8_H241_M
-
 git:
   depth: 100
 
@@ -102,6 +49,9 @@ env:
 
 before_script:
- "gem install --no-document --version 0.8.9 faraday "
+   - "export -f travis_nanoseconds"
+   - "export -f travis_time_start"
+   - "export -f travis_time_finish"
 
 # Install maven 3.2.5 since trusty uses 3.3.9 for which shading is broken
 before_install:
@@ -119,6 +69,57 @@ before_install:
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin
 
-# We run mvn and monitor its output. If there is no output for the specified 
number of seconds, we
-# print the stack traces of all running Java processes.
-script: "./tools/travis_mvn_watchdog.sh 300"
+
+jdk: "oraclejdk8"
+jobs:
+  include:
+# main profile
+- stage: compile
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: compile
+- stage: test
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: core
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: libraries
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: connectors
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: tests
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: misc
+- stage: cleanup
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: cleanup
+# legacy profile
+- stage: compile
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.4.1 -Pinclude-kinesis -DlegacyCode"
+  name: compile(legacy)
+- stage: test
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.4.1 -Pinclude-kinesis -DlegacyCode"
+  name: core(legacy)
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.4.1 

[GitHub] zentol closed pull request #6642: [FLINK-8819][travis] Rework travis script to use stages

2018-10-02 Thread GitBox
zentol closed pull request #6642: [FLINK-8819][travis] Rework travis script to 
use stages
URL: https://github.com/apache/flink/pull/6642
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.travis.yml b/.travis.yml
index 47ccf421555..28b7e4c752e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -20,8 +20,11 @@ sudo: required
 dist: trusty
 
 cache:
+  # default timeout is too low
+  timeout: 600
   directories:
   - $HOME/.m2
+  - $HOME/flink_cache
 
 # do not cache our own artifacts
 before_cache:
@@ -31,62 +34,6 @@ install: true
 
 language: java
 
-# - define unique cache names in case the auto-generated ones are not unique
-#  (see https://docs.travis-ci.com/user/caching/#Caches-and-build-matrices)
-# - See https://issues.apache.org/jira/browse/FLINK-1072
-matrix:
-  include:
-- jdk: "oraclejdk8"
-  env:
-- TEST="core"
-- PROFILE="-Dhadoop.version=2.8.3"
-- CACHE_NAME=JDK8_H280_CO
-- jdk: "oraclejdk8"
-  env:
-- TEST="libraries"
-- PROFILE="-Dhadoop.version=2.8.3"
-- CACHE_NAME=JDK8_H280_L
-- jdk: "oraclejdk8"
-  env:
-- TEST="connectors"
-- PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis"
-- CACHE_NAME=JDK8_H280_CN
-- jdk: "oraclejdk8"
-  env:
-- TEST="tests"
-- PROFILE="-Dhadoop.version=2.8.3"
-- CACHE_NAME=JDK8_H280_T
-- jdk: "oraclejdk8"
-  env:
-- TEST="misc"
-- PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws"
-- CACHE_NAME=JDK8_H280_M
-- jdk: "openjdk8"
-  env:
-- TEST="core"
-- PROFILE="-Dhadoop.version=2.4.1"
-- CACHE_NAME=JDK8_H241_CO
-- jdk: "openjdk8"
-  env:
-- TEST="libraries"
-- PROFILE="-Dhadoop.version=2.4.1"
-- CACHE_NAME=JDK8_H241_L
-- jdk: "openjdk8"
-  env:
-- TEST="connectors"
-- PROFILE="-Dhadoop.version=2.4.1 -Pinclude-kinesis"
-- CACHE_NAME=JDK8_H241_CN
-- jdk: "openjdk8"
-  env:
-- TEST="tests"
-- PROFILE="-Dhadoop.version=2.4.1"
-- CACHE_NAME=JDK8_H241_T
-- jdk: "openjdk8"
-  env:
-- TEST="misc"
-- PROFILE="-Dhadoop.version=2.4.1"
-- CACHE_NAME=JDK8_H241_M
-
 git:
   depth: 100
 
@@ -102,6 +49,9 @@ env:
 
 before_script:
- "gem install --no-document --version 0.8.9 faraday "
+   - "export -f travis_nanoseconds"
+   - "export -f travis_time_start"
+   - "export -f travis_time_finish"
 
 # Install maven 3.2.5 since trusty uses 3.3.9 for which shading is broken
 before_install:
@@ -119,6 +69,57 @@ before_install:
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin
 
-# We run mvn and monitor its output. If there is no output for the specified 
number of seconds, we
-# print the stack traces of all running Java processes.
-script: "./tools/travis_mvn_watchdog.sh 300"
+
+jdk: "oraclejdk8"
+jobs:
+  include:
+# main profile
+- stage: compile
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: compile
+- stage: test
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: core
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: libraries
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: connectors
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: tests
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: misc
+- stage: cleanup
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis 
-Dinclude_hadoop_aws"
+  name: cleanup
+# legacy profile
+- stage: compile
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.4.1 -Pinclude-kinesis -DlegacyCode"
+  name: compile(legacy)
+- stage: test
+  script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.4.1 -Pinclude-kinesis -DlegacyCode"
+  name: core(legacy)
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.4.1 -Pinclude-kinesis -DlegacyCode"
+  name: libraries(legacy)
+- script: ./tools/travis_controller.sh
+  env: PROFILE="-Dhadoop.version=2.4.1 -Pinclude-kinesis -DlegacyCode"
+  name: connectors(legacy)
+- script: ./tools/travis_controller.s

[jira] [Closed] (FLINK-8819) Rework travis script to use build stages

2018-10-02 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8819.
---
   Resolution: Fixed
Fix Version/s: 1.7.0

master: c9fe541ebe13f19cc2d2db9b5fa7fdcf6bd584e0

> Rework travis script to use build stages
> 
>
> Key: FLINK-8819
> URL: https://issues.apache.org/jira/browse/FLINK-8819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System, Travis
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This issue is for tracking efforts to rework our Travis scripts to use 
> [stages|https://docs.travis-ci.com/user/build-stages/].
> This feature allows us to define a sequence of jobs that are run one after 
> another. This implies that we can define dependencies between jobs, in 
> contrast to our existing jobs that have to be self-contained.
> As an example, we could have a compile stage, and a test stage with multiple 
> jobs.
> The main benefit here is that we no longer have to compile modules multiple 
> times, which would reduce our build times.
> The major issue here however is that there is no _proper_ support for passing 
> build-artifacts from one stage to the next. According to this 
> [issue|https://github.com/travis-ci/beta-features/issues/28] it is on their 
> to-do-list however.
> In the meantime, we could manually transfer the artifacts between stages by 
> either using the Travis cache or some other external storage. The cache 
> solution would work by setting up a cached directory (just like the mvn 
> cache) and creating build-scope directories within containing the artifacts 
> (I have a prototype that works like this).
> The major concern here is that of cleaning up the cache/storage.
>  We can clean things up if
>  * our script fails
>  * the last stage succeeds.
> We can *not* clean things up if
>  * the build is canceled
>  * travis fails the build due to a timeout or similar
> as apparently there is [no way to run a script at the end of a 
> build|https://github.com/travis-ci/travis-ci/issues/4221].
> Thus we would either have to periodically clear the cache, or encode more 
> information into the cached files that would allow _other_ builds to clean up 
> stale data (for example, the build number or date).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10208) Bump mockito to 2.0+

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635397#comment-16635397
 ] 

ASF GitHub Bot commented on FLINK-10208:


tillrohrmann commented on a change in pull request #6634: [FLINK-10208][build] 
Bump mockito to 2.21.0 / powermock to 2.0.0-beta.5
URL: https://github.com/apache/flink/pull/6634#discussion_r221892988
 
 

 ##
 File path: pom.xml
 ##
 @@ -123,8 +123,8 @@ under the License.
0.3.0
1.8.2
4.12
-   1.10.19
-   1.6.5
+   2.21.0
+   2.0.0-beta.5
 
 Review comment:
   Why do we need to use the beta version here?




> Bump mockito to 2.0+
> 
>
> Key: FLINK-10208
> URL: https://issues.apache.org/jira/browse/FLINK-10208
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System, Tests
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Mockito only properly supports java 9 with version 2. We have to bump the 
> dependency and fix various API incompatibilities.
> Additionally we could investigate whether we still need powermock after 
> bumping the dependency (which we'd also have to bump otherwise).
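
To make the kind of API incompatibility concrete, a hedged illustration (not taken from the Flink changes themselves): in Mockito 2 the any*() matchers live in org.mockito.ArgumentMatchers rather than org.mockito.Matchers, and they no longer match null arguments, so tests written against 1.x semantics can start failing after the bump.

{code:java}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
// Mockito 1.x code usually imports org.mockito.Matchers instead:
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;
import org.junit.Test;

public class MockitoMigrationExampleTest {

    @Test
    @SuppressWarnings("unchecked")
    public void anyStringNoLongerMatchesNull() {
        Map<String, String> map = mock(Map.class);

        when(map.get(anyString())).thenReturn("value");

        assertEquals("value", map.get("key"));
        // Mockito 1.x also applied the stub to a null argument; 2.x does not,
        // so the mock falls back to its default answer here.
        assertNull(map.get(null));
    }
}
{code}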





[GitHub] tillrohrmann commented on a change in pull request #6634: [FLINK-10208][build] Bump mockito to 2.21.0 / powermock to 2.0.0-beta.5

2018-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #6634: [FLINK-10208][build] 
Bump mockito to 2.21.0 / powermock to 2.0.0-beta.5
URL: https://github.com/apache/flink/pull/6634#discussion_r221892988
 
 

 ##
 File path: pom.xml
 ##
 @@ -123,8 +123,8 @@ under the License.
0.3.0
1.8.2
4.12
-   1.10.19
-   1.6.5
+   2.21.0
+   2.0.0-beta.5
 
 Review comment:
   Why do we need to use the beta version here?




[GitHub] fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-10-02 Thread GitBox
fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 
support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221932785
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
 ##
 @@ -85,14 +90,26 @@ public void configure(Configuration parameters) {
 
@Override
public String readRecord(String reusable, byte[] bytes, int offset, int 
numBytes) throws IOException {
+   String utf8 = "UTF-8";
+   String utf16 = "UTF-16";
+   String utf32 = "UTF-32";
+   int stepSize = 0;
+   String charsetName = this.getCharsetName();
+   if (charsetName.contains(utf8)) {
+   stepSize = 1;
+   } else if (charsetName.contains(utf16)) {
+   stepSize = 2;
+   } else if (charsetName.contains(utf32)) {
+   stepSize = 4;
+   }
//Check if \n is used as delimiter and the end of this line is 
a \r, then remove \r from the line
if (this.getDelimiter() != null && this.getDelimiter().length 
== 1
-   && this.getDelimiter()[0] == NEW_LINE && offset 
+ numBytes >= 1
-   && bytes[offset + numBytes - 1] == 
CARRIAGE_RETURN){
-   numBytes -= 1;
+   && this.getDelimiter()[0] == NEW_LINE && offset + 
numBytes >= stepSize
 
 Review comment:
   We only check the first byte of a character. Are these checks actually 
compatible with all encodings (LE and BE)? 
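
   To make the concern concrete, a small standalone sketch (using charset names assumed to be available in the JDK) that prints the byte sequence of '\r' in several encodings. Only the big-endian forms end in 0x0D, so comparing just the last byte against CARRIAGE_RETURN behaves differently for LE and BE input:

{code:java}
import java.nio.charset.Charset;

public class CarriageReturnBytes {

    public static void main(String[] args) {
        for (String charsetName : new String[] {"UTF-8", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE"}) {
            byte[] bytes = "\r".getBytes(Charset.forName(charsetName));
            StringBuilder hex = new StringBuilder();
            for (byte b : bytes) {
                hex.append(String.format("%02X ", b));
            }
            // e.g. UTF-16BE -> "00 0D", UTF-16LE -> "0D 00"
            System.out.printf("%-9s -> %s%n", charsetName, hex.toString().trim());
        }
    }
}
{code}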




[GitHub] fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-10-02 Thread GitBox
fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 
support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221933563
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
 ##
 @@ -207,8 +206,110 @@ private void testRemovingTrailingCR(String lineBreaker, 
String delimiter) {
assertEquals(content, result);
}
 
+   } catch (Throwable t) {
+   System.err.println("test failed with exception: " + 
t.getMessage());
+   t.printStackTrace(System.err);
+   fail("Test erroneous");
+   }
+   }
+
+   @Test
+   public void testUTF16Read() {
+   final String first = "First line";
+   final String second = "Second line";
+
+   try {
+   // create input file
+   File tempFile = 
File.createTempFile("TextInputFormatTest", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   PrintStream ps = new PrintStream(tempFile, "UTF-16");
+   ps.println(first);
+   ps.println(second);
+   ps.close();
+
+   TextInputFormat inputFormat = new TextInputFormat(new 
Path(tempFile.toURI().toString()));
+   inputFormat.setCharsetName("UTF-32");
+
+   Configuration parameters = new Configuration();
+   inputFormat.configure(parameters);
+
+// inputFormat.setDelimiter("\r");
 
 Review comment:
   please add tests to check if the `'\'` case is correctly handled for the 
different encodings.
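
   A rough sketch of such a test, modelled on the surrounding test code (the setup and assertions are illustrative and assume the patched TextInputFormat accepts these charset names; this is not part of the actual patch): write '\r\n'-terminated lines in each encoding and check that the trailing '\r' is stripped.

{code:java}
import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.PrintStream;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.junit.Test;

public class TrailingCarriageReturnTest {

    @Test
    public void trailingCarriageReturnIsStrippedForAllEncodings() throws Exception {
        for (String charset : new String[] {"UTF-8", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE"}) {
            // Write two lines terminated by "\r\n" in the current encoding.
            File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
            tempFile.deleteOnExit();
            try (PrintStream ps = new PrintStream(tempFile, charset)) {
                ps.print("First line\r\n");
                ps.print("Second line\r\n");
            }

            TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
            inputFormat.setCharsetName(charset);
            inputFormat.configure(new Configuration());

            FileInputSplit[] splits = inputFormat.createInputSplits(1);
            inputFormat.open(splits[0]);
            assertEquals("First line", inputFormat.nextRecord(null));
            assertEquals("Second line", inputFormat.nextRecord(null));
            inputFormat.close();
        }
    }
}
{code}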



