Re: Yarn Test Timeout

2016-09-06 Thread Maximilian Michels
The testing code for Yarn is very fragile. Also, I'm puzzled why the
code to test the VCores setting is in the TaskManagerFailure test.
Running some more tests to fix the issue.

On Tue, Sep 6, 2016 at 7:28 AM, Vijay Srinivasaraghavan
 wrote:
> Hi Max,
> I have pulled and tested with the latest master, but I am still seeing some 
> issues.
> Test set: org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase
> Tests run: 7, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 96.989 sec <<< FAILURE! - in org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase
>
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)  Time elapsed: 14.128 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.testTaskManagerFailure(YARNSessionCapacitySchedulerITCase.java:262)
>
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)  Time elapsed: 0.569 sec  <<< FAILURE!
> java.lang.AssertionError: There is at least one application on the cluster is not finished. App application_1473139321658_0006 is in state RUNNING
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.apache.flink.yarn.YarnTestBase.checkClusterEmpty(YarnTestBase.java:171)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>
> Regards
> Vijay
>
> On Monday, September 5, 2016 7:28 AM, Maximilian Michels 
>  wrote:
>
>
>  Hi Vijay,
>
> The test fails when a NodeReport with used resources set to null is
> retrieved. The test assumes that a TaskManager is always exclusively
> running in one Yarn NodeManager, which doesn't have to be true since one
> NodeManager can host multiple containers. The test only seems to
> fail reliably when it is executed in isolation, i.e. when a dedicated
> Yarn mini cluster is started just for this test.
>
> The problem is fixed on the latest master.
>
> Thanks,
> Max
>
> On Sun, Sep 4, 2016 at 11:21 PM, Vijay Srinivasaraghavan
>  wrote:
>> Hi Robert,
>> Yes, I am consistently seeing the issue. Here is the stack track for the run.
>>> mvn test integration-test 
>>

Re: Yarn Test Timeout

2016-09-06 Thread Maximilian Michels
Should be resolved now.

On Tue, Sep 6, 2016 at 11:21 AM, Maximilian Michels  wrote:
> The testing code for Yarn is very fragile. Also, I'm puzzled why the
> code to test the VCores setting is in the TaskManagerFailure test.
> Running some more tests to fix the issue.
>
> On Tue, Sep 6, 2016 at 7:28 AM, Vijay Srinivasaraghavan
>  wrote:
>> Hi Max,
>> I have pulled and tested with the latest master, but I am still seeing some 
>> issues.
>> [...]
>>
>> Regards
>> Vijay
>>
>> On Monday, September 5, 2016 7:28 AM, Maximilian Michels 
>>  wrote:
>>
>>
>>  Hi Vijay,
>>
>> The test fails when a NodeReport with used resources set to null is
>> retrieved. The test assumes that a TaskManager is always exclusively
>> running in one Yarn NodeManager, which doesn't have to be true since one
>> NodeManager can host multiple containers. The test only seems to
>> fail reliably when it is executed in isolation, i.e. when a dedicated
>> Yarn mini cluster is started just for this test.
>>
>> The problem is fixed on the latest master.
>>
>> Thanks,
>> Max
>>
>> On Sun, 

[jira] [Created] (FLINK-4581) Table API throws "No suitable driver found for jdbc:calcite"

2016-09-06 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4581:
---

 Summary: Table API throws "No suitable driver found for 
jdbc:calcite"
 Key: FLINK-4581
 URL: https://issues.apache.org/jira/browse/FLINK-4581
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Timo Walther


It seems that in certain cases the internal Calcite JDBC driver cannot be 
found. We should either try to get rid of the entire JDBC invocation or fix 
this bug.

From ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html

{code}
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.RuntimeException: java.sql.SQLException: No suitable
driver found for jdbc:calcite:
at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151)
at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106)
at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127)
at
org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56)
at
org.apache.flink.api.table.TableEnvironment.(TableEnvironment.scala:73)
at
org.apache.flink.api.table.StreamTableEnvironment.(StreamTableEnvironment.scala:58)
at
org.apache.flink.api.java.table.StreamTableEnvironment.(StreamTableEnvironment.scala:45)
at
org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376)
at
org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala)
at 
org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
... 6 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:calcite:
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144)
... 20 more
{code}
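For context on the failure mode (a JDK-only sketch, not the Flink/Calcite fix itself): java.sql.DriverManager throws exactly this SQLException when no registered driver accepts the URL prefix, which typically means the driver class was never loaded, or was loaded by a different classloader than the one performing the lookup:

```java
import java.sql.DriverManager;
import java.sql.SQLException;

public class DriverLookupDemo {
    public static void main(String[] args) {
        // DriverManager.getConnection asks every driver registered with the
        // current classloader whether it accepts the URL. If the Calcite
        // driver class was never loaded (or was loaded by a different
        // classloader, as can happen with user-code classloaders), no driver
        // matches and this exact SQLException is thrown.
        try {
            DriverManager.getConnection("jdbc:calcite:");
        } catch (SQLException e) {
            // prints "No suitable driver found for jdbc:calcite:"
            System.out.println(e.getMessage());
        }
    }
}
```

On a JVM without Calcite on the classpath this reproduces the same message as in the stack trace above.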



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2016-09-06 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4582:
--

 Summary: Allow FlinkKinesisConsumer to adapt for AWS DynamoDB 
Streams
 Key: FLINK-4582
 URL: https://issues.apache.org/jira/browse/FLINK-4582
 Project: Flink
  Issue Type: New Feature
  Components: Kinesis Connector, Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai
 Fix For: 1.2.0


AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
capture) feature called DynamoDB Streams 
(http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), 
which is a stream feed of item-level table activities.

The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
only a slight difference in resharding behaviours, so it is possible to build 
on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
Streams source.

I propose an API something like this:
{code}
DataStream dynamoItemsCdc = 
  FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
{code}

This feature adds more connectivity to popular AWS services for Flink, and 
combining Flink's exactly-once semantics and out-of-core state backends with 
CDC enables very strong use cases. The only extra dependency should be the 
AWS Java SDK for DynamoDB, which is under the Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] FLIP-11: Table API Stream Aggregations

2016-09-06 Thread Timo Walther

Hi all,

I thought about the API of the FLIP again. If we allow the "systemtime" 
attribute, we cannot implement nice method chaining where the user can 
define "allowLateness" only on event time. So even if the user expressed 
that "systemtime" is used, we would have to offer an "allowLateness" 
method, because we have to assume that this attribute can also be the 
batch event-time column, which is not very nice.


class TumblingWindow(size: Expression) extends Window {
  def on(timeField: Expression): TumblingEventTimeWindow =
    new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
}

What do you think?

Timo


Am 05/09/16 um 10:41 schrieb Fabian Hueske:

Hi Jark,

you had asked for non-windowed aggregates in the Table API a few times.
FLIP-11 proposes row-window aggregates which are a generalization of
running aggregates (SlideRow unboundedPreceding).

Can you have a look at the FLIP and give feedback whether this is what you
are looking for?
Improvement suggestions are very welcome as well.

Thank you,
Fabian

2016-09-01 16:12 GMT+02:00 Timo Walther :


Hi all!

Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
You can find the FLIP-11 here:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%
3A+Table+API+Stream+Aggregations

Motivation for the FLIP:

The Table API is a declarative API to define queries on static and
streaming tables. So far, only projection, selection, and union are
supported operations on streaming tables.

This FLIP proposes to add support for different types of aggregations on
top of streaming tables. In particular, we seek to support:

- Group-window aggregates, i.e., aggregates which are computed for a group
of elements. A (time or row-count) window is required to bound the infinite
input stream into a finite group.

- Row-window aggregates, i.e., aggregates which are computed for each row,
based on a window (range) of preceding and succeeding rows.
Each type of aggregate shall be supported on keyed/grouped or
non-keyed/grouped data streams for streaming tables as well as batch tables.

We are looking forward to your feedback.

Timo




--
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr



[jira] [Created] (FLINK-4583) NullPointerException in CliFrontend

2016-09-06 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4583:
-

 Summary: NullPointerException in CliFrontend
 Key: FLINK-4583
 URL: https://issues.apache.org/jira/browse/FLINK-4583
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


If no Flink program is executed the following exception message is printed. 
This can happen when a driver prints usage due to insufficient or improper 
configuration.

{noformat}
 The program finished with the following exception:

java.lang.NullPointerException
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:781)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1002)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1045)
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4584) Fix broken links on flink.apache.org

2016-09-06 Thread Alexander Pivovarov (JIRA)
Alexander Pivovarov created FLINK-4584:
--

 Summary: Fix broken links on flink.apache.org
 Key: FLINK-4584
 URL: https://issues.apache.org/jira/browse/FLINK-4584
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Alexander Pivovarov
Priority: Minor


The following links are broken
link: 
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
correct link: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html

link: 
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
correct link: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html


The following links show "Page 'X' Has Moved to" for 1-2 seconds and then 
redirect to the correct page
link: 
http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
redirect-to: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html

link: 
http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html
redirect-to: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4585) Fix broken links on flink.apache.org

2016-09-06 Thread Alexander Pivovarov (JIRA)
Alexander Pivovarov created FLINK-4585:
--

 Summary: Fix broken links on flink.apache.org
 Key: FLINK-4585
 URL: https://issues.apache.org/jira/browse/FLINK-4585
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Alexander Pivovarov
Priority: Minor


The following links are broken
link: 
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
correct link: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html

link: 
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
correct link: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html


The following links show "Page 'X' Has Moved to" for 1-2 seconds and then 
redirect to the correct page
link: 
http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
redirect-to: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html

link: 
http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html
redirect-to: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-09-06 Thread Johannes (JIRA)
Johannes created FLINK-4586:
---

 Summary: NumberSequenceIterator and Accumulator threading issue
 Key: FLINK-4586
 URL: https://issues.apache.org/jira/browse/FLINK-4586
 Project: Flink
  Issue Type: Bug
  Components: DataSet API
Affects Versions: 1.1.2
Reporter: Johannes
Priority: Minor


There is a strange problem when using the NumberSequenceIterator in combination 
with an AverageAccumulator.

It seems like the individual accumulators are reinitialized and overwrite parts 
of the intermediate solution.

The following Scala snippet exemplifies the problem.
The result should be {{50.5}}, but the printed average is something completely 
different, e.g. {{8.08}}, depending on the number of cores used.
If the parallelism is set to {{1}} the result is correct, which suggests a 
threading problem. The problem occurs with both the Java and Scala APIs.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
var a : AverageAccumulator = _

override def map(value: Long): Long = {
  a.add(value)
  value
}

override def open(parameters: Configuration): Unit = {
  a = new AverageAccumulator
  getRuntimeContext.addAccumulator("test", a)
}
  })
  .reduce((a, b) => a + b)
  .print()


val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult

println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}
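For intuition about the expected result (a stand-alone sketch with hypothetical per-subtask numbers, not Flink code): with parallelism 2, each subtask accumulates its own partial (count, sum), and a correct merge of the partials, rather than an overwrite, yields the expected average:

```java
public class AverageMergeDemo {
    public static void main(String[] args) {
        // Hypothetical split of 1..100 across two parallel subtasks:
        // subtask 0 sees 1..50, subtask 1 sees 51..100.
        long[] counts = {50, 50};
        long[] sums = {1275, 3775}; // 1+..+50 = 1275, 51+..+100 = 3775
        long totalCount = 0, totalSum = 0;
        for (int i = 0; i < counts.length; i++) {
            totalCount += counts[i];
            totalSum += sums[i];
        }
        // Correct merge: global sum divided by global count.
        System.out.println((double) totalSum / totalCount); // prints 50.5
    }
}
```

If one partial overwrites another instead of being merged, the result depends on which subtask wrote last, which would explain the parallelism-dependent values reported above.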



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] FLIP-11: Table API Stream Aggregations

2016-09-06 Thread Jark Wu
Hi all,

I'm on vacation for about five days, sorry to have missed this great FLIP.

Yes, the non-windowed aggregate is a special case of row-window, and the 
proposal looks really good. Can we have a simplified form for the special 
case? For example, table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(...) 
could be simplified to table.groupBy('a).select(...), where the latter 
actually calls the former.

Another question is about the rowtime. As the FLIP says, the DataStream and 
StreamTableSource are responsible for assigning timestamps and watermarks; 
furthermore, "rowtime" and "systemtime" are not real columns. IMO, this is 
different from Calcite's rowtime, which is a real column in the table. In 
the FLIP's way, we lose some flexibility, because the timestamp column may be 
created after some transformations or a join operation, not at the beginning. 
So why do we have to define the rowtime at the beginning (because of 
watermarks)? Could we have a way to define the rowtime after the source 
table, like a TimestampAssigner?

Regarding the "allowLateness" method: I came up with a trick where we make 
'rowtime and 'systemtime Scala objects, not symbol expressions. The API 
would look like this:

window(Tumble over 10.minutes on rowtime allowLateness as 'w)

The implementation would look like this:

class TumblingWindow(size: Expression) extends Window {
  def on(time: rowtime.type): TumblingEventTimeWindow =
    new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method

  def on(time: systemtime.type): TumblingProcessingTimeWindow =
    new TumblingProcessingTimeWindow(alias, 'systemtime, size) // no allowLateness() method
}
object rowtime
object systemtime

What do you think about this?

- Jark Wu 

> 在 2016年9月6日,下午11:00,Timo Walther  写道:
> 
> Hi all,
> 
> I thought about the API of the FLIP again. If we allow the "systemtime" 
> attribute, we cannot implement a nice method chaining where the user can 
> define a "allowLateness" only on event time. So even if the user expressed 
> that "systemtime" is used we have to offer a "allowLateness" method because 
> we have to assume that this attribute can also be the batch event time 
> column, which is not very nice.
> 
> class TumblingWindow(size: Expression) extends Window {
>  def on(timeField: Expression): TumblingEventTimeWindow =
>new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() 
> method
> }
> 
> What do you think?
> 
> Timo
> 
> 
> Am 05/09/16 um 10:41 schrieb Fabian Hueske:
>> Hi Jark,
>> 
>> you had asked for non-windowed aggregates in the Table API a few times.
>> FLIP-11 proposes row-window aggregates which are a generalization of
>> running aggregates (SlideRow unboundedPreceding).
>> 
>> Can you have a look at the FLIP and give feedback whether this is what you
>> are looking for?
>> Improvement suggestions are very welcome as well.
>> 
>> Thank you,
>> Fabian
>> 
>> 2016-09-01 16:12 GMT+02:00 Timo Walther :
>> 
>>> Hi all!
>>> 
>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>>> You can find the FLIP-11 here:
>>> 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%
>>> 3A+Table+API+Stream+Aggregations
>>> 
>>> Motivation for the FLIP:
>>> 
>>> The Table API is a declarative API to define queries on static and
>>> streaming tables. So far, only projection, selection, and union are
>>> supported operations on streaming tables.
>>> 
>>> This FLIP proposes to add support for different types of aggregations on
>>> top of streaming tables. In particular, we seek to support:
>>> 
>>> - Group-window aggregates, i.e., aggregates which are computed for a group
>>> of elements. A (time or row-count) window is required to bound the infinite
>>> input stream into a finite group.
>>> 
>>> - Row-window aggregates, i.e., aggregates which are computed for each row,
>>> based on a window (range) of preceding and succeeding rows.
>>> Each type of aggregate shall be supported on keyed/grouped or
>>> non-keyed/grouped data streams for streaming tables as well as batch tables.
>>> 
>>> We are looking forward to your feedback.
>>> 
>>> Timo
>>> 
> 
> 
> -- 
> Freundliche Grüße / Kind Regards
> 
> Timo Walther
> 
> Follow me: @twalthr
> https://www.linkedin.com/in/twalthr



[jira] [Created] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE

2016-09-06 Thread Renkai Ge (JIRA)
Renkai Ge created FLINK-4587:


 Summary: Yet another java.lang.NoSuchFieldError: INSTANCE
 Key: FLINK-4587
 URL: https://issues.apache.org/jira/browse/FLINK-4587
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.2.0
 Environment: Latest SNAPSHOT
Reporter: Renkai Ge


For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 
in Flink.
The source file is:
{code}
import org.apache.flink.streaming.api.scala._
import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory

/**
  * Created by renkai on 16/9/7.
  */
object Main {
  def main(args: Array[String]): Unit = {
val instance = ManagedNHttpClientConnectionFactory.INSTANCE
println("instance = " + instance)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(1 to 100)
val result = stream.map { x =>
  x * 2
}
result.print()
env.execute("xixi")
  }
}

{code}

and the sbt build file is:
{code}
name := "flink-explore"

version := "1.0"

scalaVersion := "2.11.8"

crossPaths := false

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
)
{code}
I use `sbt assembly` to get a fat jar.

If I run the command
{code}
 java -cp flink-explore-assembly-1.0.jar Main
{code}
I get the result 

{code}
instance = 
org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
log4j:WARN No appenders could be found for logger 
(org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
09/07/2016 12:05:26 Job execution switched to status RUNNING.
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to SCHEDULED
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to DEPLOYING
...
09/07/2016 12:05:26 Map -> Sink: Unnamed(20/24) switched to RUNNING
09/07/2016 12:05:26 Map -> Sink: Unnamed(19/24) switched to RUNNING
15> 30
20> 184
...
19> 182
1> 194
8> 160
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to FINISHED
...
09/07/2016 12:05:26 Map -> Sink: Unnamed(1/24) switched to FINISHED
09/07/2016 12:05:26 Job execution switched to status FINISHED.
{code}

Nothing special.

But if I run the jar by
{code}
./bin/flink run shop-monitor-flink-assembly-1.0.jar
{code}

I will get an error

{code}
$ ./bin/flink run flink-explore-assembly-1.0.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program


 The program finished with the following exception:

java.lang.NoSuchFieldError: INSTANCE
at 
org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:53)
at 
org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:57)
at 
org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.(DefaultHttpRequestWriterFactory.java:47)
at 
org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.(ManagedNHttpClientConnectionFactory.java:75)
at 
org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.(ManagedNHttpClientConnectionFactory.java:83)
at 
org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.(ManagedNHttpClientConnectionFactory.java:64)
at Main$.main(Main.scala:9)
at Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:322)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774)
at org.
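A NoSuchFieldError like this typically means the class was linked against a different version of the library than the copy found at runtime, e.g. an older httpcore/httpclient on the cluster's classpath shadowing the version bundled in the fat jar. A hedged, generic debugging sketch (the class name argument is whatever class you suspect) prints which classpath entry actually provided a class:

```java
public class WhichJar {
    public static void main(String[] args) {
        // Print a class's code source to see which classpath entry actually
        // provided it. Two different answers in the two run modes (plain
        // java vs. bin/flink run) would confirm a version conflict.
        String name = args.length > 0 ? args[0] : "WhichJar";
        try {
            Class<?> c = Class.forName(name);
            java.security.CodeSource src = c.getProtectionDomain().getCodeSource();
            System.out.println(name + " loaded from "
                    + (src == null ? "bootstrap classpath" : src.getLocation()));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " not found on classpath");
        }
    }
}
```

Running it with org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory as the argument in both environments would show where each run resolves the class from.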

[DISCUSS] how choose Scala and Java

2016-09-06 Thread 时某人
Scala and Java are mixed within the modules, and some Flink APIs are indeed 
confusing because of it. What was the rule for choosing between the Scala and 
Java APIs when an API was first implemented?


Thanks

Re: [DISCUSS] FLIP-11: Table API Stream Aggregations

2016-09-06 Thread Fabian Hueske
Hi,

thanks for your comments and questions!
Actually, you are bringing up the points that Timo and I discussed the most
when designing the FLIP ;-)

- We also thought about the syntactic shortcut for running aggregates like
you proposed (table.groupBy(‘a).select(…)). Our motivation to not allow
this shortcut is to prevent users from accidentally performing a
"dangerous" operation. The problem with unbounded sliding row-windows is
that their state does never expire. If you have an evolving key space, you
will likely run into problems at some point because the operator state
grows too large. IMO, a row-window session is a better approach, because it
defines a timeout after which state can be discarded. groupBy.select is a
very common operation in batch but its semantics in streaming are very
different. In my opinion it makes sense to make users aware of these
differences through the API.
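The concern can be illustrated with a stand-alone sketch (hypothetical keys, not Table API code): a running per-key aggregate retains one state entry per distinct key forever, so an evolving key space grows the operator state without bound, unlike a session window whose timeout lets state be discarded:

```java
import java.util.HashMap;
import java.util.Map;

public class RunningAggregateState {
    public static void main(String[] args) {
        // Per-key running sum: the map holds one entry per distinct key
        // ever seen, and nothing is ever evicted.
        Map<String, Long> state = new HashMap<>();
        String[] keys = {"a", "b", "a", "c", "d"};
        long[] values = {1, 2, 3, 4, 5};
        for (int i = 0; i < keys.length; i++) {
            state.merge(keys[i], values[i], Long::sum);
        }
        // With an unbounded key space, state.size() grows forever.
        System.out.println(state.size() + " keys retained"); // prints "4 keys retained"
    }
}
```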

- Reassigning timestamps and watermarks is a very delicate issue. You are
right, that Calcite exposes this field which is necessary due to the
semantics of SQL. However, also in Calcite you cannot freely choose the
timestamp attribute for streaming queries (it must be a monotone or
quasi-monotone attribute) which is hard to reason about (and guarantee)
after a few operators have been applied. Streaming tables in Flink will
likely have a time attribute which is identical to the initial rowtime.
However, Flink does modify timestamps internally, e.g., for records that
are emitted from time windows, in order to ensure that consecutive windows
perform as expected. Modifying or reassigning timestamps in the middle of a job
can result in unexpected results which are very hard to reason about. Do
you have a concrete use case in mind for reassigning timestamps?

- The idea to represent rowtime and systime as object is good. Our
motivation to go for reserved Scala symbols was to have a uniform syntax
with windows over streaming and batch tables. On batch tables you can
compute time windows basically over every time attribute (they are treated
similar to grouping attributes with a bit of extra logic to extract the
grouping key for sliding and session windows). If you write window(Tumble
over 10.minutes on 'rowtime) on a streaming table, 'rowtime would indicate
event-time. On a batch table with a 'rowtime attribute, the same operator
would be internally converted into a group by. By going for the object
approach we would lose this compatibility (or would need to introduce an
additional column attribute to specify the window attribute for batch
tables).

As usual some of the design decisions are based on preferences.
Do they make sense to you? Let me know what you think.

Best, Fabian


2016-09-07 5:12 GMT+02:00 Jark Wu :

> Hi all,
>
> I'm on vacation for about five days , sorry to have missed this great FLIP.
>
> Yes, the non-windowed aggregates is a special case of row-window. And the
> proposal looks really good.  Can we have a simplified form for the special
> case? Such as : 
> table.groupBy(‘a).rowWindow(SlideRows.unboundedPreceding).select(…)
> can be simplified to  table.groupBy(‘a).select(…). The latter will actually
> call the former.
>
> Another question is about the rowtime. As the FLIP said, DataStream and
> StreamTableSource is responsible to assign timestamps and watermarks,
> furthermore “rowtime” and “systemtime” are not real column. IMO, it is
> different with Calcite’s rowtime, which is a real column in the table. In
> FLIP's way, we will lose some flexibility. Because the timestamp column may
> be created after some transformations or join operation, not created at
> beginning. So why do we have to define rowtime at beginning? (because of
> watermark?) Can we have a way to define rowtime after source table like
> TimestampAssinger?
>
> Regarding to “allowLateness” method. I come up a trick that we can make
> ‘rowtime and ‘system to be a Scala object, not a symbol expression. The API
> will looks like this :
>
> window(Tumble over 10.minutes on rowtime allowLateness as ‘w)
>
> The implementation will look like this:
>
> class TumblingWindow(size: Expression) extends Window {
>   def on(time: rowtime.type): TumblingEventTimeWindow =
>   new TumblingEventTimeWindow(alias, ‘rowtime, size)// has
> allowLateness() method
>
>   def on(time: systemtime.type): TumblingProcessingTimeWindow=
>  new TumblingProcessingTimeWindow(alias, ‘systemtime, size)
>  // hasn’t allowLateness() method
> }
> object rowtime
> object systemtime
>
> What do you think about this?
>
> - Jark Wu
>
> > 在 2016年9月6日,下午11:00,Timo Walther  写道:
> >
> > Hi all,
> >
> > I thought about the API of the FLIP again. If we allow the "systemtime"
> attribute, we cannot implement a nice method chaining where the user can
> define a "allowLateness" only on event time. So even if the user expressed
> that "systemtime" is used we have to offer a "allowLateness" method because
> we have to assume that this attribute can also be the batch event time
>