[GitHub] [inlong] healchow merged pull request #6055: [INLONG-5978][Manager] Add protocol type in cluster nodes
healchow merged PR #6055: URL: https://github.com/apache/inlong/pull/6055 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055)
This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new aa91f5984 [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055) aa91f5984 is described below commit aa91f5984cf0cd230cba9098834b028a554fc65c Author: xuesongxs <54351417+xueson...@users.noreply.github.com> AuthorDate: Wed Oct 12 15:43:49 2022 +0800 [INLONG-5978][Manager] Add protocol type in cluster nodes (#6055) * Add protocol type constant class * Fix unit test error * Update NodeEditModal.tsx Co-authored-by: healchow Co-authored-by: Daniel --- .../common/heartbeat/ComponentHeartbeat.java | 5 +- .../inlong/common/heartbeat/HeartbeatMsg.java | 7 ++- .../common/pojo/dataproxy/DataProxyNodeInfo.java | 4 ++ inlong-dashboard/src/locales/cn.json | 2 + inlong-dashboard/src/locales/en.json | 2 + .../src/pages/Clusters/NodeEditModal.tsx | 19 inlong-dashboard/src/pages/Clusters/NodeManage.tsx | 4 ++ .../inlong/manager/common/consts/ProtocolType.java | 26 +++--- .../dao/entity/InlongClusterNodeEntity.java| 1 + .../dao/mapper/InlongClusterNodeEntityMapper.java | 3 +- .../mappers/InlongClusterNodeEntityMapper.xml | 57 -- .../manager/pojo/cluster/ClusterNodeRequest.java | 4 ++ .../manager/pojo/cluster/ClusterNodeResponse.java | 3 ++ .../service/cluster/InlongClusterService.java | 5 +- .../service/cluster/InlongClusterServiceImpl.java | 30 +--- .../service/core/heartbeat/HeartbeatManager.java | 2 + .../service/cluster/InlongClusterServiceTest.java | 46 +++-- .../core/heartbeat/HeartbeatManagerTest.java | 3 ++ .../service/core/impl/HeartbeatServiceTest.java| 2 + .../main/resources/h2/apache_inlong_manager.sql| 31 ++-- .../manager-web/sql/apache_inlong_manager.sql | 31 ++-- .../controller/openapi/DataProxyController.java| 6 ++- 22 files changed, 181 insertions(+), 112 deletions(-) diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java index 260e4618c..8d6beda08 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java @@ -35,18 +35,21 @@ public class ComponentHeartbeat { private int port; +private String protocolType; + private String inCharges; public ComponentHeartbeat() { } public ComponentHeartbeat(String clusterTag, String clusterName, String componentType, String ip, int port, -String inCharges) { +String inCharges, String protocolType) { this.clusterTag = clusterTag; this.clusterName = clusterName; this.componentType = componentType; this.ip = ip; this.port = port; +this.protocolType = protocolType; this.inCharges = inCharges; } } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java index 5277eaf22..42038d993 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java @@ -43,6 +43,11 @@ public class HeartbeatMsg { */ private int port; +/** + * ProtocolType of component + */ +private String protocolType; + /** * Type of component */ @@ -79,6 +84,6 @@ public class HeartbeatMsg { private List streamHeartbeats; public ComponentHeartbeat componentHeartbeat() { -return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges); +return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges, protocolType); } } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java index 6425348a8..9aca22240 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyNodeInfo.java @@ -40,4 +40,8 @@ public class DataProxyNodeInfo { */ private Integer port; +/** + * Node protocol type + */ +private String protocolType; } diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json index 16e5161f7..feeb8b4d6 100644 --- a/inlong-da
[GitHub] [inlong] leezng merged pull request #6135: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field
leezng merged PR #6135: URL: https://github.com/apache/inlong/pull/6135
[inlong] branch master updated: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field (#6135)
This is an automated email from the ASF dual-hosted git repository. leezng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 435eb7147 [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field (#6135) 435eb7147 is described below commit 435eb71475662823b9f7b6e5ca500bbc13c97923 Author: Lizhen <88174078+bluew...@users.noreply.github.com> AuthorDate: Wed Oct 12 15:54:11 2022 +0800 [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field (#6135) --- inlong-dashboard/src/metas/sinks/greenplum.tsx | 84 +++- inlong-dashboard/src/metas/sinks/oracle.tsx | 75 + inlong-dashboard/src/metas/sinks/postgreSql.tsx | 83 +++- inlong-dashboard/src/metas/sinks/sqlServer.tsx | 86 - 4 files changed, 209 insertions(+), 119 deletions(-) diff --git a/inlong-dashboard/src/metas/sinks/greenplum.tsx b/inlong-dashboard/src/metas/sinks/greenplum.tsx index aff09821c..8af638901 100644 --- a/inlong-dashboard/src/metas/sinks/greenplum.tsx +++ b/inlong-dashboard/src/metas/sinks/greenplum.tsx @@ -20,35 +20,40 @@ import type { FieldItemType } from '@/metas/common'; import EditableTable from '@/components/EditableTable'; import { sourceFields } from './common/sourceFields'; -const greenplumFieldTypes = [ - 'SMALLINT', - 'INT2', - 'SMALLSERIAL', - 'SERIAL', - 'SERIAL2', - 'INTEGER', - 'BIGINT', - 'BIGSERIAL', - 'REAL', - 'FLOAT4', - 'FLOAT8', - 'DOUBLE', - 'NUMERIC', - 'DECIMAL', - 'BOOLEAN', - 'DATE', - 'TIME', - 'TIMESTAMP', - 'CHAR', - 'CHARACTER', - 'VARCHAR', - 'TEXT', - 'BYTEA', - // 'interval', -].map(item => ({ - label: item, - value: item, -})); +const fieldTypesConf = { + SMALLINT: (m, d) => (1 <= m && m <= 6 ? '' : '1 <= M <= 6'), + INT2: () => '', + SMALLSERIAL: (m, d) => (1 <= m && m <= 6 ? '' : '1 <= M <= 6'), + SERIAL: (m, d) => (1 <= m && m <= 11 ? 
'' : '1 <= M <= 11'), + SERIAL2: () => '', + INTEGER: (m, d) => (1 <= m && m <= 11 ? '' : '1 <= M <= 11'), + BIGINT: (m, d) => (1 <= m && m <= 20 ? '' : '1 <= M <= 20'), + BIGSERIAL: (m, d) => (1 <= m && m <= 20 ? '' : '1 <= M <= 20'), + REAL: () => '', + FLOAT4: () => '', + FLOAT8: () => '', + DOUBLE: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : '1 <= M <= 38, 0 <= D < M'), + NUMERIC: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : '1 <= M <= 38, 0 <= D < M'), + DECIMAL: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : '1 <= M <= 38, 0 <= D < M'), + BOOLEAN: () => '', + DATE: () => '', + TIME: () => '', + TIMESTAMP: () => '', + CHAR: (m, d) => (1 <= m && m <= 255 ? '' : '1 <= M <= 255'), + CHARACTER: (m, d) => (1 <= m && m <= 255 ? '' : '1 <= M <= 255'), + VARCHAR: (m, d) => (1 <= m && m <= 255 ? '' : '1 <= M <= 255'), + TEXT: () => '', + BYTEA: () => '', +}; + +const greenplumFieldTypes = Object.keys(fieldTypesConf).reduce( + (acc, key) => +acc.concat({ + label: key, + value: key, +}), + [], +); export const greenplum: FieldItemType[] = [ { @@ -154,12 +159,29 @@ const getFieldListColumns = sinkValues => { title: `GREENPLUM${i18n.t('meta.Sinks.Greenplum.FieldType')}`, dataIndex: 'fieldType', initialValue: greenplumFieldTypes[0].value, - type: 'select', + type: 'autocomplete', props: (text, record, idx, isNew) => ({ options: greenplumFieldTypes, disabled: [110, 130].includes(sinkValues?.status as number) && !isNew, }), - rules: [{ required: true }], + rules: [ +{ required: true }, +() => ({ + validator(_, val) { +if (val) { + const [, type = val, typeLength = ''] = val.match(/^(.+)\((.+)\)$/) || []; + if (fieldTypesConf.hasOwnProperty(type)) { +const [m = -1, d = -1] = typeLength.split(','); +const errMsg = fieldTypesConf[type]?.(m, d); +if (typeLength && errMsg) return Promise.reject(new Error(errMsg)); + } else { +return Promise.reject(new Error('FieldType error')); + } +} +return Promise.resolve(); + }, +}), + ], }, { title: 
i18n.t('meta.Sinks.Greenplum.IsMetaField'), diff --git a/inlong-dashboard/src/metas/sinks/oracle.tsx b/inlong-dashboard/src/metas/sinks/oracle.tsx index 05e9ec4ce..495ac91c8 100644 --- a/inlong-dashboard/src/metas/sinks/oracle.tsx +++ b/inlong-dashboard/src/metas/sinks/oracle.tsx @@ -20,30 +20,36 @@ import type { FieldItemType } from '@/metas/common'; import EditableTable from '@/components/EditableTable'; import { sourceFields } from './common/sourceFields'; -const oracleFieldTypes = [ - 'BINARY_FLOAT', - 'BINARY_DOUBLE', - 'SMALLINT', - 'FLOAT', - 'FLOAT4', - 'FLO
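The field-type validator added to greenplum.tsx can be approximated outside the dashboard. The following is a minimal Java sketch, not the dashboard's own code: the class name `FieldTypeValidator` and the helper `parseOrMinusOne` are hypothetical, only four of the rules from `fieldTypesConf` are copied, and values are parsed as integers rather than relying on JavaScript's string coercion.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FieldTypeValidator {

    // Splits "TYPE(args)" into TYPE and args, mirroring /^(.+)\((.+)\)$/ from the diff.
    private static final Pattern WITH_LENGTH = Pattern.compile("^(.+)\\((.+)\\)$");

    // A subset of fieldTypesConf; each rule returns "" when (m, d) is acceptable.
    private static final Map<String, BiFunction<Integer, Integer, String>> RULES = new LinkedHashMap<>();

    static {
        RULES.put("SMALLINT", (m, d) -> 1 <= m && m <= 6 ? "" : "1 <= M <= 6");
        RULES.put("DECIMAL",
                (m, d) -> 1 <= m && m <= 38 && 0 <= d && d < m ? "" : "1 <= M <= 38, 0 <= D < M");
        RULES.put("VARCHAR", (m, d) -> 1 <= m && m <= 255 ? "" : "1 <= M <= 255");
        RULES.put("TEXT", (m, d) -> "");
    }

    /** Returns "" for an accepted value, otherwise the error message to display. */
    static String validate(String value) {
        if (value == null || value.isEmpty()) {
            return "";
        }
        Matcher matcher = WITH_LENGTH.matcher(value);
        boolean hasLength = matcher.matches();
        String type = hasLength ? matcher.group(1) : value;
        BiFunction<Integer, Integer, String> rule = RULES.get(type);
        if (rule == null) {
            return "FieldType error";
        }
        if (!hasLength) {
            // A bare type name without "(M)" or "(M,D)" is accepted as-is.
            return "";
        }
        String[] parts = matcher.group(2).split(",");
        int m = parts.length > 0 ? parseOrMinusOne(parts[0]) : -1;
        int d = parts.length > 1 ? parseOrMinusOne(parts[1]) : -1;
        return rule.apply(m, d);
    }

    // Missing or non-numeric arguments become -1, matching the diff's default of -1.
    private static int parseOrMinusOne(String text) {
        try {
            return Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
```

Under these assumptions `FieldTypeValidator.validate("DECIMAL(10,2)")` is accepted, while `DECIMAL(10)` is rejected because the scale defaults to -1, which is the same behaviour the `[m = -1, d = -1]` destructuring gives in the diff.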
[GitHub] [inlong] haifxu opened a new pull request, #6155: [INLONG][Manager] Support to get audit data from clickhouse
haifxu opened a new pull request, #6155:
URL: https://github.com/apache/inlong/pull/6155

### Prepare a Pull Request

- Fixes #6154

### Motivation

Support getting audit data from ClickHouse.

### Modifications

1. Add ClickHouse config information.
2. Support getting audit data from ClickHouse.

### Verifying this change
[GitHub] [inlong] luchunliang commented on a diff in pull request #6155: [INLONG][Manager] Support to get audit data from clickhouse
luchunliang commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r993131615

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java: ##
@@ -75,6 +78,7 @@ public class AuditServiceImpl implements AuditService {
     private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);
+    private static final String SECOND_FORMAT = "yyy-MM-dd HH:mm:ss";

Review Comment:
A 'y' is missing in yyy-MM-dd HH:mm:ss; the pattern should be yyyy-MM-dd HH:mm:ss.
[GitHub] [inlong] dockerzhang merged pull request #5802: [INLONG-5495][SDK] Support multi-topic manager
dockerzhang merged PR #5802: URL: https://github.com/apache/inlong/pull/5802
[inlong] branch master updated: [INLONG-5495][SDK] Support multi-topic manager (#5802)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 08dfa0ecc [INLONG-5495][SDK] Support multi-topic manager (#5802) 08dfa0ecc is described below commit 08dfa0ecc48944f6fc45197827c177e9eb07d528 Author: vernedeng AuthorDate: Wed Oct 12 16:10:58 2022 +0800 [INLONG-5495][SDK] Support multi-topic manager (#5802) --- .../sdk/sort/api/InlongTopicManagerFactory.java| 3 +- .../sort/fetcher/kafka/AckOffsetOnRebalance.java | 10 +- .../fetcher/kafka/KafkaSingleTopicFetcher.java | 2 +- .../sort/impl/kafka/InLongKafkaFetcherImpl.java| 2 +- .../sdk/sort/manager/InlongMultiTopicManager.java | 289 + 5 files changed, 300 insertions(+), 6 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java index a8219f95f..834315236 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java @@ -19,6 +19,7 @@ package org.apache.inlong.sdk.sort.api; import org.apache.inlong.sdk.sort.api.SortClientConfig.TopicType; +import org.apache.inlong.sdk.sort.manager.InlongMultiTopicManager; import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager; /** @@ -47,6 +48,6 @@ public class InlongTopicManagerFactory { public static TopicManager createMultiTopicManager( ClientContext context, QueryConsumeConfig queryConsumeConfig) { -return null; +return new InlongMultiTopicManager(context, queryConsumeConfig); } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java index 0cf27bc82..d0a66c6e3 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java @@ -51,9 +51,13 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener { private final AtomicLong revokedNum = new AtomicLong(0); private final AtomicLong assignedNum = new AtomicLong(0); -public AckOffsetOnRebalance(String clusterId, Seeker seeker, -ConcurrentHashMap commitOffsetMap) { -this(clusterId, seeker, commitOffsetMap, null, null); +public AckOffsetOnRebalance( +String clusterId, +Seeker seeker, +ConcurrentHashMap commitOffsetMap, +KafkaConsumer consumer) { +this(clusterId, seeker, commitOffsetMap, null, consumer); } public AckOffsetOnRebalance( diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java index 9ed97261a..3f903720b 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java @@ -79,7 +79,7 @@ public class KafkaSingleTopicFetcher extends SingleTopicFetcher { this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic); consumer.subscribe(Collections.singletonList(topic.getTopic()), new AckOffsetOnRebalance(this.topic.getInLongCluster().getClusterId(), seeker, -commitOffsetMap)); +commitOffsetMap, consumer)); } else { LOGGER.info("consumer is null"); return false; diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java index d2da76c0f..2dd9c2cf1 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java @@ -77,7 +77,7 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher { this.seeker = SeekerFactory.createKafkaSeeker(consumer, inLongTopic); consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()), new AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), se
[GitHub] [inlong] luchunliang commented on a diff in pull request #6155: [INLONG][Manager] Support to get audit data from clickhouse
luchunliang commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r993133586

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java: ##
@@ -211,6 +231,31 @@ private SearchRequest toAuditSearchRequest(String index, String groupId, String
         return new SearchRequest(new String[]{index}, sourceBuilder);
     }
+    /**
+     * Convert to clickhouse search sql
+     *
+     * @param groupId The groupId of inlong
+     * @param streamId The streamId of inlong
+     * @param auditId The auditId of request
+     * @param dt The datetime of request
+     * @return clickhouse sql
+     */
+    private String toAuditCkSql(String groupId, String streamId, String auditId, String dt) {
+        DateTimeFormatter formatter = DateTimeFormat.forPattern(DAY_FORMAT);
+        DateTime date = formatter.parseDateTime(dt);
+        String sDate = date.toString(SECOND_FORMAT);
+        String eDate = date.plusDays(1).toString(SECOND_FORMAT);
+        return new SQL()
+                .SELECT("log_ts", "sum(count) as total")
+                .FROM("apache_inlong_audit.audit_data")

Review Comment:
Hard-coding the database name in the code is not a good idea.

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java: ##
@@ -155,6 +161,20 @@ public List listByCondition(AuditRequest request) throws IOException {
                             auditId.equals(AuditConstants.AUDIT_ID_SORT_OUTPUT) ? sinkNodeType : null));
                 }
             }
+        } else if (AuditQuerySource.CLICKHOUSE == querySource) {
+            Statement ckStatement = clickhouseConfig.getCkStatement();

Review Comment:
A call to Statement.close() is missing.
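The day-window computation in `toAuditCkSql` and the reviewer's point about the fixed database name can be sketched together. This is an illustration under stated assumptions, not the PR's code: it uses `java.time` instead of Joda-Time so it needs no extra dependency, it assumes `DAY_FORMAT` is `yyyy-MM-dd` (its definition is not shown in the diff), and the class `AuditDayWindow`, the `qualifiedTable` parameter, and the `WHERE`/`GROUP BY` clauses are hypothetical.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class AuditDayWindow {

    // Assumed value of DAY_FORMAT; the diff does not show its definition.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Four-letter year, as the review of SECOND_FORMAT asks for.
    private static final DateTimeFormatter SECOND = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Returns {start, end} for the day dt, where end is the following midnight. */
    static String[] bounds(String dt) {
        LocalDateTime start = LocalDate.parse(dt, DAY).atStartOfDay();
        return new String[] {start.format(SECOND), start.plusDays(1).format(SECOND)};
    }

    /** Builds the query text with the table name supplied by the caller instead of hard-coded. */
    static String sql(String qualifiedTable) {
        // The WHERE and GROUP BY clauses are hypothetical; the diff is cut off after FROM.
        return "SELECT log_ts, sum(count) AS total FROM " + qualifiedTable
                + " WHERE log_ts >= ? AND log_ts < ? GROUP BY log_ts";
    }
}
```

The table name is concatenated because identifiers cannot be bound as JDBC parameters, so in a sketch like this it should only ever come from trusted configuration, never from request input.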
[GitHub] [inlong] gosonzhang opened a new pull request, #6157: [INLONG-6156][DataProxy] Twice event write when topic is empty
gosonzhang opened a new pull request, #6157: URL: https://github.com/apache/inlong/pull/6157 - Fixes #6156
[GitHub] [inlong] healchow commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse
healchow commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r993164303

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickhouseConfig.java: ##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.ck;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+/**
+ * Clickhouse config information, including url, user, etc.
+ */
+@Component
+public class ClickhouseConfig {
+
+    @Value("${ck.index.search.url}")
+    private String url;
+
+    @Value("${ck.auth.user}")
+    private String user;
+
+    @Value("${ck.auth.password}")
+    private String password;
+
+    private Statement ckStatement;
+
+    /**
+     * Get ClickHouse statement from config file
+     */
+    public Statement getCkStatement() throws Exception {
+        if (ckStatement == null) {

Review Comment:
1. This usage does not seem thread-safe.
2. Do we really need a long-lived connection for the JDBC statement?

## inlong-manager/manager-web/src/main/resources/application-test.properties: ##
@@ -63,3 +63,11 @@ es.auth.enable=false
 es.auth.user=admin
 # Elasticsearch password of authentication info
 es.auth.password=inlong
+
+# ClickHouse config
+# ClickHouse url
+ck.index.search.url=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit

Review Comment:
I suggest changing these to `audit.ck.jdbcUrl`, `audit.ck.username`, `audit.ck.password`.

## inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/AuditQuerySource.java: ##
@@ -28,5 +28,9 @@ public enum AuditQuerySource {
     /**
      * ELASTICSEARCH source of query
      */
-    ELASTICSEARCH;
+    ELASTICSEARCH,
+    /**
+     * ELASTICSEARCH source of query

Review Comment:
Not `ELASTICSEARCH`.
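One way to address both review points on `ClickhouseConfig` (the unsynchronized lazy field and the long-lived `Statement`) is to open and close the JDBC resources per query. The following is only a sketch of that idea, not what the PR ended up doing: the class `PerCallClickHouseQuery`, the `ConnectionSource` interface, and the method `sumFirstColumn` are hypothetical names, and a real implementation might use a connection pool instead.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class PerCallClickHouseQuery {

    /**
     * Hypothetical seam for obtaining connections; production code might wrap
     * DriverManager.getConnection(url, user, password) or a pooled DataSource.
     */
    interface ConnectionSource {
        Connection open() throws SQLException;
    }

    private final ConnectionSource source;

    PerCallClickHouseQuery(ConnectionSource source) {
        this.source = source;
    }

    /** Runs the query and sums the first column of every returned row. */
    long sumFirstColumn(String sql) throws SQLException {
        long total = 0;
        // try-with-resources closes the ResultSet, Statement and Connection in
        // reverse order, even if the query throws, so no state is shared between
        // threads and nothing is left open.
        try (Connection conn = source.open();
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                total += rs.getLong(1);
            }
        }
        return total;
    }
}
```

Because the class holds no mutable field, concurrent callers cannot observe a half-initialized `Statement`; the trade-off is the cost of opening a connection per call, which is why pooling is the usual companion to this pattern.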
[inlong] branch master updated (08dfa0ecc -> 53f0038fc)
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 08dfa0ecc [INLONG-5495][SDK] Support multi-topic manager (#5802)
     add 53f0038fc [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/dataproxy/sink/PulsarSink.java   |  2 +-
 .../apache/inlong/dataproxy/sink/pulsar/SinkTask.java  | 17 ++++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)
[GitHub] [inlong] gosonzhang merged pull request #6157: [INLONG-6156][DataProxy] Twice event write when topic is empty
gosonzhang merged PR #6157: URL: https://github.com/apache/inlong/pull/6157
[inlong] branch release-1.3.0 updated: [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 5cf0b4311 [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157) 5cf0b4311 is described below commit 5cf0b4311bade4e203f8c2153d724376144de997 Author: Goson Zhang <4675...@qq.com> AuthorDate: Wed Oct 12 18:15:31 2022 +0800 [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157) --- .../org/apache/inlong/dataproxy/sink/PulsarSink.java| 2 +- .../apache/inlong/dataproxy/sink/pulsar/SinkTask.java | 17 - 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index c2014965d..b73223b11 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -498,7 +498,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag } addStatistics(eventStat, false, 0); eventStat.incRetryCnt(); -if (!eventStat.isOrderMessage() && needRetry) { +if (needRetry) { processResendEvent(eventStat); } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java index efbaedfb6..a0a488fa6 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java @@ -32,6 +32,7 @@ import org.apache.inlong.dataproxy.sink.EventStat; 
import org.apache.inlong.dataproxy.sink.PulsarSink; import org.apache.inlong.dataproxy.utils.MessageUtils; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,6 +154,12 @@ public class SinkTask extends Thread { logger.warn("Event is null!"); continue; } +// check whether discard or send event +if (eventStat.getRetryCnt() > maxRetrySendCnt) { +logger.warn("Message will be discard! send times reach to max retry cnt." ++ " topic = {}, max retry cnt = {}", topic, maxRetrySendCnt); +continue; +} // get topic topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY); if (StringUtils.isEmpty(topic)) { @@ -162,21 +169,13 @@ public class SinkTask extends Thread { } if (topic == null || topic.equals("")) { pulsarSink.handleMessageSendException(topic, eventStat, -new Exception(ConfigConstants.TOPIC_KEY + " info is null")); -processToReTrySend(eventStat); -logger.warn("no topic specified, so will retry send!"); +new NotFoundException(ConfigConstants.TOPIC_KEY + " info is null")); continue; } // check whether order-type message if (eventStat.isOrderMessage()) { sleep(1000); } -// check whether discard or send event -if (eventStat.getRetryCnt() > maxRetrySendCnt) { -logger.warn("Message will be discard! send times reach to max retry cnt." -+ " topic = {}, max retry cnt = {}", topic, maxRetrySendCnt); -continue; -} // check whether duplicated event String clientSeqId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID); if (pulsarConfig.getClientIdCache() && clientSeqId != null) {
[GitHub] [inlong] healchow commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes
healchow commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r993346660

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java: ##
@@ -85,22 +77,12 @@ public WorkflowProcess defineProcess() {
         deleteMQTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteMQTask);
-        // Delete Sort

Review Comment:
When do we need to delete the Sort task?

As far as I know, a Sort task may process data of multiple Pairs. To modify any StreamSource or StreamSink, we stop the Sort task first, push all updated and non-updated StreamSource or StreamSink information to the Sort task, and finally restart it.

In this case, how should the Sort task be maintained?
[GitHub] [inlong] jun0315 commented on pull request #5379: [INLONG-5231][Improve][CodeStyle] Add spotless to CI
jun0315 commented on PR #5379: URL: https://github.com/apache/inlong/pull/5379#issuecomment-1276446104 Sorry, I find that there is too much content to adapt. I plan to close this PR for personal reasons.
[GitHub] [inlong] jun0315 closed pull request #5379: [INLONG-5231][Improve][CodeStyle] Add spotless to CI
jun0315 closed pull request #5379: [INLONG-5231][Improve][CodeStyle] Add spotless to CI URL: https://github.com/apache/inlong/pull/5379
[GitHub] [inlong] haifxu commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse
haifxu commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r994062527

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java: ##
@@ -75,6 +78,7 @@ public class AuditServiceImpl implements AuditService {
     private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);
+    private static final String SECOND_FORMAT = "yyy-MM-dd HH:mm:ss";

Review Comment:
Done.

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java: ##
@@ -155,6 +161,20 @@ public List listByCondition(AuditRequest request) throws IOException {
                             auditId.equals(AuditConstants.AUDIT_ID_SORT_OUTPUT) ? sinkNodeType : null));
                 }
             }
+        } else if (AuditQuerySource.CLICKHOUSE == querySource) {
+            Statement ckStatement = clickhouseConfig.getCkStatement();

Review Comment:
Done.
[GitHub] [inlong] haifxu commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse
haifxu commented on code in PR #6155:
URL: https://github.com/apache/inlong/pull/6155#discussion_r994062643

## inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/AuditQuerySource.java: ##
@@ -28,5 +28,9 @@ public enum AuditQuerySource {
     /**
      * ELASTICSEARCH source of query
      */
-    ELASTICSEARCH;
+    ELASTICSEARCH,
+    /**
+     * ELASTICSEARCH source of query

Review Comment:
Done.
[GitHub] [inlong] haifxu commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse
haifxu commented on code in PR #6155: URL: https://github.com/apache/inlong/pull/6155#discussion_r994062943 ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickhouseConfig.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.ck; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.sql.Connection; +import java.sql.Statement; + +/** + * Clickhouse config information, including url, user, etc. + */ +@Component +public class ClickhouseConfig { + +@Value("${ck.index.search.url}") +private String url; + +@Value("${ck.auth.user}") +private String user; + +@Value("${ck.auth.password}") +private String password; + +private Statement ckStatement; + +/** + * Get ClickHouse statement from config file + */ +public Statement getCkStatement() throws Exception { +if (ckStatement == null) { Review Comment: Done.
[GitHub] [inlong] haifxu commented on a diff in pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse
haifxu commented on code in PR #6155: URL: https://github.com/apache/inlong/pull/6155#discussion_r994062764 ## inlong-manager/manager-web/src/main/resources/application-test.properties: ## @@ -63,3 +63,11 @@ es.auth.enable=false es.auth.user=admin # Elasticsearch password of authentication info es.auth.password=inlong + +# ClickHouse config +# ClickHouse url +ck.index.search.url=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit Review Comment: Done.
[GitHub] [inlong] lucaspeng12138 opened a new pull request, #6161: [INLONG-6159][Manager] Fix heartbeat status update
lucaspeng12138 opened a new pull request, #6161:
URL: https://github.com/apache/inlong/pull/6161

### Prepare a Pull Request

Fix heartbeat status update

- Fixes #6159

### Motivation

When a heartbeat is received, the heartbeat manager's init thread and update thread may read the same version from the inlong_cluster_node table. Both threads then use that version to update the table, and one of the updates fails. Because reportHeartbeat skips the update when the last heartbeat of the same type already exists in the cache, the failed update is not retried, so the heartbeat status may remain timed out until the cache entry is dropped.

### Modifications

Check the return value of the update. A MySQL return value greater than zero means at least one row was updated. When the return value is zero, the heartbeat is not added to the cache. This prevents the next heartbeat from being skipped because of a cached entry, so it updates MySQL again.

### Verifying this change

Same as the modifications above.
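The return-value check described in PR #6161 can be sketched as follows. This is a minimal, self-contained illustration and not the InLong code: the class `HeartbeatCacheSketch`, the in-memory maps standing in for the inlong_cluster_node table and the heartbeat cache, and every method signature here are assumptions made for the example. Only the idea (cache the heartbeat only when the versioned update affected at least one row) comes from the PR description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these names are taken from the InLong code base.
class HeartbeatCacheSketch {

    // Stand-in for the inlong_cluster_node table: node key -> row version.
    private final Map<String, Integer> versions = new HashMap<>();
    // Stand-in for the heartbeat cache: node key -> type of the last cached heartbeat.
    private final Map<String, String> cache = new HashMap<>();

    public HeartbeatCacheSketch(String nodeKey, int initialVersion) {
        versions.put(nodeKey, initialVersion);
    }

    // Mimics "UPDATE ... WHERE version = ?": returns the number of affected rows.
    public int updateNode(String nodeKey, int expectedVersion) {
        Integer current = versions.get(nodeKey);
        if (current == null || current != expectedVersion) {
            return 0; // another writer already bumped the version
        }
        versions.put(nodeKey, current + 1);
        return 1;
    }

    public int currentVersion(String nodeKey) {
        return versions.get(nodeKey);
    }

    public boolean isCached(String nodeKey) {
        return cache.containsKey(nodeKey);
    }

    // Returns true only if the database row was updated by this call.
    public boolean reportHeartbeat(String nodeKey, String type, int versionSeen) {
        if (type.equals(cache.get(nodeKey))) {
            return false; // same heartbeat type already cached: update is skipped
        }
        int affectedRows = updateNode(nodeKey, versionSeen);
        if (affectedRows > 0) {
            cache.put(nodeKey, type); // cache only after a successful update
            return true;
        }
        return false; // not cached, so the next heartbeat retries the update
    }

    public static void main(String[] args) {
        HeartbeatCacheSketch sketch = new HeartbeatCacheSketch("node-1", 1);
        // A stale version simulates losing the race against another thread.
        System.out.println(sketch.reportHeartbeat("node-1", "DATAPROXY", 0));
        System.out.println(sketch.reportHeartbeat("node-1", "DATAPROXY", 1));
    }
}
```

With this ordering, a failed update leaves the cache untouched, so the following heartbeat is not short-circuited by the cache lookup and gets another chance to write the status.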
[GitHub] [inlong] dockerzhang commented on pull request #6155: [INLONG-6154][Manager] Support to get audit data from the ClickHouse
dockerzhang commented on PR #6155: URL: https://github.com/apache/inlong/pull/6155#issuecomment-1276990180 @haifxu please update the document at the same time, thanks.
[GitHub] [inlong] gosonzhang opened a new pull request, #6162: [INLONG-6158][TubeMQ] Add ZooKeeper request timeout configuration
gosonzhang opened a new pull request, #6162: URL: https://github.com/apache/inlong/pull/6162 - Fixes #6158
[GitHub] [inlong] leezng opened a new pull request, #6164: [INLONG-6163][Dashboard] Form will lose old data when data changes are merged
leezng opened a new pull request, #6164: URL: https://github.com/apache/inlong/pull/6164 - Fixes #6163
[GitHub] [inlong] yunqingmoswu opened a new pull request, #6165: [INLONG-6160][Sort] Support dynamic partition for KafkaLoadNode
yunqingmoswu opened a new pull request, #6165:
URL: https://github.com/apache/inlong/pull/6165

### Prepare a Pull Request

*(Change the title refer to the following example)*

Title: [INLONG-6160][Sort] Support dynamic partition for KafkaLoadNode

*(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*

Fixes #6160

### Motivation

Support dynamic partitioning for KafkaLoadNode when the Kafka format is raw and 'key.fields' is not specified. This is mainly for whole-database migration scenarios: we assume the upstream input data has a mixed schema from the whole-database migration, ignore the real schema for now, and receive the entire record in a binary raw data format. The schema and data are then fetched and parsed on the Kafka sink side, and records are dynamically written to the related topic partitions according to certain data values.

Dynamic partition writing has some limitations:

1. The upstream data must be in raw format with a fixed inner format; only [canal-json|debezium-json] are supported for now.
2. 'key.fields' must not be specified.
3. 'sink.multiple.partition-pattern' and 'sink.multiple.format' must be specified so that the partition can be extracted dynamically from the data.
4. If 'sink.multiple.partition-pattern' equals 'PRIMARY_KEY', the primary key is extracted from the raw data and used as the partition key for hashing; otherwise the partition key is parsed from the raw data.

### Modifications

1. Add the 'sink.multiple.partition-pattern' option.
2. Add the 'RawDataHashPartitioner' partitioner.
3. Add the extractValues, extractPrimaryKeyNames, and extractPrimaryKeyValues methods to 'DynamicSchemaFormat'.

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
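The 'PRIMARY_KEY' case from PR #6165 (hashing extracted primary key values to pick a partition) can be illustrated with a small sketch. It is not the PR's 'RawDataHashPartitioner': the class `PrimaryKeyHashSketch`, the method `partitionFor`, and the use of `List.hashCode()` as the hash function are all assumptions chosen for a self-contained example, and the actual hashing in InLong or Kafka may differ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: not the RawDataHashPartitioner added by the PR.
class PrimaryKeyHashSketch {

    // Maps the primary key values of one record to a partition index in
    // [0, numPartitions). Records with equal key values get the same index.
    public static int partitionFor(List<String> primaryKeyValues, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // List.hashCode() is deterministic for lists of strings.
        int hash = primaryKeyValues.hashCode();
        // Clear the sign bit so the modulo result is never negative.
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // Values as they might be extracted from a canal-json or debezium-json record.
        List<String> key = Arrays.asList("1001", "order_a");
        System.out.println(partitionFor(key, 8));
    }
}
```

Masking the sign bit rather than calling `Math.abs` avoids the negative result that `Math.abs(Integer.MIN_VALUE)` would produce.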