This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ec726d6 [INLONG-3434][Sort] Optimize the automatic splicing of
database after hive jdbc url (#3444)
ec726d6 is described below
commit ec726d624e179864b38fdadd2010211ba30a9721
Author: yunqingmoswu <[email protected]>
AuthorDate: Tue Mar 29 20:08:56 2022 +0800
[INLONG-3434][Sort] Optimize the automatic splicing of database after hive
jdbc url (#3444)
Co-authored-by: yunqingmo <[email protected]>
---
.../partition/JdbcHivePartitionCommitPolicy.java | 57 ++++++++++++-------
.../JdbcHivePartitionCommitPolicyTest.java | 65 ++++++++++++++++++++++
2 files changed, 102 insertions(+), 20 deletions(-)
diff --git
a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicy.java
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicy.java
index 6624a75..2116180 100644
---
a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicy.java
+++
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicy.java
@@ -18,20 +18,22 @@
package org.apache.inlong.sort.flink.hive.partition;
import com.google.common.base.Preconditions;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
/**
* Partition commit policy to create partitions in hive table.
*/
public class JdbcHivePartitionCommitPolicy implements PartitionCommitPolicy {
+
private static final Logger LOG =
LoggerFactory.getLogger(JdbcHivePartitionCommitPolicy.class);
private static final String driverClass =
"org.apache.hive.jdbc.HiveDriver";
@@ -50,22 +52,23 @@ public class JdbcHivePartitionCommitPolicy implements
PartitionCommitPolicy {
connection = getHiveConnection();
}
- @Override
- public void commit(Context context) throws Exception {
- final String databaseName = context.databaseName();
- final String tableName = context.tableName();
- Statement statement = connection.createStatement();
- String sql = generateCreatePartitionSql(databaseName, tableName,
context.partition());
- statement.execute(sql);
- }
-
- @Override
- public void close() throws Exception {
- connection.close();
- }
-
public static String getHiveConnStr(String hiveServerJdbcUrl, String
databaseName) {
- return hiveServerJdbcUrl + "/" + databaseName;
+ String firstPartOfJdbcUrl = hiveServerJdbcUrl;
+ String secondPartOfJdbcUrl = "";
+ if (hiveServerJdbcUrl.contains(";")) {
+ firstPartOfJdbcUrl = hiveServerJdbcUrl.substring(0,
hiveServerJdbcUrl.indexOf(";"));
+ secondPartOfJdbcUrl =
hiveServerJdbcUrl.substring(hiveServerJdbcUrl.indexOf(";"));
+ }
+ String[] firstPartOfJdbcUrlArr = firstPartOfJdbcUrl.split("//");
+ String hostAndPort = firstPartOfJdbcUrlArr[1];
+ if (hostAndPort.contains("/")) {
+ hostAndPort = hostAndPort.substring(0, hostAndPort.indexOf("/"));
+ }
+ String hiveConnStr = String.format("%s//%s/%s",
firstPartOfJdbcUrlArr[0], hostAndPort, databaseName);
+ if (!"".equals(secondPartOfJdbcUrl)) {
+ hiveConnStr += "/" + secondPartOfJdbcUrl;
+ }
+ return hiveConnStr;
}
public static String generateCreatePartitionSql(
@@ -80,7 +83,7 @@ public class JdbcHivePartitionCommitPolicy implements
PartitionCommitPolicy {
.append(tableName)
.append(" ADD IF NOT EXISTS PARTITION (");
- for (Tuple2<String, String> partition: hivePartition.getPartitions()) {
+ for (Tuple2<String, String> partition : hivePartition.getPartitions())
{
stringBuilder
.append(partition.f0)
.append(" = '")
@@ -95,6 +98,20 @@ public class JdbcHivePartitionCommitPolicy implements
PartitionCommitPolicy {
return result + ")";
}
+ @Override
+ public void commit(Context context) throws Exception {
+ final String databaseName = context.databaseName();
+ final String tableName = context.tableName();
+ Statement statement = connection.createStatement();
+ String sql = generateCreatePartitionSql(databaseName, tableName,
context.partition());
+ statement.execute(sql);
+ }
+
+ @Override
+ public void close() throws Exception {
+ connection.close();
+ }
+
private Connection getHiveConnection() throws SQLException,
ClassNotFoundException {
Class.forName(driverClass);
diff --git
a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicyTest.java
b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicyTest.java
new file mode 100644
index 0000000..d1c4341
--- /dev/null
+++
b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicyTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.hive.partition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JdbcHivePartitionCommitPolicyTest {
+
+ @Test
+ public void testGetHiveConnStr1() {
+ String hiveJdbcUrl = "jdbc:hive2://127.0.0.1:10000";
+ String database = "test666";
+ String expectValue = "jdbc:hive2://127.0.0.1:10000/test666";
+
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl,
database), expectValue);
+ }
+
+ @Test
+ public void testGetHiveConnStr2() {
+ String hiveJdbcUrl = "jdbc:hive2://127.0.0.1:10000/";
+ String database = "test666";
+ String expectValue = "jdbc:hive2://127.0.0.1:10000/test666";
+
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl,
database), expectValue);
+ }
+
+ @Test
+ public void testGetHiveConnStr3() {
+ String hiveJdbcUrl = "jdbc:hive2://127.0.0.1:10000/test888";
+ String database = "test666";
+ String expectValue = "jdbc:hive2://127.0.0.1:10000/test666";
+
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl,
database), expectValue);
+ }
+
+ @Test
+ public void testGetHiveConnStr4() {
+ String hiveJdbcUrl =
"jdbc:hive2://127.0.0.1:10000/test888/;principal=hive/[email protected]";
+ String database = "test666";
+ String expectValue =
"jdbc:hive2://127.0.0.1:10000/test666/;principal=hive/[email protected]";
+
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl,
database), expectValue);
+ }
+
+ @Test
+ public void testGetHiveConnStr5() {
+ String hiveJdbcUrl =
"jdbc:hive2://127.0.0.1:10000/test888/test888;principal=hive/[email protected]";
+ String database = "test666";
+ String expectValue =
"jdbc:hive2://127.0.0.1:10000/test666/;principal=hive/[email protected]";
+
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl,
database), expectValue);
+ }
+
+}