[GitHub] [inlong] dockerzhang merged pull request #7939: [INLONG-7938][Manager] Fix consume list interface does not filter by request

2023-05-04 Thread via GitHub


dockerzhang merged PR #7939:
URL: https://github.com/apache/inlong/pull/7939


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-7938][Manager] Fix consume list interface does not filter by request (#7939)

2023-05-04 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 4eea8d3d9 [INLONG-7938][Manager] Fix consume list interface does not filter by request (#7939)
4eea8d3d9 is described below

commit 4eea8d3d9b3569cfc2623956673118c651b0a234
Author: vernedeng 
AuthorDate: Thu May 4 15:57:56 2023 +0800

[INLONG-7938][Manager] Fix consume list interface does not filter by request (#7939)
---
 .../manager/client/api/inner/client/InlongConsumeClient.java   | 10 ++
 .../inlong/manager/client/api/service/InlongConsumeApi.java|  8 +++-
 .../manager/client/api/inner/InlongConsumeClientTest.java  |  2 +-
 .../inlong/manager/web/controller/InlongConsumeController.java |  4 ++--
 4 files changed, 8 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
index 4021e09f2..9c0b53f28 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
@@ -17,12 +17,10 @@
 
 package org.apache.inlong.manager.client.api.inner.client;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.service.InlongConsumeApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
@@ -33,8 +31,6 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 
-import java.util.Map;
-
 /**
  * Client for {@link InlongConsumeApi}.
  */
@@ -96,12 +92,10 @@ public class InlongConsumeClient {
  * @return inlong consume list
  */
 public PageResult list(InlongConsumePageRequest request) {
-Map requestMap = JsonUtils.OBJECT_MAPPER.convertValue(request,
-new TypeReference>() {
-});
+Preconditions.expectNotNull(request, "inlong consume request cannot be null");
 
Response> response = ClientUtils.executeHttpCall(
-inlongConsumeApi.list(requestMap));
+inlongConsumeApi.list(request));
 ClientUtils.assertRespSuccess(response);
 return response.getData();
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
index d78dd5a5a..7f560ba27 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import retrofit2.Call;
@@ -30,9 +31,6 @@ import retrofit2.http.DELETE;
 import retrofit2.http.GET;
 import retrofit2.http.POST;
 import retrofit2.http.Path;
-import retrofit2.http.Query;
-
-import java.util.Map;
 
 public interface InlongConsumeApi {
 
@@ -45,8 +43,8 @@ public interface InlongConsumeApi {
 @GET("consume/countStatus")
 Call> countStatusByUser();
 
-@GET("consume/list")
-Call>> list(@Query("request") Map request);
+@POST("consume/list")
+Call>> list(@Body InlongConsumePageRequest request);
 
 @POST("consume/update")
 Call> update(@Body InlongConsumeRequest request);
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java
index f134f2f81..defb3c269 100644
--- a/inlong-manager/manager-

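Why the old GET call could ignore the filters: with Retrofit, a `@Query("request")` parameter whose type is not a `String` is turned into text by the converter's string conversion, which by default is `toString()`. The whole map therefore travels as one opaque `request=` parameter instead of one parameter per field, and a server that binds `InlongConsumePageRequest` field by field would see none of the filters (the server-side binding behavior is an assumption here). The sketch below only reproduces the two query-string shapes with the JDK; `QueryEncodingSketch`, `singleParam`, `perFieldParams`, and the field names are hypothetical. The commit itself avoids the problem by sending the request as a JSON body via `@POST` and `@Body`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical illustration of two ways a filter map can end up in a query string. */
public class QueryEncodingSketch {

    /** Mimics a non-String @Query value converted with toString(): one opaque parameter. */
    public static String singleParam(String name, Map<String, Object> filters) {
        return name + "=" + URLEncoder.encode(filters.toString(), StandardCharsets.UTF_8);
    }

    /** One key=value pair per field, which a field-by-field binder can read. */
    public static String perFieldParams(Map<String, Object> filters) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, Object> e : filters.entrySet()) {
            joiner.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                    + URLEncoder.encode(String.valueOf(e.getValue()), StandardCharsets.UTF_8));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Field names are illustrative only, not taken from InlongConsumePageRequest.
        Map<String, Object> filters = new LinkedHashMap<>();
        filters.put("keyword", "test");
        filters.put("pageNum", 1);

        System.out.println(singleParam("request", filters)); // request=%7Bkeyword%3Dtest%2C+pageNum%3D1%7D
        System.out.println(perFieldParams(filters));         // keyword=test&pageNum=1
    }
}
```

A JSON body sidesteps both encodings, since the server can deserialize the whole request object at once.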
[GitHub] [inlong] xuehuanran commented on a diff in pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK

2023-05-04 Thread via GitHub


xuehuanran commented on code in PR #7933:
URL: https://github.com/apache/inlong/pull/7933#discussion_r1185058338


##
inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go:
##
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+   "context"
+   "os"
+   "strconv"
+   "strings"
+   "sync"
+   "sync/atomic"
+   "time"
+
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config"
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log"
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata"
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing"
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol"
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc"
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector"
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport"
+   "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util"
+)
+
+type producer struct {
+   clientID        string
+   config          *config.Config
+   nextAuth2Master int32
+   selector        selector.Selector
+   master          *selector.Node
+   client          rpc.RPCClient
+   masterHBRetry   int
+   unreportedTimes int
+   publishTopics   []string
+   brokerCheckSum  int64
+   brokerMap       map[string]*metadata.Node
+   brokerMu        sync.Mutex
+}
+
+// NewProducer returns a producer which is constructed by a given config.
+func NewProducer(config *config.Config) (Producer, error) {
+   if err := config.ValidateProducer(); err != nil {
+      return nil, err
+   }
+   log.Infof("The config of the producer is %s", config)
+
+   selector, err := selector.Get("ip")
+   if err != nil {
+      return nil, err
+   }
+
+   clientID := newProducerClientID()
+   pool := multiplexing.NewPool()
+   opts := &transport.Options{}
+   if config.Net.TLS.Enable {
+      opts.TLSEnable = true
+      opts.CACertFile = config.Net.TLS.CACertFile
+      opts.TLSCertFile = config.Net.TLS.TLSCertFile
+      opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+      opts.TLSServerName = config.Net.TLS.TLSServerName
+   }
+
+   client := rpc.New(pool, opts, config)
+
+   p := &producer{
+      config:          config,
+      clientID:        clientID,
+      selector:        selector,
+      client:          client,
+      unreportedTimes: 0,
+      brokerMap:       make(map[string]*metadata.Node),
+      publishTopics:   config.Producer.Topics,
+   }
+
+   err = p.register2Master(true)
+   if err != nil {
+      return nil, err
+   }
+
+   log.Infof("[PRODUCER] start producer success, client=%s", clientID)
+   return p, nil
+}
+
+func (p *producer) register2Master(needChange bool) error {
+   if needChange {
+      p.selector.Refresh(p.config.Producer.Masters)
+      node, err := p.selector.Select(p.config.Producer.Masters)
+      if err != nil {
+         return err
+      }
+      p.master = node
+   }
+
+   retryCount := 0
+   for {
+      rsp, err := p.sendRegRequest2Master()
+      if err != nil || !rsp.GetSuccess() {
+         if err != nil {
+            log.Errorf("[PRODUCER]register2Master error %s", err.Error())
+         }
+         if !p.master.HasNext {
+            if err != nil {
+               return err
+            }
+            if rsp != nil {
+               log.Errorf("[PRODUCER] register2Master(%s) failure exist register, client=%s, er

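The truncated `register2Master` loop above appears to follow a failover pattern: send a registration request to the selected master and, on failure, give up when `p.master.HasNext` reports no further candidate. The Java sketch below isolates that pattern under the assumption that a remaining candidate is simply tried next; `MasterFailoverSketch`, `register`, and the `Predicate` standing in for `sendRegRequest2Master` are hypothetical, not the Go SDK's API, and the real loop (including how `retryCount` is used) is not fully visible in this excerpt.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch: try masters in order until one accepts the registration. */
public class MasterFailoverSketch {

    /**
     * Returns the first master for which the registration call succeeds,
     * or Optional.empty() once every candidate has failed.
     */
    public static Optional<String> register(List<String> masters, Predicate<String> sendRegRequest) {
        for (int i = 0; i < masters.size(); i++) {
            String master = masters.get(i);
            boolean hasNext = i + 1 < masters.size();
            if (sendRegRequest.test(master)) {
                return Optional.of(master);
            }
            if (!hasNext) {
                // Mirrors the "!p.master.HasNext" branch: no candidate left, give up.
                return Optional.empty();
            }
            // Otherwise move on to the next candidate.
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> masters = List.of("master-a", "master-b");
        // Pretend only the second master accepts the registration.
        System.out.println(register(masters, m -> m.equals("master-b")));
    }
}
```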
[GitHub] [inlong] xuehuanran commented on a diff in pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK

2023-05-04 Thread via GitHub


xuehuanran commented on code in PR #7933:
URL: https://github.com/apache/inlong/pull/7933#discussion_r1185062731


##
inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer.go:
##
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+type Producer interface {

Review Comment:
   Yes, the functions will be committed in later PRs.






[GitHub] [inlong] TszKitLo40 commented on a diff in pull request #7933: [INLONG-7926][TubeMQ] Add "Register2Master" method for GO SDK

2023-05-04 Thread via GitHub


TszKitLo40 commented on code in PR #7933:
URL: https://github.com/apache/inlong/pull/7933#discussion_r1185617475


##
inlong-tubemq/tubemq-client-twins/tubemq-client-go/util/util.go:
##
@@ -147,3 +150,17 @@ func Join(m map[string]string, step1 string, step2 string) string {
}
return s
 }
+
+func NewClientID(group string, clientID *uint64, tubeMQClientVersion string) string {
+   res := ""

Review Comment:
   clientID?
   ```suggestion
clientID := ""
   ```






[GitHub] [inlong] bluewang opened a new pull request, #7956: [INLONG-7955][Dashboard] Change the consumption query method from get to post

2023-05-04 Thread via GitHub


bluewang opened a new pull request, #7956:
URL: https://github.com/apache/inlong/pull/7956

   
   ### Prepare a Pull Request
   *(Change the title, referring to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #7955: change the consumption query method from GET to POST
   
   





[GitHub] [inlong] EMsnap merged pull request #7951: [INLONG-7945][Sort] Fix MongoDB CDC unable to output the 'replace' records

2023-05-04 Thread via GitHub


EMsnap merged PR #7951:
URL: https://github.com/apache/inlong/pull/7951





[inlong] branch master updated (4eea8d3d9 -> 0d80d4de5)

2023-05-04 Thread zirui

zirui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


 from 4eea8d3d9 [INLONG-7938][Manager] Fix consume list interface does not filter by request (#7939)
 add 0d80d4de5 [INLONG-7945][Sort] Fix MongoDB CDC unable to output the 'replace' records (#7951)

No new revisions were added by this update.

Summary of changes:
 .../MongoDBConnectorDeserializationSchema.java | 65 --
 .../cdc/mongodb/debezium/utils/RecordUtils.java| 11 
 .../sort/cdc/mongodb/table/MongoDBTableSource.java |  4 +-
 .../{MongoRowKind.java => MongoDBRowKind.java} | 14 +++--
 ...Validator.java => MongoDBRowKindValidator.java} | 19 +--
 5 files changed, 72 insertions(+), 41 deletions(-)
 rename inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/filter/{MongoRowKind.java => MongoDBRowKind.java} (85%)
 rename inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/filter/{RowKindValidator.java => MongoDBRowKindValidator.java} (74%)
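For context on the MongoDB CDC fix: a MongoDB change stream reports `insert`, `update`, `replace`, and `delete` operations, and a `replace` event carries the full new document, so it can be emitted like the "after" image of an update instead of being dropped. The sketch below shows one plausible mapping onto Flink-style row kinds; `ReplaceMappingSketch`, the local `RowKindSketch` enum, and `toRowKind` are hypothetical and are not the renamed `MongoDBRowKind` class from this commit, whose contents are not shown.

```java
import java.util.Locale;

/** Hypothetical mapping from MongoDB change-stream operation types to Flink-style row kinds. */
public class ReplaceMappingSketch {

    /** Mirrors the four change kinds of Flink's RowKind, declared locally to stay self-contained. */
    public enum RowKindSketch { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    public static RowKindSketch toRowKind(String operationType) {
        switch (operationType.toLowerCase(Locale.ROOT)) {
            case "insert":
                return RowKindSketch.INSERT;
            case "update":
                // Assumes the full document is available (e.g. via a full-document lookup).
            case "replace":
                // A replace carries the whole new document, so treat it as an update's "after" image.
                return RowKindSketch.UPDATE_AFTER;
            case "delete":
                return RowKindSketch.DELETE;
            default:
                throw new IllegalArgumentException("Unsupported operation type: " + operationType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toRowKind("replace")); // UPDATE_AFTER
    }
}
```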



[GitHub] [inlong] EMsnap merged pull request #7953: [INLONG-7952][Sort] Mask sensitive message of Flink SQL in the logs

2023-05-04 Thread via GitHub


EMsnap merged PR #7953:
URL: https://github.com/apache/inlong/pull/7953





[inlong] branch master updated (0d80d4de5 -> 3f4ebb520)

2023-05-04 Thread zirui

zirui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


 from 0d80d4de5 [INLONG-7945][Sort] Fix MongoDB CDC unable to output the 'replace' records (#7951)
 add 3f4ebb520 [INLONG-7952][Sort] Mask sensitive message of Flink SQL in the logs (#7953)

No new revisions were added by this update.

Summary of changes:
 .../apache/inlong/common/util/MaskDataUtils.java   | 32 ++-
 .../inlong/common/util/MaskDataUtilsTest.java  | 45 ++
 .../inlong/sort/parser/impl/FlinkSqlParser.java| 10 +++--
 .../sort/parser/result/FlinkSqlParseResult.java|  4 +-
 4 files changed, 77 insertions(+), 14 deletions(-)
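The masking change touches `MaskDataUtils` and `FlinkSqlParser`, whose code is not included here. As a rough idea of what masking sensitive values in generated Flink SQL can look like, the sketch below blanks the value of `'password' = '...'` style `WITH` options before the statement is logged. The class name, the regex, and the set of option keys are assumptions for illustration, not the actual `MaskDataUtils` logic; values containing escaped quotes (`''`) are not handled.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical masker for sensitive WITH options in Flink SQL text before it is logged. */
public class SqlMaskSketch {

    // Matches 'key' = 'value' where the key is a (case-insensitive) sensitive option name.
    private static final Pattern SENSITIVE_OPTION = Pattern.compile(
            "('(?:password|secret|token)'\\s*=\\s*')[^']*(')", Pattern.CASE_INSENSITIVE);

    public static String mask(String sql) {
        Matcher matcher = SENSITIVE_OPTION.matcher(sql);
        // Keep the key and the surrounding quotes (groups 1 and 2); replace only the value.
        return matcher.replaceAll("$1******$2");
    }

    public static void main(String[] args) {
        String sql = "CREATE TABLE t (...) WITH ('connector' = 'jdbc', 'password' = 'p@ss')";
        System.out.println(mask(sql));
        // CREATE TABLE t (...) WITH ('connector' = 'jdbc', 'password' = '******')
    }
}
```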



[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

2023-05-04 Thread via GitHub


Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##


Review Comment:
   Hi! The RawDataHashPartitioner was based on RawData, which is serialized canal/debezium data deserialized with the corresponding JsonDynamicFormat. However, the PrimaryKeyPartitioner needs to support csv/avro/json formats in addition to canal/debezium; those formats have no JsonDynamicFormat and need not be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.






[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

2023-05-04 Thread via GitHub


Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##


Review Comment:
   Hi! The RawDataHashPartitioner was based on data serialized in canal/debezium format and deserialized with the corresponding JsonDynamicFormat. However, the PrimaryKeyPartitioner needs to support csv/avro/json formats in addition to canal/debezium; those formats have no JsonDynamicFormat and need not be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.






[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

2023-05-04 Thread via GitHub


Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##


Review Comment:
   Hi! The RawDataHashPartitioner was based on data serialized in canal/debezium format and deserialized with the corresponding JsonDynamicFormat. However, the PrimaryKeyPartitioner is used primarily to support csv/avro/json formats in addition to canal/debezium; those formats have no JsonDynamicFormat and need not be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.






[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

2023-05-04 Thread via GitHub


Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##


Review Comment:
   Hi! The RawDataHashPartitioner was based on data serialized in canal/debezium format and deserialized with the corresponding JsonDynamicFormat. However, the PrimaryKeyPartitioner is used primarily to support csv/avro/json formats (but needs to stay compatible with canal/debezium, just in case); those formats have no JsonDynamicFormat and need not be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.






[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

2023-05-04 Thread via GitHub


Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##


Review Comment:
   Hi! The RawDataHashPartitioner was based on data serialized in canal/debezium format and deserialized with the corresponding JsonDynamicFormat. However, the PrimaryKeyPartitioner is used primarily to support csv/avro/json formats (but needs to stay compatible with canal/debezium); those formats have no JsonDynamicFormat and need not be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.






[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

2023-05-04 Thread via GitHub


Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##


Review Comment:
   Hi! The RawDataHashPartitioner was based on canal/debezium data, which is parsed with the corresponding JsonDynamicFormat. However, the PrimaryKeyPartitioner is used primarily to support csv/avro/json formats (but needs to stay compatible with canal/debezium, just in case); those formats have no JsonDynamicFormat and need not be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.






[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

2023-05-04 Thread via GitHub


Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##


Review Comment:
   Hi! Purpose-wise, the two aim to achieve the same thing. However, the RawDataHashPartitioner was based on canal/debezium data, which is parsed with the corresponding JsonDynamicFormat. In contrast, the PrimaryKeyPartitioner is used primarily to support csv/avro/json formats (but needs to stay compatible with canal/debezium, just in case); those formats have no JsonDynamicFormat and need not be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.






[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7924: [INLONG-7900][Sort] Support partition by primary key when upsert single table of Kafka

2023-05-04 Thread via GitHub


Yizhou-Yang commented on code in PR #7924:
URL: https://github.com/apache/inlong/pull/7924#discussion_r1185687301


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/partitioner/PrimaryKeyPartitioner.java:
##


Review Comment:
   Hi! RawDataHashPartitioner was based on canal/debezium data, which is parsed with the corresponding JsonDynamicFormat. In contrast, the PrimaryKeyPartitioner is used primarily to support csv/avro/json formats (but needs to stay compatible with canal/debezium, just in case); those formats have no JsonDynamicFormat and need not be parsed. If we used RawDataHashPartitioner, there would be NullPointerExceptions from the missing JsonDynamicFormat.
   
   Another consideration, design-wise: raw data comes mostly from multi-sink scenarios, so instead of extending that partitioner, I want to make a separate partitioner for single tables, which should be easier to extend and maintain in the future.
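The point of partitioning by primary key is that every change for the same key lands in the same Kafka partition, which preserves per-key ordering for upserts. The sketch below hashes the serialized primary-key bytes to a partition index; it is a standalone illustration under that assumption, not InLong's `PrimaryKeyPartitioner`, and it does not implement Flink's `FlinkKafkaPartitioner` interface. `Arrays.hashCode` stands in for whatever hash the real partitioner uses (Kafka's default partitioner uses murmur2 for keyed records), and the null-key fallback is a hypothetical choice.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch: choose a Kafka partition from the primary-key bytes of a record. */
public class PrimaryKeyPartitionSketch {

    /**
     * Maps the key bytes to a partition in [0, numPartitions).
     * Records with equal keys always map to the same partition.
     */
    public static int partition(byte[] primaryKey, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (primaryKey == null) {
            // No key available: fall back to partition 0 (an assumption for this sketch).
            return 0;
        }
        // Clear the sign bit instead of Math.abs, which stays negative for Integer.MIN_VALUE.
        int positiveHash = Arrays.hashCode(primaryKey) & 0x7fffffff;
        return positiveHash % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order_id=42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partition(key, 6) == partition(key.clone(), 6)); // true
    }
}
```

Keeping this logic in a dedicated single-table partitioner, as the comment proposes, means it never has to look up a JsonDynamicFormat that csv/avro/json records do not have.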


