[GitHub] [inlong] dockerzhang merged pull request #7939: [INLONG-7938][Manager] Fix consume list interface does not filter by request
dockerzhang merged PR #7939:
URL: https://github.com/apache/inlong/pull/7939
[inlong] branch master updated: [INLONG-7938][Manager] Fix consume list interface does not filter by request (#7939)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 4eea8d3d9  [INLONG-7938][Manager] Fix consume list interface does not filter by request (#7939)

4eea8d3d9 is described below

commit 4eea8d3d9b3569cfc2623956673118c651b0a234
Author: vernedeng
AuthorDate: Thu May 4 15:57:56 2023 +0800

    [INLONG-7938][Manager] Fix consume list interface does not filter by request (#7939)
---
 .../manager/client/api/inner/client/InlongConsumeClient.java   | 10 ++
 .../inlong/manager/client/api/service/InlongConsumeApi.java    |  8 +++-
 .../manager/client/api/inner/InlongConsumeClientTest.java      |  2 +-
 .../inlong/manager/web/controller/InlongConsumeController.java |  4 ++--
 4 files changed, 8 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
index 4021e09f2..9c0b53f28 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
@@ -17,12 +17,10 @@ package org.apache.inlong.manager.client.api.inner.client;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.service.InlongConsumeApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
@@ -33,8 +31,6 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 
-import java.util.Map;
-
 /**
  * Client for {@link InlongConsumeApi}.
 */
@@ -96,12 +92,10 @@ public class InlongConsumeClient {
      * @return inlong consume list
      */
     public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request) {
-        Map<String, Object> requestMap = JsonUtils.OBJECT_MAPPER.convertValue(request,
-                new TypeReference<Map<String, Object>>() {
-                });
+        Preconditions.expectNotNull(request, "inlong consume request cannot be null");
         Response<PageResult<InlongConsumeBriefInfo>> response = ClientUtils.executeHttpCall(
-                inlongConsumeApi.list(requestMap));
+                inlongConsumeApi.list(request));
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
index d78dd5a5a..7f560ba27 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import retrofit2.Call;
@@ -30,9 +31,6 @@ import retrofit2.http.DELETE;
 import retrofit2.http.GET;
 import retrofit2.http.POST;
 import retrofit2.http.Path;
-import retrofit2.http.Query;
-
-import java.util.Map;
 
 public interface InlongConsumeApi {
 
@@ -45,8 +43,8 @@ public interface InlongConsumeApi {
     @GET("consume/countStatus")
     Call<Response<InlongConsumeCountInfo>> countStatusByUser();
 
-    @GET("consume/list")
-    Call<Response<PageResult<InlongConsumeBriefInfo>>> list(@Query("request") Map<String, Object> request);
+    @POST("consume/list")
+    Call<Response<PageResult<InlongConsumeBriefInfo>>> list(@Body InlongConsumePageRequest request);
 
     @POST("consume/update")
     Call> update(@Body InlongConsumeRequest request);

diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java
index f134f2f81..defb3c269 100644
--- a/inlong-manager/manager-
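With this change the client passes the page request object straight through to the new POST endpoint, so filter fields set on InlongConsumePageRequest actually reach the Manager instead of being dropped. A minimal usage sketch of the updated client API; the filter and paging setters (setTopic, setPageNum, setPageSize) are assumed from the page-request POJO rather than shown in this diff:

```java
import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;

public class ConsumeListSketch {

    /** List inlong consume records matching the given topic filter. */
    public static PageResult<InlongConsumeBriefInfo> listByTopic(InlongConsumeClient client, String topic) {
        InlongConsumePageRequest request = new InlongConsumePageRequest();
        request.setTopic(topic);   // assumed filter field on the page request
        request.setPageNum(1);     // assumed paging fields
        request.setPageSize(20);
        // The whole request object is now sent as the POST body of consume/list,
        // so these filters are honored by the Manager.
        return client.list(request);
    }
}
```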
[GitHub] [inlong] xuehuanran commented on a diff in pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK
xuehuanran commented on code in PR #7933:
URL: https://github.com/apache/inlong/pull/7933#discussion_r1185058338

## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go:

@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log"
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util"
+)
+
+type producer struct {
+	clientID        string
+	config          *config.Config
+	nextAuth2Master int32
+	selector        selector.Selector
+	master          *selector.Node
+	client          rpc.RPCClient
+	masterHBRetry   int
+	unreportedTimes int
+	publishTopics   []string
+	brokerCheckSum  int64
+	brokerMap       map[string]*metadata.Node
+	brokerMu        sync.Mutex
+}
+
+// NewProducer returns a producer which is constructed by a given config.
+func NewProducer(config *config.Config) (Producer, error) {
+	if err := config.ValidateProducer(); err != nil {
+		return nil, err
+	}
+	log.Infof("The config of the producer is %s", config)
+
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newProducerClientID()
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.TLSEnable = true
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+
+	client := rpc.New(pool, opts, config)
+
+	p := &producer{
+		config:          config,
+		clientID:        clientID,
+		selector:        selector,
+		client:          client,
+		unreportedTimes: 0,
+		brokerMap:       make(map[string]*metadata.Node),
+		publishTopics:   config.Producer.Topics,
+	}
+
+	err = p.register2Master(true)
+	if err != nil {
+		return nil, err
+	}
+
+	log.Infof("[PRODUCER] start producer success, client=%s", clientID)
+	return p, nil
+}
+
+func (p *producer) register2Master(needChange bool) error {
+	if needChange {
+		p.selector.Refresh(p.config.Producer.Masters)
+		node, err := p.selector.Select(p.config.Producer.Masters)
+		if err != nil {
+			return err
+		}
+		p.master = node
+	}
+
+	retryCount := 0
+	for {
+		rsp, err := p.sendRegRequest2Master()
+		if err != nil || !rsp.GetSuccess() {
+			if err != nil {
+				log.Errorf("[PRODUCER]register2Master error %s", err.Error())
+			}
+			if !p.master.HasNext {
+				if err != nil {
+					return err
+				}
+				if rsp != nil {
+					log.Errorf("[PRODUCER] register2Master(%s) failure exist register, client=%s, er
[GitHub] [inlong] xuehuanran commented on a diff in pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK
xuehuanran commented on code in PR #7933:
URL: https://github.com/apache/inlong/pull/7933#discussion_r1185062731

## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer.go:

@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+type Producer interface {

Review Comment:
   Yes, the functions will be committed in later PRs.
[GitHub] [inlong] TszKitLo40 commented on a diff in pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK
TszKitLo40 commented on code in PR #7933:
URL: https://github.com/apache/inlong/pull/7933#discussion_r1185617475

## inlong-tubemq/tubemq-client-twins/tubemq-client-go/util/util.go:

@@ -147,3 +150,17 @@ func Join(m map[string]string, step1 string, step2 string) string {
 	}
 	return s
 }
+
+func NewClientID(group string, clientID *uint64, tubeMQClientVersion string) string {
+	res := ""

Review Comment:
   clientID?
   ```suggestion
   	clientID := ""
   ```
[GitHub] [inlong] bluewang opened a new pull request, #7956: [INLONG-7955][Dashboard] Change the consumption query method from get to post
bluewang opened a new pull request, #7956:
URL: https://github.com/apache/inlong/pull/7956

### Prepare a Pull Request
*(Change the title refer to the following example)*

- Title Example: [INLONG-XYZ][Component] Title of the pull request

*(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*

- Fixes

Change the consumption query method from get to post
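This Dashboard change follows the Manager-side switch of consume/list from GET to POST above, so the frontend now has to send its filters as a JSON body rather than URL query parameters. A hedged sketch of the server-side mapping the Dashboard calls; the class name, method body, and exact annotations are assumptions for illustration, not the actual InlongConsumeController code:

```java
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumeListEndpointSketch {

    // Assumed endpoint shape: consume/list accepts the page request as the POST body.
    @PostMapping("/consume/list")
    public Response<PageResult<InlongConsumeBriefInfo>> list(@RequestBody InlongConsumePageRequest request) {
        // A real controller would delegate to the consume service here.
        throw new UnsupportedOperationException("sketch only: delegate to the consume service");
    }
}
```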
[GitHub] [inlong] EMsnap merged pull request #7951: [INLONG-7945][Sort] Fix MongoDB CDC unable to output the 'replace' records
EMsnap merged PR #7951:
URL: https://github.com/apache/inlong/pull/7951
[inlong] branch master updated (4eea8d3d9 -> 0d80d4de5)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 4eea8d3d9  [INLONG-7938][Manager] Fix consume list interface does not filter by request (#7939)
     add 0d80d4de5  [INLONG-7945][Sort] Fix MongoDB CDC unable to output the 'replace' records (#7951)

No new revisions were added by this update.

Summary of changes:
 .../MongoDBConnectorDeserializationSchema.java      | 65 --
 .../cdc/mongodb/debezium/utils/RecordUtils.java     | 11 
 .../sort/cdc/mongodb/table/MongoDBTableSource.java  |  4 +-
 .../{MongoRowKind.java => MongoDBRowKind.java}      | 14 +++--
 ...Validator.java => MongoDBRowKindValidator.java}  | 19 +--
 5 files changed, 72 insertions(+), 41 deletions(-)
 rename inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/filter/{MongoRowKind.java => MongoDBRowKind.java} (85%)
 rename inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/filter/{RowKindValidator.java => MongoDBRowKindValidator.java} (74%)
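The fix concerns which MongoDB change-stream operations the deserialization schema turns into output rows. Below is a minimal, hypothetical mapping of operation types to Flink's RowKind that illustrates why 'replace' has to be handled explicitly; the real MongoDBConnectorDeserializationSchema and MongoDBRowKindValidator are more involved than this:

```java
import org.apache.flink.types.RowKind;

public final class MongoOpMappingSketch {

    private MongoOpMappingSketch() {
    }

    /**
     * Map a MongoDB change-stream operationType to the Flink RowKind to emit.
     * A full replacement of a document is still an update of that row, so it is
     * emitted as UPDATE_AFTER instead of being silently skipped.
     */
    public static RowKind toRowKind(String operationType) {
        switch (operationType) {
            case "insert":
                return RowKind.INSERT;
            case "update":
            case "replace":
                return RowKind.UPDATE_AFTER;
            case "delete":
                return RowKind.DELETE;
            default:
                throw new IllegalArgumentException("Unsupported operation type: " + operationType);
        }
    }
}
```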
[GitHub] [inlong] EMsnap merged pull request #7953: [INLONG-7952][Sort] Mask sensitive message of Flink SQL in the logs
EMsnap merged PR #7953:
URL: https://github.com/apache/inlong/pull/7953
[inlong] branch master updated (0d80d4de5 -> 3f4ebb520)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 0d80d4de5  [INLONG-7945][Sort] Fix MongoDB CDC unable to output the 'replace' records (#7951)
     add 3f4ebb520  [INLONG-7952][Sort] Mask sensitive message of Flink SQL in the logs (#7953)

No new revisions were added by this update.

Summary of changes:
 .../apache/inlong/common/util/MaskDataUtils.java   | 32 ++-
 .../inlong/common/util/MaskDataUtilsTest.java      | 45 ++
 .../inlong/sort/parser/impl/FlinkSqlParser.java    | 10 +++--
 .../sort/parser/result/FlinkSqlParseResult.java    |  4 +-
 4 files changed, 77 insertions(+), 14 deletions(-)
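MaskDataUtils is used here to scrub credentials from Flink SQL text before it is written to the logs. A rough, self-contained illustration of the idea, assuming password-style options are redacted with a regex; the keyword list and masking rules in the actual utility may differ:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SqlMaskSketch {

    // Hypothetical keyword list; the real MaskDataUtils covers more sensitive keys.
    private static final Pattern SENSITIVE = Pattern.compile(
            "('(password|secret\\.key|token)'\\s*=\\s*')([^']*)(')", Pattern.CASE_INSENSITIVE);

    /** Replace the value of sensitive options in a Flink SQL statement with asterisks. */
    public static String mask(String sql) {
        Matcher matcher = SENSITIVE.matcher(sql);
        return matcher.replaceAll("$1******$4");
    }

    public static void main(String[] args) {
        String ddl = "CREATE TABLE src (id INT) WITH ('connector' = 'mysql-cdc', 'password' = 'p55w0rd')";
        // Prints the DDL with 'password' = '******'
        System.out.println(mask(ddl));
    }
}
```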
[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka
Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301

## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:

Review Comment:
   Hi! RawDataHashPartitioner was designed for canal/debezium data, which is parsed with the corresponding JsonDynamicFormat. In contrast, PrimaryKeyPartitioner primarily targets the csv/avro/json formats (while keeping compatibility with canal/debezium just in case); those formats have no JsonDynamicFormat and do not need to be parsed. If we used RawDataHashPartitioner here, the missing JsonDynamicFormat would cause NullPointerExceptions. Another point, design-wise, is that raw data mostly comes from multi-sink scenarios, so instead of extending that partitioner I want a separate partitioner for the single-table case, which should be more extensible and easier to maintain in the future.
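For context on this discussion, a bare-bones sketch of what a primary-key based Kafka partitioner can look like on top of Flink's FlinkKafkaPartitioner: it hashes the serialized key bytes so that rows with the same primary key always land in the same partition. This is only an illustration, not the PrimaryKeyPartitioner added in this PR:

```java
import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.RowData;

/**
 * Illustrative only: route every record with the same serialized key (here,
 * assumed to carry the primary key) to the same Kafka partition.
 */
public class PrimaryKeyHashPartitionerSketch extends FlinkKafkaPartitioner<RowData> {

    private static final long serialVersionUID = 1L;

    @Override
    public int partition(RowData record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            return 0;
        }
        if (key == null) {
            return partitions[0];
        }
        // floorMod keeps the index non-negative even when the hash is Integer.MIN_VALUE
        return partitions[Math.floorMod(Arrays.hashCode(key), partitions.length)];
    }
}
```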