This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 2353abe KYLIN-4736 Upgrade flink version to 1.11.1 (#1462)
2353abe is described below
commit 2353abe84b2a60fe2dec0485d638a98e20244004
Author: chenjie-sau <[email protected]>
AuthorDate: Tue Oct 27 10:35:07 2020 +0800
KYLIN-4736 Upgrade flink version to 1.11.1 (#1462)
---
build/bin/download-flink.sh | 13 ++++++-------
.../java/org/apache/kylin/common/util/HadoopUtil.java | 2 ++
core-common/src/main/resources/kylin-defaults.properties | 5 ++---
.../kylin/engine/flink/FlinkOnYarnConfigMapping.java | 16 ++++------------
.../kylin/engine/flink/FlinkOnYarnConfigMappingTest.java | 14 +++++++-------
pom.xml | 2 +-
6 files changed, 22 insertions(+), 30 deletions(-)
diff --git a/build/bin/download-flink.sh b/build/bin/download-flink.sh
index 4d2fdb8..c118a8f 100755
--- a/build/bin/download-flink.sh
+++ b/build/bin/download-flink.sh
@@ -35,12 +35,11 @@ if [[ `uname -a` =~ "Darwin" ]]; then
alias md5cmd="md5 -q"
fi
-flink_version="1.9.2"
+flink_version="1.11.1"
scala_version="2.11"
-flink_shaded_version="10.0"
-hadoop_version="2.7.5"
-flink_pkg_md5="0718a04fe0a641cc5f5368124a4c54a5"
-flink_shaded_hadoop_md5="4287a314bfb09a3dc957cbda3f91d7ca"
+flink_shaded_hadoop_version="3.1.1.7.1.1.0-565-9.0"
+flink_pkg_md5="3b7aa59b44add1a0625737f6516e0929"
+flink_shaded_hadoop_md5="7b78e546dd93f4facd322921f29de1eb"
if [ ! -f "flink-${flink_version}-bin-scala_${scala_version}.tgz" ]; then
echo "No binary file found, start to download package to
${flink_package_dir}"
@@ -53,8 +52,8 @@ else
fi
fi
-flink_shaded_hadoop_jar="flink-shaded-hadoop-2-uber-${hadoop_version}-${flink_shaded_version}.jar"
-flink_shaded_hadoop_path="https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${hadoop_version}-${flink_shaded_version}/${flink_shaded_hadoop_jar}"
+flink_shaded_hadoop_jar="flink-shaded-hadoop-3-uber-${flink_shaded_hadoop_version}.jar"
+flink_shaded_hadoop_path="https://repository.cloudera.com/artifactory/libs-release-local/org/apache/flink/flink-shaded-hadoop-3-uber/${flink_shaded_hadoop_version}/${flink_shaded_hadoop_jar}"
if [ ! -f $flink_shaded_hadoop_jar ]; then
echo "Start to download $flink_shaded_hadoop_jar"
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 0f6da04..26d0ea3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -64,12 +64,14 @@ public class HadoopUtil {
return conf;
}
Configuration conf = hadoopConfig.get();
+ conf.set("fs.hdfs.impl.disable.cache", "true");
return conf;
}
public static Configuration healSickConfig(Configuration conf) {
// https://issues.apache.org/jira/browse/KYLIN-3064
conf.set("yarn.timeline-service.enabled", "false");
+ conf.set("fs.hdfs.impl.disable.cache", "true");
return conf;
}
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index c16419a..ebf1cd2 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -352,10 +352,9 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
### FLINK ENGINE CONFIGS ###
## Flink conf (default is in flink/conf/flink-conf.yaml)
-kylin.engine.flink-conf.jobmanager.heap.size=2G
-kylin.engine.flink-conf.taskmanager.heap.size=4G
+kylin.engine.flink-conf.jobmanager.memory.process.size=2G
+kylin.engine.flink-conf.taskmanager.memory.process.size=4G
kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=1
-kylin.engine.flink-conf.taskmanager.memory.preallocate=false
kylin.engine.flink-conf.job.parallelism=1
kylin.engine.flink-conf.program.enableObjectReuse=false
kylin.engine.flink-conf.yarn.queue=
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
index 154d4e2..a3d2a65 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
@@ -14,13 +14,14 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.flink;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.FallbackKey;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.MemorySize;
import java.util.HashMap;
import java.util.Iterator;
@@ -38,7 +39,7 @@ public class FlinkOnYarnConfigMapping {
flinkOnYarnConfigMap = new HashMap<>();
//mapping job manager heap size -> -yjm
- ConfigOption<String> jmHeapSizeOption =
JobManagerOptions.JOB_MANAGER_HEAP_MEMORY;
+ ConfigOption<MemorySize> jmHeapSizeOption =
JobManagerOptions.TOTAL_PROCESS_MEMORY;
flinkOnYarnConfigMap.put(jmHeapSizeOption.key(), "-yjm");
if (jmHeapSizeOption.hasFallbackKeys()) {
Iterator<FallbackKey> deprecatedKeyIterator =
jmHeapSizeOption.fallbackKeys().iterator();
@@ -48,7 +49,7 @@ public class FlinkOnYarnConfigMapping {
}
//mapping task manager heap size -> -ytm
- ConfigOption<String> tmHeapSizeOption =
TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
+ ConfigOption<MemorySize> tmHeapSizeOption =
TaskManagerOptions.TOTAL_PROCESS_MEMORY;
flinkOnYarnConfigMap.put(tmHeapSizeOption.key(), "-ytm");
if (tmHeapSizeOption.hasFallbackKeys()) {
Iterator<FallbackKey> deprecatedKeyIterator =
tmHeapSizeOption.fallbackKeys().iterator();
@@ -66,15 +67,6 @@ public class FlinkOnYarnConfigMapping {
}
}
- ConfigOption<Boolean> tmMemoryPreallocate =
TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE;
- flinkOnYarnConfigMap.put(tmMemoryPreallocate.key(), "-yD
taskmanager.memory.preallocate");
- if (taskSlotNumOption.hasFallbackKeys()) {
- Iterator<FallbackKey> deprecatedKeyIterator =
tmMemoryPreallocate.fallbackKeys().iterator();
- while (deprecatedKeyIterator.hasNext()) {
-
flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-yD
taskmanager.memory.preallocate");
- }
- }
-
//config options do not have mapping with config file key
flinkOnYarnConfigMap.put("yarn.queue", "-yqu");
flinkOnYarnConfigMap.put("yarn.nodelabel", "-ynl");
diff --git a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
index 3cb6f28..ca301db 100644
--- a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
+++ b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.flink;
import org.apache.flink.configuration.FallbackKey;
@@ -40,10 +40,10 @@ public class FlinkOnYarnConfigMappingTest {
String flinkConfigOption = entry.getKey();
boolean matchedAnyOne;
- matchedAnyOne =
flinkConfigOption.equals(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key());
+ matchedAnyOne =
flinkConfigOption.equals(JobManagerOptions.TOTAL_PROCESS_MEMORY.key());
if (!matchedAnyOne) {
- if
(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.hasFallbackKeys()) {
- Iterator<FallbackKey> deprecatedKeyIterator =
JobManagerOptions.JOB_MANAGER_HEAP_MEMORY
+ if
(JobManagerOptions.TOTAL_PROCESS_MEMORY.hasFallbackKeys()) {
+ Iterator<FallbackKey> deprecatedKeyIterator =
JobManagerOptions.TOTAL_PROCESS_MEMORY
.fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
matchedAnyOne = matchedAnyOne &&
flinkConfigOption.equals(deprecatedKeyIterator.next().getKey());
@@ -65,10 +65,10 @@ public class FlinkOnYarnConfigMappingTest {
String flinkConfigOption = entry.getKey();
boolean matchedAnyOne;
- matchedAnyOne =
flinkConfigOption.equals(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key());
+ matchedAnyOne =
flinkConfigOption.equals(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key());
if (!matchedAnyOne) {
- if
(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.hasFallbackKeys()) {
- Iterator<FallbackKey> deprecatedKeyIterator =
TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY
+ if
(TaskManagerOptions.TOTAL_PROCESS_MEMORY.hasFallbackKeys()) {
+ Iterator<FallbackKey> deprecatedKeyIterator =
TaskManagerOptions.TOTAL_PROCESS_MEMORY
.fallbackKeys().iterator();
while (deprecatedKeyIterator.hasNext()) {
matchedAnyOne = matchedAnyOne &&
flinkConfigOption.equals(deprecatedKeyIterator.next().getKey());
diff --git a/pom.xml b/pom.xml
index 1c7444e..ea348dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
<kryo.version>4.0.0</kryo.version>
<!-- Flink versions -->
- <flink.version>1.9.2</flink.version>
+ <flink.version>1.11.1</flink.version>
<!-- mysql versions -->
<mysql-connector.version>5.1.8</mysql-connector.version>