Re: [PR] [INLONG-9550][Dashboard] System operation and maintenance supports query and audit through Gorupid and Stream id [inlong]
leezng commented on PR #9551: URL: https://github.com/apache/inlong/pull/9551#issuecomment-1880672040 How about using tabs instead? These two actually belong to the same module. I don’t think they need to be separated by menus. Or in the same table, you can add a filter: dimension (IP or ID) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9491][Sort] CSV format support ignore trailing unmappable fields [inlong]
EMsnap merged PR #9492: URL: https://github.com/apache/inlong/pull/9492 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9491][Sort] CSV format support ignore trailing unmappable fields (#9492)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new f3963ba3ee [INLONG-9491][Sort] CSV format support ignore trailing unmappable fields (#9492) f3963ba3ee is described below commit f3963ba3eeeb2ad94782ea2f3a96ec0f7c9e4be2 Author: vernedeng AuthorDate: Tue Jan 9 10:11:01 2024 +0800 [INLONG-9491][Sort] CSV format support ignore trailing unmappable fields (#9492) --- .../sort-formats/format-inlongmsg-base/pom.xml | 2 +- .../formats/inlongmsg/InLongMsgDecodingFormat.java | 64 ++- .../formats/inlongmsg/InLongMsgFormatFactory.java | 12 +- .../sort/formats/inlongmsg/InLongMsgOptions.java | 26 + .../inlongmsg/InLongMsgRowDataSerDeTest.java | 123 + 5 files changed, 220 insertions(+), 7 deletions(-) diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml index 8a3f720721..8846aa0446 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml +++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml @@ -94,7 +94,7 @@ org.apache.flink flink-csv -test +provided diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java index 0f67bbc072..93e970aa85 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java @@ -19,8 +19,17 @@ package org.apache.inlong.sort.formats.inlongmsg; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter; +import com.google.common.annotations.VisibleForTesting; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; @@ -31,6 +40,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -40,6 +50,12 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_EMPTY_STRING_AS_NULL; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_TRAILING_UNMAPPABLE; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_INSERT_NULLS_FOR_MISSING_COLUMNS; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS; + +@Slf4j public class InLongMsgDecodingFormat implements DecodingFormat> { private final String innerFormatMetaPrefix; @@ -50,14 +66,23 @@ public class InLongMsgDecodingFormat implements DecodingFormat> innerDecodingFormat, String innerFormatMetaPrefix, -boolean ignoreErrors) { +ReadableConfig formatOptions) { this.innerDecodingFormat = innerDecodingFormat; this.innerFormatMetaPrefix = innerFormatMetaPrefix; this.metadataKeys = Collections.emptyList(); -this.ignoreErrors = ignoreErrors; +this.ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS); +this.ignoreTrailingUnmappable = formatOptions.get(CSV_IGNORE_TRAILING_UNMAPPABLE); +this.insertNullsForMissingColumns = formatOptions.get(CSV_INSERT_NULLS_FOR_MISSING_COLUMNS); +this.emptyStringAsNull = formatOptions.get(CSV_EMPTY_STRING_AS_NULL); } @Override @@ -83,8 +108,15 @@ public class InLongMsgDecodingFormat implements DecodingFormat producedTypeInfo = context
Re: [PR] [INLONG-9550][Dashboard] System operation and maintenance supports query and audit through Gorupid and Stream id [inlong]
bluewang commented on PR #9551: URL: https://github.com/apache/inlong/pull/9551#issuecomment-1882335303 > How about using tabs instead? These two actually belong to the same module. I don’t think they need to be separated by menus. > > Or in the same table, you can add a filter: dimension (IP or ID) ok, I have made changes based on your suggestions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [INLONG-9552][Manager] Fix the problem of sink remains in configuration after standalone cluster allocation failure [inlong]
fuweng11 opened a new pull request, #9553: URL: https://github.com/apache/inlong/pull/9553 ### Prepare a Pull Request - Fixes #9552 ### Motivation 1.Fix the problem of sink remains in configuration after standalone cluster allocation failure. 2.Fix the problem of serializing information in extparams when dealing with serializing field information. ### Modifications 1.Fix the problem of sink remains in configuration after standalone cluster allocation failure. 2.Fix the problem of serializing information in extparams when dealing with serializing field information. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [INLONG-9552][Manager] Fix the problem of sink remains in configuration after standalone cluster allocation failure [inlong]
dockerzhang merged PR #9553: URL: https://github.com/apache/inlong/pull/9553 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated: [INLONG-9523][Manager] Fix the problem of sink remains in configuration after standalone cluster allocation failure (#9553)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 6b9da03b43 [INLONG-9523][Manager] Fix the problem of sink remains in configuration after standalone cluster allocation failure (#9553) 6b9da03b43 is described below commit 6b9da03b43f83db56cefe28b679aaa04d6a755cf Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Jan 9 14:06:43 2024 +0800 [INLONG-9523][Manager] Fix the problem of sink remains in configuration after standalone cluster allocation failure (#9553) --- .../main/resources/mappers/AuditEntityMapper.xml | 2 + .../AbstractStandaloneSinkResourceOperator.java| 46 +- .../service/sink/ck/ClickHouseSinkOperator.java| 1 + .../service/sink/es/ElasticsearchSinkOperator.java | 1 + .../service/sink/iceberg/IcebergSinkOperator.java | 1 + .../service/sink/kudu/KuduSinkOperator.java| 1 + .../sink/starrocks/StarRocksSinkOperator.java | 1 + 7 files changed, 34 insertions(+), 19 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml index 63d6af6c73..f5cce709e3 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml @@ -41,6 +41,7 @@ + @@ -50,6 +51,7 @@ + diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java index 79893ad9c0..75174f120a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.resource.sink; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.SinkStatus; +import org.apache.inlong.manager.common.exceptions.WorkflowException; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; @@ -55,26 +56,33 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso @VisibleForTesting protected void assignCluster(SinkInfo sinkInfo) { -if (StringUtils.isBlank(sinkInfo.getSinkType())) { -throw new IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage()); +try { +if (StringUtils.isBlank(sinkInfo.getSinkType())) { +throw new IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage()); +} + +if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) { +String info = "no need to auto-assign cluster since the cluster has already assigned"; +sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); +return; +} + +String targetCluster = assignOneCluster(sinkInfo); +Preconditions.expectNotBlank(targetCluster, +String.format("find no proper cluster assign to group=%s, stream=%s, sink type=%s, data node=%s ", +sinkInfo.getInlongGroupId(), sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(), +sinkInfo.getDataNodeName())); + +StreamSinkEntity sink = sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId()); +sink.setInlongClusterName(targetCluster); +sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode()); +sinkEntityMapper.updateByIdSelective(sink); +} catch (Throwable e) { +String errMsg = "assign standalone cluster failed: " + e.getMessage(); +log.error(errMsg, e); +sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg); +throw new WorkflowException(errMsg); } - -if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) { -String info = "no need to auto-assign cluster since the cluster has already assigned"; -sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); -
[PR] [INLONG-9554][SDK] Fix the shutdownInternalThreads methods throw the NullException [inlong]
castorqin opened a new pull request, #9555: URL: https://github.com/apache/inlong/pull/9555 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title Example: [INLONG-XYZ][Component] Title of the pull request *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)* - Fixes #9554 ### Motivation Fix the shutdownInternalThreads methods throw the NullException ### Modifications The details can be found at [#9554 ](https://github.com/apache/inlong/issues/9554). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(inlong) branch master updated (6b9da03b43 -> e91719bfcc)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 6b9da03b43 [INLONG-9523][Manager] Fix the problem of sink remains in configuration after standalone cluster allocation failure (#9553) add e91719bfcc [INLONG-9550][Dashboard] System operation and maintenance supports query and audit through Gorupid and Stream id (#9551) No new revisions were added by this update. Summary of changes: inlong-dashboard/src/configs/routes/conf.ts| 7 +- inlong-dashboard/src/ui/locales/cn.json| 10 +- inlong-dashboard/src/ui/locales/en.json| 10 +- .../IdModule}/config.tsx | 117 +++-- .../IdModule}/index.tsx| 60 +-- .../IpModule}/config.tsx | 57 +++--- .../IpModule}/index.tsx| 48 - .../src/ui/pages/ModuleAudit/index.tsx | 81 ++ 8 files changed, 226 insertions(+), 164 deletions(-) copy inlong-dashboard/src/ui/pages/{ModuleAuditDashboard => ModuleAudit/IdModule}/config.tsx (64%) copy inlong-dashboard/src/ui/pages/{ModuleAuditDashboard => ModuleAudit/IdModule}/index.tsx (68%) rename inlong-dashboard/src/ui/pages/{ModuleAuditDashboard => ModuleAudit/IpModule}/config.tsx (71%) rename inlong-dashboard/src/ui/pages/{ModuleAuditDashboard => ModuleAudit/IpModule}/index.tsx (75%) create mode 100644 inlong-dashboard/src/ui/pages/ModuleAudit/index.tsx
Re: [PR] [INLONG-9550][Dashboard] System operation and maintenance supports query and audit through Gorupid and Stream id [inlong]
dockerzhang merged PR #9551: URL: https://github.com/apache/inlong/pull/9551 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org