[PR] [INLONG-11481][Sort] Tube Connector source supports dirty data achieving [inlong]
vernedeng opened a new pull request, #11482:
URL: https://github.com/apache/inlong/pull/11482

Fixes #11481

### Motivation

### Modifications

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [INLONG-11455][Sort] Only one OpenTelemetryAppender should be registered [inlong]
qy-liuhuo commented on PR #11480:
URL: https://github.com/apache/inlong/pull/11480#issuecomment-2468119009

It seems that this unit test failure was not caused by this PR. Could you rerun it for me?
(inlong) branch master updated: [INLONG-11481][Sort] Tube Connector source supports dirty data achieving (#11482)
This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new ab59b91be2 [INLONG-11481][Sort] Tube Connector source supports dirty data achieving (#11482)
ab59b91be2 is described below

commit ab59b91be2a2d84b0c0155d84a1da6bad204cc4d
Author: vernedeng
AuthorDate: Mon Nov 11 18:40:56 2024 +0800

    [INLONG-11481][Sort] Tube Connector source supports dirty data achieving (#11482)
---
 .../inlong/sdk/dirtydata/DirtyMessageWrapper.java  |  27 +++--
 ...SdkDirtySink.java => InlongSdkDirtySender.java} |  19 +++-
 inlong-sort/sort-flink/base/pom.xml                |   5 +
 .../org/apache/inlong/sort/base/Constants.java     |   2 +-
 .../apache/inlong/sort/base/dirty/DirtyData.java   |  40 ++-
 ...gSdkOptions.java => InlongSdkDirtyOptions.java} |   7 +-
 .../base/dirty/sink/sdk/InlongSdkDirtySink.java    | 115 -
 .../dirty/sink/sdk/InlongSdkDirtySinkFactory.java  |  39 +++
 .../sort-connectors/tubemq/pom.xml                 |   9 ++
 .../table/DynamicTubeMQDeserializationSchema.java  |   2 +-
 .../DynamicTubeMQTableDeserializationSchema.java   |  61 ++-
 .../tubemq/table/TubeMQDynamicTableFactory.java    |  37 ---
 .../sort/tubemq/table/TubeMQTableSource.java       |  15 ++-
 13 files changed, 243 insertions(+), 135 deletions(-)

diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
index 984c456480..977e002478 100644
--- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -18,12 +18,17 @@
 package org.apache.inlong.sdk.dirtydata;

 import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.text.StringEscapeUtils;

+import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Base64;
 import java.util.StringJoiner;

+@Slf4j
 @Builder
 public class DirtyMessageWrapper {

@@ -32,16 +37,17 @@ public class DirtyMessageWrapper {

     private String inlongGroupId;
     private String inlongStreamId;
-    private String dataTime;
+    private long dataTime;
     private String dataflowId;
     private String serverType;
     private String dirtyType;
+    private String dirtyMessage;
     private String ext;
     private String data;
     private byte[] dataBytes;

     public String format() {
-        String now = LocalDateTime.now().format(dateTimeFormatter);
+        String reportTime = LocalDateTime.now().format(dateTimeFormatter);
         StringJoiner joiner = new StringJoiner(delimiter);
         String formatData = null;
         if (data != null) {
@@ -50,14 +56,19 @@ public class DirtyMessageWrapper {
             formatData = Base64.getEncoder().encodeToString(dataBytes);
         }

-        return joiner.add(inlongGroupId)
-                .add(inlongStreamId)
-                .add(now)
-                .add(dataTime)
+        String dataTimeStr = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime),
+                ZoneId.systemDefault()).format(dateTimeFormatter);
+        return joiner
                 .add(dataflowId)
+                .add(inlongGroupId)
+                .add(inlongStreamId)
+                .add(reportTime)
+                .add(dataTimeStr)
                 .add(serverType)
                 .add(dirtyType)
-                .add(ext)
-                .add(formatData).toString();
+                .add(StringEscapeUtils.escapeXSI(dirtyMessage))
+                .add(StringEscapeUtils.escapeXSI(ext))
+                .add(StringEscapeUtils.escapeXSI(formatData))
+                .toString();
     }
 }
diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
similarity index 86%
rename from inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
rename to inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 2240ebdb6c..88e2e88a74 100644
--- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.sdk.dirtydata;

 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.MessageSender;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.
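For readers following the `DirtyMessageWrapper.format()` change in the commit above, here is a minimal JDK-only sketch of the new record layout: `dataflowId` moves to the front, and `dataTime` becomes epoch milliseconds that are rendered as text. The class name `DirtyRecordFormatSketch`, the `|` delimiter, and the `yyyy-MM-dd HH:mm:ss` pattern are assumptions made for illustration, since the diff does not show the real `delimiter` or `dateTimeFormatter` values. The sketch also takes the zone and report time as parameters so the result is reproducible (the commit uses `ZoneId.systemDefault()` and `LocalDateTime.now()`), and it leaves out the `StringEscapeUtils.escapeXSI` escaping that the commit applies to the last three fields.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.StringJoiner;

public class DirtyRecordFormatSketch {

    // Assumed values: the real delimiter and pattern are not visible in the diff.
    static final String DELIMITER = "|";
    static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Converts epoch milliseconds to text, mirroring the dataTimeStr line in the diff.
    static String formatMillis(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone).format(FORMATTER);
    }

    // Joins the fields in the order introduced by the commit (escaping omitted here).
    static String format(String dataflowId, String groupId, String streamId,
            long reportTimeMillis, long dataTimeMillis, String serverType,
            String dirtyType, String dirtyMessage, String ext, String data, ZoneId zone) {
        return new StringJoiner(DELIMITER)
                .add(dataflowId)
                .add(groupId)
                .add(streamId)
                .add(formatMillis(reportTimeMillis, zone))
                .add(formatMillis(dataTimeMillis, zone))
                .add(serverType)
                .add(dirtyType)
                .add(dirtyMessage)
                .add(ext)
                .add(data)
                .toString();
    }

    public static void main(String[] args) {
        // Hypothetical sample values, chosen only to show the field order.
        System.out.println(format("flow-1", "group-1", "stream-1", 0L, 1000L,
                "TubeMQ", "Deserialize", "bad record", "ext", "raw", ZoneId.of("UTC")));
    }
}
```

Passing the zone explicitly is a choice made for the sketch only; with `ZoneId.systemDefault()`, as in the commit, the rendered time depends on the host's time zone setting.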
Re: [PR] [INLONG-11481][Sort] Tube Connector source supports dirty data achieving [inlong]
vernedeng merged PR #11482: URL: https://github.com/apache/inlong/pull/11482
Re: [PR] [INLONG-11400][Manager] Support Airflow schedule engine [inlong]
fuweng11 commented on code in PR #11479:
URL: https://github.com/apache/inlong/pull/11479#discussion_r1837566933

## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java: ##

@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.common.bounded.BoundaryType;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.airflow.Connection;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAG;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGRunConf;
+import org.apache.inlong.manager.schedule.ScheduleEngine;
+import org.apache.inlong.manager.schedule.ScheduleUnit;
+import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse;
+import org.apache.inlong.manager.schedule.airflow.api.connection.ConnectionCreator;
+import org.apache.inlong.manager.schedule.airflow.api.connection.ConnectionGetter;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGCollectionUpdater;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGDeletor;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGUpdater;
+import org.apache.inlong.manager.schedule.airflow.api.dagruns.DAGRunsTrigger;
+import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig;
+import org.apache.inlong.manager.schedule.airflow.util.DAGUtil;
+import org.apache.inlong.manager.schedule.airflow.util.DateUtil;
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.mina.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.DEFAULT_TIMEZONE;
+import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX;
+import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI;
+
+/**
+ * Response for processing the register/unregister/update requests from {@link AirflowScheduleClient}
+ */
+@Service
+public class AirflowScheduleEngine implements ScheduleEngine {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AirflowScheduleEngine.class);
+    private final Set scheduledJobSet = new ConcurrentHashSet<>();
+    private AirflowServerClient serverClient;
+    private AirflowConfig airflowConfig;
+
+    public AirflowScheduleEngine(AirflowServerClient serverClient, AirflowConfig airflowConfig) {
+        this.serverClient = serverClient;
+        this.airflowConfig = airflowConfig;
+        start();
+    }
+
+    @Override
+    public void start() {
+        try {
+            // Create authentication information for the Inlong Manger API used by AirFlow
+            initConnection();
+            // Check if DagCleaner and DagCreator exist and unpause them
+            switchOriginalDAG(false);
+            // Start all task DAGs and load all DAG ID(Group Id) into the local cache
+            switchAllTaskDAG(false);
+            LOGGER.info("Airflow initialization succeeded.");
+        } catch (Exception e) {
+            LOGGER.error("Airflow initialization failed : {}", e.toString());

Review Comment:
```suggestion
            LOGGER.error("Airflow initialization failed : ", e);
```

## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java: ##

@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License");
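The review suggestion above replaces a log call that formats `e.toString()` into the message with one that passes the exception object itself. The sketch below illustrates the difference: when only the string is logged, the logging backend never receives the `Throwable`, so the stack trace and cause chain cannot be recorded. To keep the example self-contained it uses `java.util.logging` as a stand-in for SLF4J, which is an assumption made for illustration; the reviewed code uses `org.slf4j.Logger`. The names `ExceptionLoggingSketch` and `CapturingHandler` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class ExceptionLoggingSketch {

    // Collects log records in memory so they can be inspected afterwards.
    static class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }
    }

    // Logs the same failure twice: once as text only, once with the throwable attached.
    static List<LogRecord> logBothWays(Exception e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false); // keep the demo output off the console
        CapturingHandler handler = new CapturingHandler();
        logger.addHandler(handler);

        // Analogous to LOGGER.error("... : {}", e.toString()): only text reaches the backend.
        logger.log(Level.SEVERE, "Airflow initialization failed : " + e);
        // Analogous to LOGGER.error("... : ", e): the throwable travels with the record.
        logger.log(Level.SEVERE, "Airflow initialization failed : ", e);
        return handler.records;
    }

    public static void main(String[] args) {
        List<LogRecord> records = logBothWays(new IllegalStateException("boom"));
        for (LogRecord record : records) {
            System.out.println(record.getMessage() + " -> thrown attached: " + (record.getThrown() != null));
        }
    }
}
```

With the throwable attached, a formatter can print the full stack trace, which is usually what is needed when diagnosing a failed startup.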
Re: [PR] [INLONG-11401][Manager] Support Dolphinscheduler schedule engine [inlong]
fuweng11 commented on code in PR #11468:
URL: https://github.com/apache/inlong/pull/11468#discussion_r1837583090

## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java: ##

@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleEngine;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL;
+import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL;
+
+/**
+ * The default implementation of DolphinScheduler engine based on DolphinScheduler API. Response for processing
+ * the register/unregister/update requests from {@link DolphinScheduleClient}
+ */
+@Data
+@Service
+public class DolphinScheduleEngine implements ScheduleEngine {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class);
+
+    @Value("${server.host:127.0.0.1}")
+    private String host;
+
+    @Value("${server.port:8083}")
+    private int port;
+
+    @Value("${default.admin.user:admin}")
+    private String username;
+
+    @Value("${default.admin.password:inlong}")
+    private String password;
+
+    @Value("${inlong.schedule.dolphinscheduler.url:}")
+    private String dolphinUrl;
+
+    @Value("${inlong.schedule.dolphinscheduler.token:}")
+    private String token;
+
+    private long projectCode;
+    private final String projectBaseUrl;
+    private final DolphinScheduleUtils dsUtils;
+    private final Map scheduledProcessMap;
+
+    public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl,
+            String token) {
+        this.host = host;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+        this.dolphinUrl = dolphinUrl;
+        this.token = token;
+        this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
+        try {
+            LOGGER.info("Dolphin Scheduler engine http client initialized");
+            this.dsUtils = new DolphinScheduleUtils();
+            this.scheduledProcessMap = new ConcurrentHashMap<>();
+        } catch (Exception e) {
+            throw new DolphinScheduleException("Failed to init dolphin scheduler ", e);
+        }
+    }
+
+    public DolphinScheduleEngine() {
+        this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
+        try {
+            LOGGER.info("Dolphin Scheduler engine http client initialized");
+            this.dsUtils = new DolphinScheduleUtils();
+            this.scheduledProcessMap = new ConcurrentHa
[PR] [INLONG-11485][Dashboard] Support multiple scheduling enginessearch [inlong]
wohainilaodou opened a new pull request, #11486:
URL: https://github.com/apache/inlong/pull/11486

Fixes #11485

### Motivation

Support multiple scheduling engines search

### Modifications

Support multiple scheduling engines search

### Verifying this change

Before: (screenshot not preserved)

After: (screenshots not preserved)
(inlong) branch master updated: [INLONG-11471][CI] Exclude useless artifact in hadoop-common (#11472)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new cef2dbdc15 [INLONG-11471][CI] Exclude useless artifact in hadoop-common (#11472) cef2dbdc15 is described below commit cef2dbdc1545a7187e67f842411082f6c19102e8 Author: AloysZhang AuthorDate: Tue Nov 12 14:35:16 2024 +0800 [INLONG-11471][CI] Exclude useless artifact in hadoop-common (#11472) --- inlong-sdk/transform-sdk/pom.xml | 24 +++ inlong-sort-standalone/pom.xml | 26 inlong-sort/sort-core/pom.xml | 28 ++ .../sort-flink-v1.13/sort-connectors/hbase/pom.xml | 28 ++ .../sort-flink-v1.13/sort-connectors/jdbc/pom.xml | 28 ++ .../sort-flink-v1.15/sort-connectors/hbase/pom.xml | 28 ++ .../sort-connectors/iceberg/pom.xml| 28 ++ .../sort-flink-v1.15/sort-connectors/jdbc/pom.xml | 28 ++ 8 files changed, 218 insertions(+) diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml index 495fc60334..18db54765e 100644 --- a/inlong-sdk/transform-sdk/pom.xml +++ b/inlong-sdk/transform-sdk/pom.xml @@ -72,6 +72,30 @@ org.apache.hadoop hadoop-common + + +javax.servlet +servlet-api + + +javax.servlet.jsp +jsp-api + + + +jdk.tools +jdk.tools + + +com.github.stephenc.jcip +jcip-annotations + + +net.minidev +json-smart + + org.apache.hadoop diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml index b55ce9ba4b..99cd735f97 100644 --- a/inlong-sort-standalone/pom.xml +++ b/inlong-sort-standalone/pom.xml @@ -174,6 +174,32 @@ com.google.protobuf protobuf-java + +javax.servlet +servlet-api + + +javax.servlet.jsp +jsp-api + + + +jdk.tools +jdk.tools + + +com.github.stephenc.jcip +jcip-annotations + + +net.minidev +json-smart + + +com.fasterxml.woodstox +woodstox-core + diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml index 0ad94493d3..7d20ed65a8 100644 --- 
a/inlong-sort/sort-core/pom.xml +++ b/inlong-sort/sort-core/pom.xml @@ -76,6 +76,34 @@ org.apache.hadoop hadoop-common test + + +javax.servlet +servlet-api + + +javax.servlet.jsp +jsp-api + + + +jdk.tools +jdk.tools + + +com.github.stephenc.jcip +jcip-annotations + + +net.minidev +json-smart + + +com.fasterxml.woodstox +woodstox-core + + diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hbase/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hbase/pom.xml index 7cc096a93b..973d5d93b2 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hbase/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hbase/pom.xml @@ -62,6 +62,34 @@ org.apache.hadoop hadoop-common provided + + +javax.servlet +servlet-api + + +javax.servlet.jsp +jsp-api + + + +jdk.tools +jdk.tools + + +com.github.stephenc.jcip +
Re: [PR] [INLONG-11471][CI] Exclude useless artifact in hadoop-common [inlong]
aloyszhang merged PR #11472: URL: https://github.com/apache/inlong/pull/11472
[PR] [INLONG-11483][Manager] Support multiple scedule engine selection [inlong]
aloyszhang opened a new pull request, #11484:
URL: https://github.com/apache/inlong/pull/11484

Fixes #11483

### Motivation

Support multiple scheduling engine selection

### Modifications

Support multiple scheduling engine selection

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.

### Documentation

- Does this pull request introduce a new feature? (no)