felixcheung closed pull request #3266: [ZEPPELIN-3914] upgrade Flink to 1.7.0 URL: https://github.com/apache/zeppelin/pull/3266
This pull request was merged from a forked repository. Because GitHub hides the original diff of a fork's pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md index 2cf31257ad..d3f2223432 100644 --- a/docs/interpreter/flink.md +++ b/docs/interpreter/flink.md @@ -50,7 +50,7 @@ At the "Interpreters" menu, you have to create a new Flink interpreter and provi </tr> </table> -For more information about Flink configuration, you can find it [here](https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html). +For more information about Flink configuration, you can find it [here](https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html). ## How to test it's working You can find an example of Flink usage in the Zeppelin Tutorial folder or try the following word count example, by using the [Zeppelin notebook](https://www.zeppelinhub.com/viewer/notebooks/aHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL05GTGFicy96ZXBwZWxpbi1ub3RlYm9va3MvbWFzdGVyL25vdGVib29rcy8yQVFFREs1UEMvbm90ZS5qc29u) from Till Rohrmann's presentation [Interactive data analysis with Apache Flink](http://www.slideshare.net/tillrohrmann/data-analysis-49806564) for Apache Flink Meetup. 
diff --git a/flink/pom.xml b/flink/pom.xml index 7a374f25e7..331e19cef4 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -36,7 +36,7 @@ <properties> <!--library versions--> <interpreter.name>flink</interpreter.name> - <flink.version>1.5.2</flink.version> + <flink.version>1.7.1</flink.version> <flink.akka.version>2.3.7</flink.akka.version> <scala.macros.version>2.0.1</scala.macros.version> <scala.binary.version>2.11</scala.binary.version> diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala index 1694a4491c..b2d8d1679c 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala @@ -30,7 +30,7 @@ class FlinkSQLScalaInterpreter(scalaInterpreter: FlinkScalaInterpreter, def interpret(code: String, context: InterpreterContext): InterpreterResult = { try { - val table: Table = this.btenv.sql(code) + val table: Table = this.btenv.sqlQuery(code) val result = z.showData(table) return new InterpreterResult(InterpreterResult.Code.SUCCESS, result) } catch { diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 14f895962b..1d8b27e4a7 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -26,7 +26,7 @@ import org.apache.flink.api.scala.FlinkShell._ import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop} import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration.GlobalConfiguration -import org.apache.flink.runtime.minicluster.{MiniCluster, StandaloneMiniCluster} +import org.apache.flink.runtime.minicluster.MiniCluster import 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} @@ -45,8 +45,7 @@ class FlinkScalaInterpreter(val properties: Properties) { lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) private var flinkILoop: FlinkILoop = _ - private var cluster: Option[Either[Either[StandaloneMiniCluster, MiniCluster], - ClusterClient[_]]] = _ + private var cluster: Option[Either[MiniCluster, ClusterClient[_]]] = _ private var scalaCompleter: ScalaCompleter = _ private val interpreterOutput = new InterpreterOutputStream(LOGGER) @@ -68,8 +67,7 @@ class FlinkScalaInterpreter(val properties: Properties) { val (iLoop, cluster) = try { val (host, port, cluster) = fetchConnectionInfo(configuration, config) val conf = cluster match { - case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration - case Some(Left(Right(_))) => configuration + case Some(Left(miniCluster)) => configuration case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration case None => configuration } @@ -213,10 +211,7 @@ class FlinkScalaInterpreter(val properties: Properties) { } if (cluster != null) { cluster match { - case Some(Left(Left(legacyMiniCluster))) => - LOGGER.info("Shutdown LegacyMiniCluster") - legacyMiniCluster.close() - case Some(Left(Right(newMiniCluster))) => + case Some(Left(newMiniCluster)) => LOGGER.info("Shutdown NewMiniCluster") newMiniCluster.close() case Some(Right(yarnCluster)) => ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services