This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8b6d57939 [INLONG-4816][DataProxy] Add warn log when there is no
TubeMQ info when start for the first time (#4817)
8b6d57939 is described below
commit 8b6d579398737a035df897da9055e78ced526095
Author: Charles Zhang <[email protected]>
AuthorDate: Thu Jun 30 11:33:17 2022 +0800
[INLONG-4816][DataProxy] Add warn log when there is no TubeMQ info when
start for the first time (#4817)
---
.../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java | 8 +++++---
.../apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java | 2 +-
pom.xml | 6 ------
3 files changed, 6 insertions(+), 10 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 97c92f361..17b1ee6fa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -259,14 +259,16 @@ public class TubeSink extends AbstractSink implements Configurable {
* @throws FlumeException if an RPC client connection could not be opened
*/
private void initCreateConnection() throws FlumeException {
-// synchronized (tubeSessionLock) {
+ // check the TubeMQ address
+ if (masterHostAndPortLists == null || masterHostAndPortLists.isEmpty()) {
+ logger.warn("Failed to get TubeMQ Cluster, make sure register TubeMQ to manager successfully.");
+ return;
+ }
// if already connected, just skip
if (sessionFactories != null) {
return;
}
sessionFactories = new HashMap<>();
- Preconditions.checkState(masterHostAndPortLists != null && !masterHostAndPortLists.isEmpty(),
- "No tube service url specified");
for (String masterUrl : masterHostAndPortLists) {
createConnection(masterUrl);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index dad0da6d4..cef15ad0a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -136,7 +136,7 @@ public class PulsarClientService {
}
public void initCreateConnection(CreatePulsarClientCallBack callBack) {
- if (pulsarUrl2token.isEmpty()) {
+ if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
logger.warn("Failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
return;
}
diff --git a/pom.xml b/pom.xml
index 9f2983b65..b99707ead 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1007,12 +1007,6 @@
<artifactId>iceberg-flink-runtime-1.13</artifactId>
<version>${iceberg.flink.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-flink-runtime-1.13</artifactId>
- <version>${iceberg.flink.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>