Re: Yarn Test Timeout
The testing code for Yarn is very fragile. Also, I'm puzzled why the code to test the VCores setting is in the TaskManagerFailure test. Running some more tests to fix the issue.

On Tue, Sep 6, 2016 at 7:28 AM, Vijay Srinivasaraghavan wrote:
> Hi Max,
> I have pulled and tested with the latest master, but I am still seeing some issues.
>
> --- Test set: org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase ---
> Tests run: 7, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 96.989 sec <<< FAILURE! - in org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase
>
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
> Time elapsed: 14.128 sec <<< FAILURE!
> java.lang.AssertionError: null
>     at org.junit.Assert.fail(Assert.java:86)
>     at org.junit.Assert.assertTrue(Assert.java:41)
>     at org.junit.Assert.assertTrue(Assert.java:52)
>     at org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.testTaskManagerFailure(YARNSessionCapacitySchedulerITCase.java:262)
>
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
> Time elapsed: 0.569 sec <<< FAILURE!
> java.lang.AssertionError: There is at least one application on the cluster is not finished. App application_1473139321658_0006 is in state RUNNING
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.apache.flink.yarn.YarnTestBase.checkClusterEmpty(YarnTestBase.java:171)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>     at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>     at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>     at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>     at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>     at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>     at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>
> Regards
> Vijay
>
> On Monday, September 5, 2016 7:28 AM, Maximilian Michels wrote:
>
> Hi Vijay,
>
> The test fails when a NodeReport with used resources set to null is
> retrieved. The test assumes that a TaskManager is always exclusively
> running in one Yarn NodeManager, which doesn't have to be true, as one
> NodeManager can host multiple containers. The test only seems to
> reliably fail when it's executed exclusively, where a dedicated
> Yarn mini cluster is started only for the test.
>
> The problem is fixed on the latest master.
>
> Thanks,
> Max
>
> On Sun, Sep 4, 2016 at 11:21 PM, Vijay Srinivasaraghavan wrote:
>> Hi Robert,
>> Yes, I am consistently seeing the issue. Here is the stack trace for the run.
>>
>> mvn test integration-test
Re: Yarn Test Timeout
Should be resolved now.

On Tue, Sep 6, 2016 at 11:21 AM, Maximilian Michels wrote:
> The testing code for Yarn is very fragile. Also, I'm puzzled why the
> code to test the VCores setting is in the TaskManagerFailure test.
> Running some more tests to fix the issue.
>
> On Tue, Sep 6, 2016 at 7:28 AM, Vijay Srinivasaraghavan wrote:
>> Hi Max,
>> I have pulled and tested with the latest master, but I am still seeing some issues.
>>
>> [... test report and stack trace identical to the previous message ...]
>>
>> Regards
>> Vijay
>>
>> On Monday, September 5, 2016 7:28 AM, Maximilian Michels wrote:
>>
>> Hi Vijay,
>>
>> The test fails when a NodeReport with used resources set to null is
>> retrieved. The test assumes that a TaskManager is always exclusively
>> running in one Yarn NodeManager, which doesn't have to be true, as one
>> NodeManager can host multiple containers. The test only seems to
>> reliably fail when it's executed exclusively, where a dedicated
>> Yarn mini cluster is started only for the test.
>>
>> The problem is fixed on the latest master.
>>
>> Thanks,
>> Max
>>
>> On Sun,
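Max's diagnosis above, a NodeReport whose used-resources field can be null and a NodeManager that can host several containers, also suggests what a more robust check looks like. The following is a sketch of that defensive handling (not the committed fix), using only the standard Hadoop YARN client types:

{code}
import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.NodeReport

// Sketch only: tolerate reports whose used-resource field is null, and
// aggregate vcores across all NodeManagers instead of assuming that a
// TaskManager occupies exactly one node.
def usedVCores(reports: java.util.List[NodeReport]): Int =
  reports.asScala
    .flatMap(report => Option(report.getUsed)) // getUsed may be null
    .map(_.getVirtualCores)
    .sum
{code}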
[jira] [Created] (FLINK-4581) Table API throws "No suitable driver found for jdbc:calcite"
Timo Walther created FLINK-4581:
-----------------------------------

Summary: Table API throws "No suitable driver found for jdbc:calcite"
Key: FLINK-4581
URL: https://issues.apache.org/jira/browse/FLINK-4581
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther

It seems that in certain cases the internal Calcite JDBC driver cannot be found. We should either try to get rid of the entire JDBC invocation or fix this bug.

From ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html

{code}
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.RuntimeException: java.sql.SQLException: No suitable driver found for jdbc:calcite:
    at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151)
    at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106)
    at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127)
    at org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56)
    at org.apache.flink.api.table.TableEnvironment.<init>(TableEnvironment.scala:73)
    at org.apache.flink.api.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:58)
    at org.apache.flink.api.java.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:45)
    at org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376)
    at org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala)
    at org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    ... 6 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:calcite:
    at java.sql.DriverManager.getConnection(DriverManager.java:689)
    at java.sql.DriverManager.getConnection(DriverManager.java:208)
    at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144)
    ... 20 more
{code}
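Until the root cause is identified, one mitigation worth trying, under the assumption that the error is a driver-registration/classloader problem rather than a missing dependency (a guess, not a confirmed fix), is to force-load the Calcite driver class before the TableEnvironment is created:

{code}
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Workaround sketch: java.sql.DriverManager only hands out connections for
// drivers that have registered themselves with a visible classloader.
// Loading the driver class explicitly runs its static registration block.
// (Assumption: the "No suitable driver" error is a registration problem.)
Class.forName("org.apache.calcite.jdbc.Driver")

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
{code}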
[jira] [Created] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
Tzu-Li (Gordon) Tai created FLINK-4582:
---------------------------------------

Summary: Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
Key: FLINK-4582
URL: https://issues.apache.org/jira/browse/FLINK-4582
Project: Flink
Issue Type: New Feature
Components: Kinesis Connector, Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai
Fix For: 1.2.0

AWS DynamoDB is a NoSQL database service that has a CDC-like (change data capture) feature called DynamoDB Streams (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), which is a stream feed of item-level table activities.

The DynamoDB Streams shard abstraction follows that of Kinesis Streams, with only a slight difference in resharding behaviour, so it is possible to build on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB Streams source.

I propose an API along these lines:

{code}
DataStream dynamoItemsCdc = FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
{code}

The feature adds more connectivity to popular AWS services for Flink, and combining Flink's exactly-once semantics and out-of-core state backends with CDC makes for very strong use cases. For this feature there should only be an extra dependency on the AWS Java SDK for DynamoDB, which has the Apache License 2.0.
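To make the shape of the proposal concrete, here is a hypothetical usage sketch; asDynamoDBStream does not exist yet, and the configuration key is assumed to mirror the existing Kinesis consumer:

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Hypothetical usage of the proposed factory (asDynamoDBStream is the
// method proposed above, not an existing API):
val config = new java.util.Properties()
config.setProperty("aws.region", "us-east-1") // key assumed from the Kinesis consumer

val dynamoItemsCdc: DataStream[String] =
  FlinkKinesisConsumer.asDynamoDBStream(
    java.util.Arrays.asList("my-table"), // tableNames
    new SimpleStringSchema(),            // schema
    config)                              // config
{code}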
Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Hi all,

I thought about the API of the FLIP again. If we allow the "systemtime" attribute, we cannot implement a nice method chaining where the user can define "allowLateness" only on event time. So even if the user expressed that "systemtime" is used, we have to offer an "allowLateness" method, because we have to assume that this attribute can also be the batch event time column, which is not very nice.

class TumblingWindow(size: Expression) extends Window {
  def on(timeField: Expression): TumblingEventTimeWindow =
    new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
}

What do you think?

Timo

On 05/09/16 at 10:41, Fabian Hueske wrote:
> Hi Jark,
>
> you had asked for non-windowed aggregates in the Table API a few times.
> FLIP-11 proposes row-window aggregates, which are a generalization of
> running aggregates (SlideRow unboundedPreceding).
>
> Can you have a look at the FLIP and give feedback whether this is what
> you are looking for? Improvement suggestions are very welcome as well.
>
> Thank you,
> Fabian
>
> 2016-09-01 16:12 GMT+02:00 Timo Walther:
>
>> Hi all!
>>
>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>> You can find FLIP-11 here:
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>
>> Motivation for the FLIP:
>>
>> The Table API is a declarative API to define queries on static and
>> streaming tables. So far, only projection, selection, and union are
>> supported operations on streaming tables.
>>
>> This FLIP proposes to add support for different types of aggregations
>> on top of streaming tables. In particular, we seek to support:
>>
>> - Group-window aggregates, i.e., aggregates which are computed for a
>>   group of elements. A (time or row-count) window is required to bound
>>   the infinite input stream into a finite group.
>>
>> - Row-window aggregates, i.e., aggregates which are computed for each
>>   row, based on a window (range) of preceding and succeeding rows.
>>
>> Each type of aggregate shall be supported on keyed/grouped or
>> non-keyed/grouped data streams for streaming tables as well as batch
>> tables.
>>
>> We are looking forward to your feedback.
>>
>> Timo

--
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
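To make the chaining problem above concrete, here is an illustration in the FLIP's fluent syntax; the no-argument allowLateness() is an assumption taken from the comment in Timo's snippet:

{code}
// Both lines type-check, because on(timeField: Expression) can only return
// a single window type, and that type must offer allowLateness():
val intended  = (Tumble over 10.minutes on 'rowtime).allowLateness()    // event time: meaningful
val undesired = (Tumble over 10.minutes on 'systemtime).allowLateness() // processing time: should be rejected
{code}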
[jira] [Created] (FLINK-4583) NullPointerException in CliFrontend
Greg Hogan created FLINK-4583:
------------------------------

Summary: NullPointerException in CliFrontend
Key: FLINK-4583
URL: https://issues.apache.org/jira/browse/FLINK-4583
Project: Flink
Issue Type: Bug
Components: Client
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor

If no Flink program is executed, the following exception message is printed. This can happen when a driver prints usage due to insufficient or improper configuration.

{noformat}
The program finished with the following exception:

java.lang.NullPointerException
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:781)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1002)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1045)
{noformat}
[jira] [Created] (FLINK-4584) Fix broken links on flink.apache.org
Alexander Pivovarov created FLINK-4584:
---------------------------------------

Summary: Fix broken links on flink.apache.org
Key: FLINK-4584
URL: https://issues.apache.org/jira/browse/FLINK-4584
Project: Flink
Issue Type: Bug
Components: Project Website
Reporter: Alexander Pivovarov
Priority: Minor

The following links are broken:

link: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
correct link: https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html

link: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
correct link: https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html

The following links show "Page 'X' Has Moved to" for 1-2 seconds and then redirect to the correct page:

link: http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
redirect-to: https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html

link: http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html
redirect-to: https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html
[jira] [Created] (FLINK-4585) Fix broken links on flink.apache.org
Alexander Pivovarov created FLINK-4585:
---------------------------------------

Summary: Fix broken links on flink.apache.org
Key: FLINK-4585
URL: https://issues.apache.org/jira/browse/FLINK-4585
Project: Flink
Issue Type: Bug
Components: Project Website
Reporter: Alexander Pivovarov
Priority: Minor

The following links are broken:

link: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
correct link: https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html

link: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
correct link: https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html

The following links show "Page 'X' Has Moved to" for 1-2 seconds and then redirect to the correct page:

link: http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
redirect-to: https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html

link: http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html
redirect-to: https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html
[jira] [Created] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
Johannes created FLINK-4586:
----------------------------

Summary: NumberSequenceIterator and Accumulator threading issue
Key: FLINK-4586
URL: https://issues.apache.org/jira/browse/FLINK-4586
Project: Flink
Issue Type: Bug
Components: DataSet API
Affects Versions: 1.1.2
Reporter: Johannes
Priority: Minor

There is a strange problem when using the NumberSequenceIterator in combination with an AverageAccumulator. It seems like the individual accumulators are reinitialized and overwrite parts of the intermediate solution.

The following Scala snippet exemplifies the problem. The result should be {{50.5}}, but something completely different is printed, like {{8.08}}, depending on the number of cores used. If the parallelism is set to {{1}}, the result is correct, which points to a threading problem. The problem occurs using both the Java and Scala APIs.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}
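For reference, the snippet wrapped into a minimal self-contained harness (a sketch; the scaffolding around the reported pipeline is assumed, and the map(_.toLong) shim is only there to unbox the iterator's java.lang.Long elements):

{code}
import org.apache.flink.api.common.accumulators.AverageAccumulator
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.NumberSequenceIterator

object AccumulatorRepro {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1) // per the report, this makes the result the expected 50.5

    env.fromParallelCollection(new NumberSequenceIterator(1, 100))
      .map(_.toLong) // unbox java.lang.Long to scala.Long
      .map(new RichMapFunction[Long, Long] {
        var a: AverageAccumulator = _

        override def open(parameters: Configuration): Unit = {
          a = new AverageAccumulator
          getRuntimeContext.addAccumulator("test", a)
        }

        override def map(value: Long): Long = {
          a.add(value)
          value
        }
      })
      .reduce((a, b) => a + b)
      .print() // print() runs the job eagerly

    // Average of 1..100; expected 50.5
    println(env.getLastJobExecutionResult.getAccumulatorResult[java.lang.Double]("test"))
  }
}
{code}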
Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Hi all,

I was on vacation for about five days, sorry to have missed this great FLIP.

Yes, non-windowed aggregates are a special case of row-windows, and the proposal looks really good. Can we have a simplified form for the special case? For example, table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…) could be simplified to table.groupBy('a).select(…); the latter would actually call the former.

Another question is about the rowtime. As the FLIP says, DataStream and StreamTableSource are responsible for assigning timestamps and watermarks; furthermore, "rowtime" and "systemtime" are not real columns. IMO, this is different from Calcite's rowtime, which is a real column in the table. With the FLIP's approach we lose some flexibility, because the timestamp column may be created after some transformations or a join operation, not at the beginning. So why do we have to define the rowtime at the beginning? (Because of watermarks?) Can we have a way to define the rowtime after the source table, like a TimestampAssigner?

Regarding the "allowLateness" method: I came up with a trick where we make 'rowtime and 'systemtime Scala objects instead of symbol expressions. The API would look like this:

window(Tumble over 10.minutes on rowtime allowLateness as 'w)

The implementation would look like this:

class TumblingWindow(size: Expression) extends Window {
  def on(time: rowtime.type): TumblingEventTimeWindow =
    new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method

  def on(time: systemtime.type): TumblingProcessingTimeWindow =
    new TumblingProcessingTimeWindow(alias, 'systemtime, size) // has no allowLateness() method
}
object rowtime
object systemtime

What do you think about this?

- Jark Wu

On Sep 6, 2016, at 11:00 PM, Timo Walther wrote:

> Hi all,
>
> I thought about the API of the FLIP again. If we allow the "systemtime"
> attribute, we cannot implement a nice method chaining where the user can
> define "allowLateness" only on event time. So even if the user expressed
> that "systemtime" is used, we have to offer an "allowLateness" method,
> because we have to assume that this attribute can also be the batch event
> time column, which is not very nice.
>
> class TumblingWindow(size: Expression) extends Window {
>   def on(timeField: Expression): TumblingEventTimeWindow =
>     new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
> }
>
> What do you think?
>
> Timo
>
> On 05/09/16 at 10:41, Fabian Hueske wrote:
>> Hi Jark,
>>
>> you had asked for non-windowed aggregates in the Table API a few times.
>> FLIP-11 proposes row-window aggregates, which are a generalization of
>> running aggregates (SlideRow unboundedPreceding).
>>
>> Can you have a look at the FLIP and give feedback whether this is what
>> you are looking for? Improvement suggestions are very welcome as well.
>>
>> Thank you,
>> Fabian
>>
>> 2016-09-01 16:12 GMT+02:00 Timo Walther:
>>
>>> Hi all!
>>>
>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>>> You can find FLIP-11 here:
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>
>>> Motivation for the FLIP:
>>>
>>> The Table API is a declarative API to define queries on static and
>>> streaming tables. So far, only projection, selection, and union are
>>> supported operations on streaming tables.
>>>
>>> This FLIP proposes to add support for different types of aggregations
>>> on top of streaming tables. In particular, we seek to support:
>>>
>>> - Group-window aggregates, i.e., aggregates which are computed for a
>>>   group of elements. A (time or row-count) window is required to bound
>>>   the infinite input stream into a finite group.
>>>
>>> - Row-window aggregates, i.e., aggregates which are computed for each
>>>   row, based on a window (range) of preceding and succeeding rows.
>>>
>>> Each type of aggregate shall be supported on keyed/grouped or
>>> non-keyed/grouped data streams for streaming tables as well as batch
>>> tables.
>>>
>>> We are looking forward to your feedback.
>>>
>>> Timo
>
> --
> Freundliche Grüße / Kind Regards
>
> Timo Walther
>
> Follow me: @twalthr
> https://www.linkedin.com/in/twalthr
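As a usage sketch of the object-based proposal above (hypothetical API, following Jark's snippet), the overload resolution would enforce the event-time-only restriction at compile time:

{code}
// Lateness handling is only reachable for event time; the second line would
// be rejected because TumblingProcessingTimeWindow has no allowLateness:
window(Tumble over 10.minutes on rowtime allowLateness as 'w)    // OK: event-time window
window(Tumble over 10.minutes on systemtime allowLateness as 'w) // compile error
{code}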
[jira] [Created] (FLINK-4587) Yet another java.lang.NoSuchFieldError: INSTANCE
Renkai Ge created FLINK-4587:
-----------------------------

Summary: Yet another java.lang.NoSuchFieldError: INSTANCE
Key: FLINK-4587
URL: https://issues.apache.org/jira/browse/FLINK-4587
Project: Flink
Issue Type: Bug
Components: Local Runtime
Affects Versions: 1.2.0
Environment: Latest SNAPSHOT
Reporter: Renkai Ge

For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2 in Flink. The source file is:

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory

/**
 * Created by renkai on 16/9/7.
 */
object Main {
  def main(args: Array[String]): Unit = {
    val instance = ManagedNHttpClientConnectionFactory.INSTANCE
    println("instance = " + instance)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromCollection(1 to 100)
    val result = stream.map { x => x * 2 }
    result.print()
    env.execute("xixi")
  }
}
{code}

and

{code}
name := "flink-explore"

version := "1.0"

scalaVersion := "2.11.8"

crossPaths := false

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
    exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
    exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
    exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
    exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
)
{code}

I use `sbt assembly` to get a fat jar. If I run the command

{code}
java -cp flink-explore-assembly-1.0.jar Main
{code}

I get the result

{code}
instance = org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
09/07/2016 12:05:26 Job execution switched to status RUNNING.
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to SCHEDULED
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to DEPLOYING
...
09/07/2016 12:05:26 Map -> Sink: Unnamed(20/24) switched to RUNNING
09/07/2016 12:05:26 Map -> Sink: Unnamed(19/24) switched to RUNNING
15> 30
20> 184
...
19> 182
1> 194
8> 160
09/07/2016 12:05:26 Source: Collection Source(1/1) switched to FINISHED
...
09/07/2016 12:05:26 Map -> Sink: Unnamed(1/24) switched to FINISHED
09/07/2016 12:05:26 Job execution switched to status FINISHED.
{code}

Nothing special. But if I run the jar by

{code}
./bin/flink run shop-monitor-flink-assembly-1.0.jar
{code}

I will get an error

{code}
$ ./bin/flink run flink-explore-assembly-1.0.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program

 The program finished with the following exception:

java.lang.NoSuchFieldError: INSTANCE
    at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:53)
    at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:57)
    at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<clinit>(DefaultHttpRequestWriterFactory.java:47)
    at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:75)
    at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:83)
    at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<clinit>(ManagedNHttpClientConnectionFactory.java:64)
    at Main$.main(Main.scala:9)
    at Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:322)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774)
    at org.
{code}
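The message is cut off above, but a NoSuchFieldError like this typically means two incompatible versions of the org.apache.http classes meet at runtime; Flink's own classpath ships an older httpclient/httpcore than the 4.1.x async client needs. Assuming that is the cause here (a hedged guess, not a confirmed diagnosis), one common workaround is to shade the dependency inside the fat jar, e.g. with sbt-assembly:

{code}
// build.sbt (sbt-assembly 0.14+): relocate the org.apache.http classes that
// are bundled into the fat jar, so they cannot clash with the versions
// already present on Flink's classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.apache.http.**" -> "shaded.org.apache.http.@1").inAll
)
{code}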
[DISCUSS] how choose Scala and Java
Scala and Java are mixed within Flink's modules, and some of the resulting APIs are indeed confusing. What was the rule for choosing between Scala and Java when an API was first implemented? Thanks
Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Hi,

thanks for your comments and questions! Actually, you are bringing up the points that Timo and I discussed the most when designing the FLIP ;-)

- We also thought about the syntactic shortcut for running aggregates that you proposed (table.groupBy('a).select(…)). Our motivation for not allowing this shortcut is to prevent users from accidentally performing a "dangerous" operation. The problem with unbounded sliding row-windows is that their state never expires. If you have an evolving key space, you will likely run into problems at some point because the operator state grows too large. IMO, a row-window session is a better approach, because it defines a timeout after which state can be discarded. groupBy.select is a very common operation in batch, but its semantics in streaming are very different. In my opinion it makes sense to make users aware of these differences through the API.

- Reassigning timestamps and watermarks is a very delicate issue. You are right that Calcite exposes this field, which is necessary due to the semantics of SQL. However, also in Calcite you cannot freely choose the timestamp attribute for streaming queries (it must be a monotone or quasi-monotone attribute), which is hard to reason about (and guarantee) after a few operators have been applied. Streaming tables in Flink will likely have a time attribute which is identical to the initial rowtime. However, Flink does modify timestamps internally, e.g., for records that are emitted from time windows, in order to ensure that consecutive windows perform as expected. Modifying or reassigning timestamps in the middle of a job can produce unexpected results which are very hard to reason about. Do you have a concrete use case in mind for reassigning timestamps?

- The idea to represent rowtime and systemtime as objects is good. Our motivation for going with reserved Scala symbols was to have a uniform syntax for windows over streaming and batch tables. On batch tables you can compute time windows over basically every time attribute (they are treated similarly to grouping attributes, with a bit of extra logic to extract the grouping key for sliding and session windows). If you write window(Tumble over 10.minutes on 'rowtime) on a streaming table, 'rowtime would indicate event time. On a batch table with a 'rowtime attribute, the same operator would be internally converted into a group by. By going for the object approach, we would lose this compatibility (or would need to introduce an additional column attribute to specify the window attribute for batch tables).

As usual, some of the design decisions are based on preferences. Do they make sense to you? Let me know what you think.

Best,
Fabian

2016-09-07 5:12 GMT+02:00 Jark Wu:

> Hi all,
>
> I was on vacation for about five days, sorry to have missed this great FLIP.
>
> Yes, non-windowed aggregates are a special case of row-windows, and the
> proposal looks really good. Can we have a simplified form for the special
> case? For example, table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
> could be simplified to table.groupBy('a).select(…); the latter would
> actually call the former.
>
> Another question is about the rowtime. As the FLIP says, DataStream and
> StreamTableSource are responsible for assigning timestamps and watermarks;
> furthermore, "rowtime" and "systemtime" are not real columns. IMO, this is
> different from Calcite's rowtime, which is a real column in the table.
> With the FLIP's approach we lose some flexibility, because the timestamp
> column may be created after some transformations or a join operation, not
> at the beginning. So why do we have to define the rowtime at the
> beginning? (Because of watermarks?) Can we have a way to define the
> rowtime after the source table, like a TimestampAssigner?
>
> Regarding the "allowLateness" method: I came up with a trick where we
> make 'rowtime and 'systemtime Scala objects instead of symbol expressions.
> The API would look like this:
>
> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>
> The implementation would look like this:
>
> class TumblingWindow(size: Expression) extends Window {
>   def on(time: rowtime.type): TumblingEventTimeWindow =
>     new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method
>
>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>     new TumblingProcessingTimeWindow(alias, 'systemtime, size) // has no allowLateness() method
> }
> object rowtime
> object systemtime
>
> What do you think about this?
>
> - Jark Wu
>
> On Sep 6, 2016, at 11:00 PM, Timo Walther wrote:
>
>> Hi all,
>>
>> I thought about the API of the FLIP again. If we allow the "systemtime"
>> attribute, we cannot implement a nice method chaining where the user can
>> define "allowLateness" only on event time. So even if the user expressed
>> that "systemtime" is used, we have to offer an "allowLateness" method,
>> because we have to assume that this attribute can also be the batch event
>> time column, which is not very nice.