[GitHub] [inlong] healchow merged pull request #6055: [INLONG-5978][Manager] Add protocol type in cluster nodes

2022-10-12 Thread GitBox


healchow merged PR #6055:
URL: https://github.com/apache/inlong/pull/6055


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)

2022-10-12 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new aa91f5984 [INLONG-5978][Manager] Add protocol type in cluster nodes 
(#6055)
aa91f5984 is described below

commit aa91f5984cf0cd230cba9098834b028a554fc65c
Author: xuesongxs <54351417+xueson...@users.noreply.github.com>
AuthorDate: Wed Oct 12 15:43:49 2022 +0800

[INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)

* Add protocol type constant class

* Fix unit test error

* Update NodeEditModal.tsx

Co-authored-by: healchow 
Co-authored-by: Daniel 
---
 .../common/heartbeat/ComponentHeartbeat.java   |  5 +-
 .../inlong/common/heartbeat/HeartbeatMsg.java  |  7 ++-
 .../common/pojo/dataproxy/DataProxyNodeInfo.java   |  4 ++
 inlong-dashboard/src/locales/cn.json   |  2 +
 inlong-dashboard/src/locales/en.json   |  2 +
 .../src/pages/Clusters/NodeEditModal.tsx   | 19 
 inlong-dashboard/src/pages/Clusters/NodeManage.tsx |  4 ++
 .../inlong/manager/common/consts/ProtocolType.java | 26 +++---
 .../dao/entity/InlongClusterNodeEntity.java|  1 +
 .../dao/mapper/InlongClusterNodeEntityMapper.java  |  3 +-
 .../mappers/InlongClusterNodeEntityMapper.xml  | 57 --
 .../manager/pojo/cluster/ClusterNodeRequest.java   |  4 ++
 .../manager/pojo/cluster/ClusterNodeResponse.java  |  3 ++
 .../service/cluster/InlongClusterService.java  |  5 +-
 .../service/cluster/InlongClusterServiceImpl.java  | 30 +---
 .../service/core/heartbeat/HeartbeatManager.java   |  2 +
 .../service/cluster/InlongClusterServiceTest.java  | 46 +++--
 .../core/heartbeat/HeartbeatManagerTest.java   |  3 ++
 .../service/core/impl/HeartbeatServiceTest.java|  2 +
 .../main/resources/h2/apache_inlong_manager.sql| 31 ++--
 .../manager-web/sql/apache_inlong_manager.sql  | 31 ++--
 .../controller/openapi/DataProxyController.java|  6 ++-
 22 files changed, 181 insertions(+), 112 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
index 260e4618c..8d6beda08 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
@@ -35,18 +35,21 @@ public class ComponentHeartbeat {
 
 private int port;
 
+private String protocolType;
+
 private String inCharges;
 
 public ComponentHeartbeat() {
 }
 
 public ComponentHeartbeat(String clusterTag, String clusterName, String 
componentType, String ip, int port,
-String inCharges) {
+String inCharges, String protocolType) {
 this.clusterTag = clusterTag;
 this.clusterName = clusterName;
 this.componentType = componentType;
 this.ip = ip;
 this.port = port;
+this.protocolType = protocolType;
 this.inCharges = inCharges;
 }
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index 5277eaf22..42038d993 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -43,6 +43,11 @@ public class HeartbeatMsg {
  */
 private int port;
 
+/**
+ * ProtocolType of component
+ */
+private String protocolType;
+
 /**
  * Type of component
  */
@@ -79,6 +84,6 @@ public class HeartbeatMsg {
 private List streamHeartbeats;
 
 public ComponentHeartbeat componentHeartbeat() {
-return new ComponentHeartbeat(clusterTag, clusterName, componentType, 
ip, port, inCharges);
+return new ComponentHeartbeat(clusterTag, clusterName, componentType, 
ip, port, inCharges, protocolType);
 }
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
index 6425348a8..9aca22240 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java
@@ -40,4 +40,8 @@ public class DataProxyNodeInfo {
  */
 private Integer port;
 
+/**
+ * Node protocol type
+ */
+private String protocolType;
 }
diff --git a/inlong-dashboard/src/locales/cn.json 
b/inlong-dashboard/src/locales/cn.json
index 16e5161f7..feeb8b4d6 100644
--- a/inlong-da

[GitHub] [inlong] leezng merged pull request #6135: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field

2022-10-12 Thread GitBox


leezng merged PR #6135:
URL: https://github.com/apache/inlong/pull/6135





[inlong] branch master updated: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field (#6135)

2022-10-12 Thread leezng

leezng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 435eb7147 [INLONG-5814][Dashboard] Supports setting the precision of a 
field when adding a table field (#6135)
435eb7147 is described below

commit 435eb71475662823b9f7b6e5ca500bbc13c97923
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Wed Oct 12 15:54:11 2022 +0800

[INLONG-5814][Dashboard] Supports setting the precision of a field when 
adding a table field (#6135)
---
 inlong-dashboard/src/metas/sinks/greenplum.tsx  | 84 +++-
 inlong-dashboard/src/metas/sinks/oracle.tsx | 75 +
 inlong-dashboard/src/metas/sinks/postgreSql.tsx | 83 +++-
 inlong-dashboard/src/metas/sinks/sqlServer.tsx  | 86 -
 4 files changed, 209 insertions(+), 119 deletions(-)

diff --git a/inlong-dashboard/src/metas/sinks/greenplum.tsx 
b/inlong-dashboard/src/metas/sinks/greenplum.tsx
index aff09821c..8af638901 100644
--- a/inlong-dashboard/src/metas/sinks/greenplum.tsx
+++ b/inlong-dashboard/src/metas/sinks/greenplum.tsx
@@ -20,35 +20,40 @@ import type { FieldItemType } from '@/metas/common';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from './common/sourceFields';
 
-const greenplumFieldTypes = [
-  'SMALLINT',
-  'INT2',
-  'SMALLSERIAL',
-  'SERIAL',
-  'SERIAL2',
-  'INTEGER',
-  'BIGINT',
-  'BIGSERIAL',
-  'REAL',
-  'FLOAT4',
-  'FLOAT8',
-  'DOUBLE',
-  'NUMERIC',
-  'DECIMAL',
-  'BOOLEAN',
-  'DATE',
-  'TIME',
-  'TIMESTAMP',
-  'CHAR',
-  'CHARACTER',
-  'VARCHAR',
-  'TEXT',
-  'BYTEA',
-  // 'interval',
-].map(item => ({
-  label: item,
-  value: item,
-}));
+const fieldTypesConf = {
+  SMALLINT: (m, d) => (1 <= m && m <= 6 ? '' : '1 <= M <= 6'),
+  INT2: () => '',
+  SMALLSERIAL: (m, d) => (1 <= m && m <= 6 ? '' : '1 <= M <= 6'),
+  SERIAL: (m, d) => (1 <= m && m <= 11 ? '' : '1 <= M <= 11'),
+  SERIAL2: () => '',
+  INTEGER: (m, d) => (1 <= m && m <= 11 ? '' : '1 <= M <= 11'),
+  BIGINT: (m, d) => (1 <= m && m <= 20 ? '' : '1 <= M <= 20'),
+  BIGSERIAL: (m, d) => (1 <= m && m <= 20 ? '' : '1 <= M <= 20'),
+  REAL: () => '',
+  FLOAT4: () => '',
+  FLOAT8: () => '',
+  DOUBLE: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : '1 <= M <= 
38, 0 <= D < M'),
+  NUMERIC: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : '1 <= M <= 
38, 0 <= D < M'),
+  DECIMAL: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : '1 <= M <= 
38, 0 <= D < M'),
+  BOOLEAN: () => '',
+  DATE: () => '',
+  TIME: () => '',
+  TIMESTAMP: () => '',
+  CHAR: (m, d) => (1 <= m && m <= 255 ? '' : '1 <= M <= 255'),
+  CHARACTER: (m, d) => (1 <= m && m <= 255 ? '' : '1 <= M <= 255'),
+  VARCHAR: (m, d) => (1 <= m && m <= 255 ? '' : '1 <= M <= 255'),
+  TEXT: () => '',
+  BYTEA: () => '',
+};
+
+const greenplumFieldTypes = Object.keys(fieldTypesConf).reduce(
+  (acc, key) =>
+acc.concat({
+  label: key,
+  value: key,
+}),
+  [],
+);
 
 export const greenplum: FieldItemType[] = [
   {
@@ -154,12 +159,29 @@ const getFieldListColumns = sinkValues => {
   title: `GREENPLUM${i18n.t('meta.Sinks.Greenplum.FieldType')}`,
   dataIndex: 'fieldType',
   initialValue: greenplumFieldTypes[0].value,
-  type: 'select',
+  type: 'autocomplete',
   props: (text, record, idx, isNew) => ({
 options: greenplumFieldTypes,
 disabled: [110, 130].includes(sinkValues?.status as number) && !isNew,
   }),
-  rules: [{ required: true }],
+  rules: [
+{ required: true },
+() => ({
+  validator(_, val) {
+if (val) {
+  const [, type = val, typeLength = ''] = 
val.match(/^(.+)\((.+)\)$/) || [];
+  if (fieldTypesConf.hasOwnProperty(type)) {
+const [m = -1, d = -1] = typeLength.split(',');
+const errMsg = fieldTypesConf[type]?.(m, d);
+if (typeLength && errMsg) return Promise.reject(new 
Error(errMsg));
+  } else {
+return Promise.reject(new Error('FieldType error'));
+  }
+}
+return Promise.resolve();
+  },
+}),
+  ],
 },
 {
   title: i18n.t('meta.Sinks.Greenplum.IsMetaField'),
diff --git a/inlong-dashboard/src/metas/sinks/oracle.tsx 
b/inlong-dashboard/src/metas/sinks/oracle.tsx
index 05e9ec4ce..495ac91c8 100644
--- a/inlong-dashboard/src/metas/sinks/oracle.tsx
+++ b/inlong-dashboard/src/metas/sinks/oracle.tsx
@@ -20,30 +20,36 @@ import type { FieldItemType } from '@/metas/common';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from './common/sourceFields';
 
-const oracleFieldTypes = [
-  'BINARY_FLOAT',
-  'BINARY_DOUBLE',
-  'SMALLINT',
-  'FLOAT',
-  'FLOAT4',
-  'FLO

[GitHub] [inlong] haifxu opened a new pull request, #6155: [INLONG][Manager] Support to get audit data from clickhouse

2022-10-12 Thread GitBox


haifxu opened a new pull request, #6155:
URL: https://github.com/apache/inlong/pull/6155

   ### Prepare a Pull Request
   
   - Fixes #6154 
   
   ### Motivation
   
   Support getting audit data from ClickHouse.
   
   ### Modifications
   
   1. Add ClickHouse config information.
   2. Support getting audit data from ClickHouse.
   
   ### Verifying this change
   
   
![image](https://user-images.githubusercontent.com/58519431/195284761-472a9cbe-4d00-46f1-a859-03a90713566a.png)
   





[GitHub] [inlong] luchunliang commented on a diff in pull request #6155: [INLONG][Manager] Support to get audit data from clickhouse

2022-10-12 Thread GitBox


luchunliang commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r993131615


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java:
##
@@ -75,6 +78,7 @@
 public class AuditServiceImpl implements AuditService {
 
 private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditServiceImpl.class);
+private static final String SECOND_FORMAT = "yyy-MM-dd HH:mm:ss";

Review Comment:
   The year pattern is missing a 'y': `yyy-MM-dd HH:mm:ss` should be 
`yyyy-MM-dd HH:mm:ss`.
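
A minimal sketch of what the missing 'y' changes, assuming `java.time` rather than the Joda-Time formatter the PR uses; the class and method names are illustrative and not from the PR. In `java.time`, a three-letter year pattern only sets a minimum width of three digits, so both patterns agree for four-digit years and differ only in how earlier years are zero-padded:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative only: uses java.time, not the Joda-Time API used in the PR.
class YearPatternDemo {

    // Formats the given timestamp with the given pattern.
    static String format(String pattern, LocalDateTime ts) {
        return ts.format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        LocalDateTime recent = LocalDateTime.of(2022, 10, 12, 15, 43, 49);
        LocalDateTime early = LocalDateTime.of(999, 1, 2, 3, 4, 5);

        // Four-digit years look the same with either pattern.
        System.out.println(format("yyy-MM-dd HH:mm:ss", recent));  // 2022-10-12 15:43:49
        System.out.println(format("yyyy-MM-dd HH:mm:ss", recent)); // 2022-10-12 15:43:49

        // Years below 1000 show the difference in padding.
        System.out.println(format("yyy-MM-dd HH:mm:ss", early));   // 999-01-02 03:04:05
        System.out.println(format("yyyy-MM-dd HH:mm:ss", early));  // 0999-01-02 03:04:05
    }
}
```

So the typo is unlikely to change output for current dates, but `yyyy` states the intended fixed width.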






[GitHub] [inlong] dockerzhang merged pull request #5802: [INLONG-5495][SDK] Support multi-topic manager

2022-10-12 Thread GitBox


dockerzhang merged PR #5802:
URL: https://github.com/apache/inlong/pull/5802





[inlong] branch master updated: [INLONG-5495][SDK] Support multi-topic manager (#5802)

2022-10-12 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 08dfa0ecc [INLONG-5495][SDK] Support multi-topic manager  (#5802)
08dfa0ecc is described below

commit 08dfa0ecc48944f6fc45197827c177e9eb07d528
Author: vernedeng 
AuthorDate: Wed Oct 12 16:10:58 2022 +0800

[INLONG-5495][SDK] Support multi-topic manager  (#5802)
---
 .../sdk/sort/api/InlongTopicManagerFactory.java|   3 +-
 .../sort/fetcher/kafka/AckOffsetOnRebalance.java   |  10 +-
 .../fetcher/kafka/KafkaSingleTopicFetcher.java |   2 +-
 .../sort/impl/kafka/InLongKafkaFetcherImpl.java|   2 +-
 .../sdk/sort/manager/InlongMultiTopicManager.java  | 289 +
 5 files changed, 300 insertions(+), 6 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
index a8219f95f..834315236 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.inlong.sdk.sort.api;
 
 import org.apache.inlong.sdk.sort.api.SortClientConfig.TopicType;
+import org.apache.inlong.sdk.sort.manager.InlongMultiTopicManager;
 import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
 
 /**
@@ -47,6 +48,6 @@ public class InlongTopicManagerFactory {
 public static TopicManager createMultiTopicManager(
 ClientContext context,
 QueryConsumeConfig queryConsumeConfig) {
-return null;
+return new InlongMultiTopicManager(context, queryConsumeConfig);
 }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
index 0cf27bc82..d0a66c6e3 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
@@ -51,9 +51,13 @@ public class AckOffsetOnRebalance implements 
ConsumerRebalanceListener {
 private final AtomicLong revokedNum = new AtomicLong(0);
 private final AtomicLong assignedNum = new AtomicLong(0);
 
-public AckOffsetOnRebalance(String clusterId, Seeker seeker,
-ConcurrentHashMap commitOffsetMap) {
-this(clusterId, seeker, commitOffsetMap, null, null);
+public AckOffsetOnRebalance(
+String clusterId,
+Seeker seeker,
+ConcurrentHashMap commitOffsetMap,
+KafkaConsumer consumer) {
+this(clusterId, seeker, commitOffsetMap, null, consumer);
 }
 
 public AckOffsetOnRebalance(
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
index 9ed97261a..3f903720b 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
@@ -79,7 +79,7 @@ public class KafkaSingleTopicFetcher extends 
SingleTopicFetcher {
 this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
 consumer.subscribe(Collections.singletonList(topic.getTopic()),
 new 
AckOffsetOnRebalance(this.topic.getInLongCluster().getClusterId(), seeker,
-commitOffsetMap));
+commitOffsetMap, consumer));
 } else {
 LOGGER.info("consumer is null");
 return false;
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
index d2da76c0f..2dd9c2cf1 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
@@ -77,7 +77,7 @@ public class InLongKafkaFetcherImpl extends 
InLongTopicFetcher {
 this.seeker = SeekerFactory.createKafkaSeeker(consumer, 
inLongTopic);
 
consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()),
 new 
AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), se

[GitHub] [inlong] luchunliang commented on a diff in pull request #6155: [INLONG][Manager] Support to get audit data from clickhouse

2022-10-12 Thread GitBox


luchunliang commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r993133586


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java:
##
@@ -211,6 +231,31 @@ private SearchRequest toAuditSearchRequest(String index, 
String groupId, String
 return new SearchRequest(new String[]{index}, sourceBuilder);
 }
 
+/**
+ * Convert to clickhouse search sql
+ *
+ * @param groupId The groupId of inlong
+ * @param streamId The streamId of inlong
+ * @param auditId The auditId of request
+ * @param dt The datetime of request
+ * @return clickhouse sql
+ */
+private String toAuditCkSql(String groupId, String streamId, String 
auditId, String dt) {
+DateTimeFormatter formatter = DateTimeFormat.forPattern(DAY_FORMAT);
+DateTime date = formatter.parseDateTime(dt);
+String sDate = date.toString(SECOND_FORMAT);
+String eDate = date.plusDays(1).toString(SECOND_FORMAT);
+return new SQL()
+.SELECT("log_ts", "sum(count) as total")
+.FROM("apache_inlong_audit.audit_data")

Review Comment:
   The code should not use a hard-coded database name.



##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java:
##
@@ -155,6 +161,20 @@ public List listByCondition(AuditRequest request) 
throws IOException {
 
auditId.equals(AuditConstants.AUDIT_ID_SORT_OUTPUT) ? sinkNodeType : null));
 }
 }
+} else if (AuditQuerySource.CLICKHOUSE == querySource) {
+Statement ckStatement = clickhouseConfig.getCkStatement();

Review Comment:
   The `Statement` is never closed; a `Statement.close()` call is missing.
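
One way to address this, sketched under assumptions: `java.sql.Statement` implements `AutoCloseable`, so a try-with-resources block closes it even when the query throws. The `TrackingStatement` class below is a hypothetical stand-in so the sketch runs without a ClickHouse driver; neither it nor `CloseStatementDemo` is part of the PR.

```java
// Hypothetical stand-in for java.sql.Statement (which is also AutoCloseable),
// so the sketch runs without a JDBC driver.
class TrackingStatement implements AutoCloseable {
    private boolean closed = false;

    // Pretends to run a query; rejects empty SQL to simulate a failure.
    String executeQuery(String sql) {
        if (sql == null || sql.isEmpty()) {
            throw new IllegalArgumentException("empty sql");
        }
        return "rows for: " + sql;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class CloseStatementDemo {

    // Runs the query and guarantees close(), whether or not the query throws.
    static String query(TrackingStatement statement, String sql) {
        try (TrackingStatement st = statement) {
            return st.executeQuery(sql);
        }
    }

    public static void main(String[] args) {
        TrackingStatement st = new TrackingStatement();
        System.out.println(query(st, "SELECT 1"));
        System.out.println("closed: " + st.isClosed());
    }
}
```

With a real driver the same shape would apply to the `Connection`, `Statement`, and `ResultSet`, each declared in the try header.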






[GitHub] [inlong] gosonzhang opened a new pull request, #6157: [INLONG-6156][DataProxy] Twice event write when topic is empty

2022-10-12 Thread GitBox


gosonzhang opened a new pull request, #6157:
URL: https://github.com/apache/inlong/pull/6157

   
   - Fixes #6156
   





[GitHub] [inlong] healchow commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse

2022-10-12 Thread GitBox


healchow commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r993164303


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickhouseConfig.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.ck;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+/**
+ * Clickhouse config information, including url, user, etc.
+ */
+@Component
+public class ClickhouseConfig {
+
+@Value("${ck.index.search.url}")
+private String url;
+
+@Value("${ck.auth.user}")
+private String user;
+
+@Value("${ck.auth.password}")
+private String password;
+
+private Statement ckStatement;
+
+/**
+ * Get ClickHouse statement from config file
+ */
+public Statement getCkStatement() throws Exception {
+if (ckStatement == null) {

Review Comment:
   1. This usage does not seem thread-safe.
   2. Do we really need a long-lived connection for the JDBC statement?
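
To illustrate the first point, here is a minimal sketch, assuming the concern is the unsynchronized `if (ckStatement == null)` check: two threads can both see `null` and both create a resource. Marking the accessor `synchronized` makes the check-then-create step atomic. `LazyHolder` and `LazyInitDemo` are hypothetical names, and a plain `Object` stands in for the JDBC statement.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder; a plain Object stands in for the JDBC Statement.
class LazyHolder {
    private final AtomicInteger created = new AtomicInteger();
    private Object resource; // guarded by "this"

    // synchronized makes the null check and the creation one atomic step.
    synchronized Object get() {
        if (resource == null) {
            created.incrementAndGet();
            resource = new Object();
        }
        return resource;
    }

    int createdCount() {
        return created.get();
    }
}

class LazyInitDemo {

    // Starts the given number of threads that all call get() at once
    // and returns how many times the resource was actually created.
    static int createConcurrently(int threads) {
        LazyHolder holder = new LazyHolder();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                holder.get();
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return holder.createdCount();
    }

    public static void main(String[] args) {
        System.out.println("created: " + createConcurrently(8));
    }
}
```

This only fixes the initialization race. On the second point, opening a connection and statement per query inside a try-with-resources block, or borrowing from a connection pool, would avoid sharing one `Statement` between threads at all.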



##
inlong-manager/manager-web/src/main/resources/application-test.properties:
##
@@ -63,3 +63,11 @@ es.auth.enable=false
 es.auth.user=admin
 # Elasticsearch password of authentication info
 es.auth.password=inlong
+
+# ClickHouse config
+# ClickHouse url
+ck.index.search.url=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit

Review Comment:
   Suggest renaming these to `audit.ck.jdbcUrl`, `audit.ck.username`, and 
`audit.ck.password`.



##
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/AuditQuerySource.java:
##
@@ -28,5 +28,9 @@ public enum AuditQuerySource {
 /**
  * ELASTICSEARCH source of query
  */
-ELASTICSEARCH;
+ELASTICSEARCH,
+/**
+ * ELASTICSEARCH source of query

Review Comment:
   This comment should say `CLICKHOUSE`, not `ELASTICSEARCH`.






[inlong] branch master updated (08dfa0ecc -> 53f0038fc)

2022-10-12 Thread gosonzhang

gosonzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 08dfa0ecc [INLONG-5495][SDK] Support multi-topic manager  (#5802)
 add 53f0038fc [INLONG-6156][DataProxy] Twice event write when topic is 
empty (#6157)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/dataproxy/sink/PulsarSink.java|  2 +-
 .../apache/inlong/dataproxy/sink/pulsar/SinkTask.java   | 17 -
 2 files changed, 9 insertions(+), 10 deletions(-)



[GitHub] [inlong] gosonzhang merged pull request #6157: [INLONG-6156][DataProxy] Twice event write when topic is empty

2022-10-12 Thread GitBox


gosonzhang merged PR #6157:
URL: https://github.com/apache/inlong/pull/6157





[inlong] branch release-1.3.0 updated: [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)

2022-10-12 Thread gosonzhang

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new 5cf0b4311 [INLONG-6156][DataProxy] Twice event write when topic is 
empty (#6157)
5cf0b4311 is described below

commit 5cf0b4311bade4e203f8c2153d724376144de997
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Wed Oct 12 18:15:31 2022 +0800

[INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)
---
 .../org/apache/inlong/dataproxy/sink/PulsarSink.java|  2 +-
 .../apache/inlong/dataproxy/sink/pulsar/SinkTask.java   | 17 -
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index c2014965d..b73223b11 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -498,7 +498,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable, SendMessag
 }
 addStatistics(eventStat, false, 0);
 eventStat.incRetryCnt();
-if (!eventStat.isOrderMessage() && needRetry) {
+if (needRetry) {
 processResendEvent(eventStat);
 }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index efbaedfb6..a0a488fa6 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -32,6 +32,7 @@ import org.apache.inlong.dataproxy.sink.EventStat;
 import org.apache.inlong.dataproxy.sink.PulsarSink;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,6 +154,12 @@ public class SinkTask extends Thread {
 logger.warn("Event is null!");
 continue;
 }
+// check whether discard or send event
+if (eventStat.getRetryCnt() > maxRetrySendCnt) {
+logger.warn("Message will be discard! send times reach to 
max retry cnt."
++ " topic = {}, max retry cnt = {}", topic, 
maxRetrySendCnt);
+continue;
+}
 // get topic
 topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
 if (StringUtils.isEmpty(topic)) {
@@ -162,21 +169,13 @@ public class SinkTask extends Thread {
 }
 if (topic == null || topic.equals("")) {
 pulsarSink.handleMessageSendException(topic, eventStat,
-new Exception(ConfigConstants.TOPIC_KEY + " info 
is null"));
-processToReTrySend(eventStat);
-logger.warn("no topic specified, so will retry send!");
+new NotFoundException(ConfigConstants.TOPIC_KEY + 
" info is null"));
 continue;
 }
 // check whether order-type message
 if (eventStat.isOrderMessage()) {
 sleep(1000);
 }
-// check whether discard or send event
-if (eventStat.getRetryCnt() > maxRetrySendCnt) {
-logger.warn("Message will be discard! send times reach to 
max retry cnt."
-+ " topic = {}, max retry cnt = {}", topic, 
maxRetrySendCnt);
-continue;
-}
 // check whether duplicated event
 String clientSeqId = 
event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
 if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
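
As read from the hunks above, the commit moves the retry-count check ahead of the topic lookup and removes the extra `processToReTrySend` call for events without a topic, so the failure handler is the only path that can re-queue them. A simplified sketch of that ordering follows; `SendDecisionDemo`, `Action`, and `decide` are hypothetical names, not the actual `SinkTask` code.

```java
// Simplified sketch with hypothetical names; not the actual SinkTask code.
class SendDecisionDemo {

    enum Action { DISCARD, REPORT_MISSING_TOPIC, SEND }

    // Decides what to do with an event, checking in the order used after the change.
    static Action decide(int retryCnt, int maxRetrySendCnt, String topic) {
        // 1. Retry budget first, so an exhausted event is dropped
        //    even when it has no topic.
        if (retryCnt > maxRetrySendCnt) {
            return Action.DISCARD;
        }
        // 2. Missing topic: report the failure once and let the failure
        //    handler decide on a retry, instead of also re-queuing here.
        if (topic == null || topic.isEmpty()) {
            return Action.REPORT_MISSING_TOPIC;
        }
        // 3. Otherwise the event can be sent.
        return Action.SEND;
    }

    public static void main(String[] args) {
        System.out.println(decide(0, 3, ""));     // REPORT_MISSING_TOPIC
        System.out.println(decide(4, 3, ""));     // DISCARD
        System.out.println(decide(1, 3, "logs")); // SEND
    }
}
```

With the old ordering, an event with an empty topic never reached the retry-count check, which is consistent with the duplicate-write symptom named in the commit title.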



[GitHub] [inlong] healchow commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

2022-10-12 Thread GitBox


healchow commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r993346660


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java:
##
@@ -85,22 +77,12 @@ public WorkflowProcess defineProcess() {
 deleteMQTask.setListenerFactory(groupTaskListenerFactory);
 process.addTask(deleteMQTask);
 
-// Delete Sort

Review Comment:
   When do we need to delete the Sort task?
   
   As far as I know, a Sort task may process data of multiple 
Pairs. To modify any StreamSource or StreamSink, we 
stop the Sort task first, push all updated and non-updated StreamSource or 
StreamSink information to the Sort task, and finally restart it.
   
   In this case, how should the Sort task be maintained?






[GitHub] [inlong] jun0315 commented on pull request #5379: [INLONG-5231][Improve][CodeStyle] Add spotless to CI

2022-10-12 Thread GitBox


jun0315 commented on PR #5379:
URL: https://github.com/apache/inlong/pull/5379#issuecomment-1276446104

   Sorry, I found that there is too much content to adapt. I plan to close 
this PR for personal reasons.





[GitHub] [inlong] jun0315 closed pull request #5379: [INLONG-5231][Improve][CodeStyle] Add spotless to CI

2022-10-12 Thread GitBox


jun0315 closed pull request #5379: [INLONG-5231][Improve][CodeStyle] Add 
spotless to CI
URL: https://github.com/apache/inlong/pull/5379





[GitHub] [inlong] haifxu commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse

2022-10-12 Thread GitBox


haifxu commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r994062527


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java:
##
@@ -75,6 +78,7 @@
 public class AuditServiceImpl implements AuditService {
 
 private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditServiceImpl.class);
+private static final String SECOND_FORMAT = "yyy-MM-dd HH:mm:ss";

Review Comment:
   Done.



##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java:
##
@@ -155,6 +161,20 @@ public List listByCondition(AuditRequest request) throws IOException {
 
auditId.equals(AuditConstants.AUDIT_ID_SORT_OUTPUT) ? sinkNodeType : null));
 }
 }
+} else if (AuditQuerySource.CLICKHOUSE == querySource) {
+Statement ckStatement = clickhouseConfig.getCkStatement();

Review Comment:
   Done.






[GitHub] [inlong] haifxu commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse

2022-10-12 Thread GitBox


haifxu commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r994062643


##
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/AuditQuerySource.java:
##
@@ -28,5 +28,9 @@ public enum AuditQuerySource {
 /**
  * ELASTICSEARCH source of query
  */
-ELASTICSEARCH;
+ELASTICSEARCH,
+/**
+ * ELASTICSEARCH source of query

Review Comment:
   Done.






[GitHub] [inlong] haifxu commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse

2022-10-12 Thread GitBox


haifxu commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r994062943


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickhouseConfig.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.ck;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+/**
+ * Clickhouse config information, including url, user, etc.
+ */
+@Component
+public class ClickhouseConfig {
+
+@Value("${ck.index.search.url}")
+private String url;
+
+@Value("${ck.auth.user}")
+private String user;
+
+@Value("${ck.auth.password}")
+private String password;
+
+private Statement ckStatement;
+
+/**
+ * Get ClickHouse statement from config file
+ */
+public Statement getCkStatement() throws Exception {
+if (ckStatement == null) {

Review Comment:
   Done.






[GitHub] [inlong] haifxu commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse

2022-10-12 Thread GitBox


haifxu commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r994062764


##
inlong-manager/manager-web/src/main/resources/application-test.properties:
##
@@ -63,3 +63,11 @@ es.auth.enable=false
 es.auth.user=admin
 # Elasticsearch password of authentication info
 es.auth.password=inlong
+
+# ClickHouse config
+# ClickHouse url
+ck.index.search.url=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit

Review Comment:
   Done.






[GitHub] [inlong] lucaspeng12138 opened a new pull request, #6161: [INLONG-6159][Manager] Fix heartbeat status update

2022-10-12 Thread GitBox


lucaspeng12138 opened a new pull request, #6161:
URL: https://github.com/apache/inlong/pull/6161

   ### Prepare a Pull Request
   
   Fix heartbeat status update
   - Fixes #6159 
   
   ### Motivation
   
   When a heartbeat is received, the heartbeat manager's init thread and 
update thread may read the same version from the inlong_cluster_node table. 
Both threads then use that version to update the table, and one of the updates 
fails. Because the update in the reportHeartbeat function is skipped when the 
last heartbeat of the same type exists in the cache, the heartbeat status may 
stay timed out until the cache entry is dropped.
   
   ### Modifications
   
   Add a check on the return value. 
   When the value returned by the MySQL update is larger than zero, at least 
one row was updated. When the return value is zero, skip adding this heartbeat 
to the cache. This prevents the next heartbeat from skipping the MySQL update 
because the previous heartbeat was cached.
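
   The check described above can be sketched roughly as follows. This is only 
an illustration under assumptions, not the code in this PR: the class name, 
the cache layout, and the `updateNodeStatus` function (a stand-in for the 
versioned UPDATE, returning the number of affected rows) are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;

/** Hypothetical sketch: cache a heartbeat only if the database update changed a row. */
class HeartbeatCacheSketch {

    // Hypothetical cache: node key -> time of the last reported heartbeat.
    private final Map<String, Long> heartbeatCache = new ConcurrentHashMap<>();

    // Stand-in for the versioned UPDATE; returns the number of affected rows.
    private final ToIntFunction<String> updateNodeStatus;

    HeartbeatCacheSketch(ToIntFunction<String> updateNodeStatus) {
        this.updateNodeStatus = updateNodeStatus;
    }

    /** Returns true if the heartbeat ended up in the cache. */
    boolean reportHeartbeat(String nodeKey, long reportTime) {
        if (heartbeatCache.containsKey(nodeKey)) {
            // A previous heartbeat is cached, so the database update is skipped.
            heartbeatCache.put(nodeKey, reportTime);
            return true;
        }
        int affectedRows = updateNodeStatus.applyAsInt(nodeKey);
        if (affectedRows <= 0) {
            // The update changed nothing (for example, a version conflict).
            // Do not cache, so the next heartbeat retries the update.
            return false;
        }
        heartbeatCache.put(nodeKey, reportTime);
        return true;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated database: the first update affects 0 rows, later ones affect 1 row.
        HeartbeatCacheSketch sketch =
                new HeartbeatCacheSketch(key -> calls[0]++ == 0 ? 0 : 1);
        System.out.println(sketch.reportHeartbeat("agent-1", 1L));
        System.out.println(sketch.reportHeartbeat("agent-1", 2L));
        System.out.println(sketch.reportHeartbeat("agent-1", 3L));
        System.out.println("updates attempted: " + calls[0]);
    }
}
```

   In this sketch the first heartbeat is not cached because its update affects 
no rows, so the second heartbeat retries the update instead of being skipped.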
   
   ### Verifying this change
   
   Add a check on the return value. 
   When the value returned by the MySQL update is larger than zero, at least 
one row was updated. When the return value is zero, skip adding this heartbeat 
to the cache. This prevents the next heartbeat from skipping the MySQL update 
because the previous heartbeat was cached.





[GitHub] [inlong] dockerzhang commented on pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse

2022-10-12 Thread GitBox


dockerzhang commented on PR #6155:
URL: https://github.com/apache/inlong/pull/6155#issuecomment-1276990180

   @haifxu please update the document at the same time, thanks.





[GitHub] [inlong] gosonzhang opened a new pull request, #6162: [INLONG-6158][TubeMQ] Add ZooKeeper request timeout configuration

2022-10-12 Thread GitBox


gosonzhang opened a new pull request, #6162:
URL: https://github.com/apache/inlong/pull/6162

   
   - Fixes #6158 
   





[GitHub] [inlong] leezng opened a new pull request, #6164: [INLONG-6163][Dashboard] Form will lose old data when data changes are merged

2022-10-12 Thread GitBox


leezng opened a new pull request, #6164:
URL: https://github.com/apache/inlong/pull/6164

   - Fixes #6163





[GitHub] [inlong] yunqingmoswu opened a new pull request, #6165: [INLONG-6160][Sort] Support dynamic partition for KafkaLoadNode

2022-10-12 Thread GitBox


yunqingmoswu opened a new pull request, #6165:
URL: https://github.com/apache/inlong/pull/6165

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   Title: [INLONG-6160][Sort] Support dynamic partition for KafkaLoadNode
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   Fixes #6160 
   
   ### Motivation
   
   Support dynamic partition for KafkaLoadNode when the Kafka format is raw 
and 'key.fields' is not specified.
   This is mainly for whole-database migration scenarios. We assume that the 
upstream input data has a mixed schema from a whole-database migration. We 
ignore the real schema for now, receive the entire record in a binary raw data 
format, fetch and parse its schema and data on the Kafka sink side, and 
dynamically write records to the related topic partitions according to some 
data values.
   Dynamic partition writing has some limitations:
   1. The upstream data is in raw format with a fixed inner format; only 
[canal-json|debezium-json] is supported for now.
   2. 'key.fields' is not specified.
   3. 'sink.multiple.partition-pattern' and 'sink.multiple.format' must be 
specified for dynamically extracting the partition from the data.
   4. If 'sink.multiple.partition-pattern' equals 'PRIMARY_KEY', the primary 
key is extracted from the raw data and hashed to get the partition key; 
otherwise the partition key is parsed from the raw data.
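
   The 'PRIMARY_KEY' case in limitation 4 can be illustrated with a minimal 
sketch. It assumes the primary-key values have already been extracted from the 
raw record, and it is not the 'RawDataHashPartitioner' added by this PR: the 
class name, the method name, and the "|" separator are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: pick a partition by hashing the primary-key values. */
class PrimaryKeyHashPartitionSketch {

    /**
     * Maps already-extracted primary-key values to a partition index
     * in the range [0, numPartitions).
     */
    static int partitionFor(List<String> primaryKeyValues, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Join composite keys with a separator (an assumption made for this sketch).
        byte[] keyBytes = String.join("|", primaryKeyValues)
                .getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int first = partitionFor(Arrays.asList("1001"), 4);
        int second = partitionFor(Arrays.asList("1001"), 4);
        // Records with the same primary key always land in the same partition.
        System.out.println(first == second);
        System.out.println(partitionFor(Arrays.asList("order", "42"), 8));
    }
}
```

   Hashing the key bytes keeps all changes for one row in the same partition, 
which preserves their relative order within that partition.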
   
   ### Modifications
   
   1. Add the option 'sink.multiple.partition-pattern'
   2. Add the partitioner 'RawDataHashPartitioner'
   3. Add the methods extractValues, extractPrimaryKeyNames and 
extractPrimaryKeyValues to 'DynamicSchemaFormat'
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   

