zhangyifan27 commented on code in PR #7059:
URL: https://github.com/apache/inlong/pull/7059#discussion_r1058136032


##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/KuduAsyncSinkFunction.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.kudu.common.KuduTableInfo;
+import org.apache.inlong.sort.kudu.source.KuduConsumerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.TimeUtils.parseDuration;
+import static 
org.apache.inlong.sort.kudu.common.KuduOptions.CACHE_QUEUE_MAX_LENGTH;
+import static org.apache.inlong.sort.kudu.common.KuduOptions.MAX_BUFFER_TIME;
+import static 
org.apache.inlong.sort.kudu.common.KuduOptions.SINK_FORCE_WITH_UPSERT_MODE;
+import static 
org.apache.inlong.sort.kudu.common.KuduOptions.WRITE_THREAD_COUNT;
+
+/**
+ * The Flink kudu Producer in async Mode.
+ */
+@PublicEvolving
+public class KuduAsyncSinkFunction
+        extends
+            AbstractKuduSinkFunction {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KuduAsyncSinkFunction.class);
+    private final Duration maxBufferTime;
+
+    private transient BlockingQueue<RowData> queue;
+
+    private transient List<KuduConsumerTask> consumerTasks;
+    private ExecutorService threadPool = null;
+
+    public KuduAsyncSinkFunction(
+            KuduTableInfo kuduTableInfo,
+            Configuration configuration,
+            String inlongMetric,
+            String auditHostAndPorts) {
+        super(kuduTableInfo, configuration, inlongMetric, auditHostAndPorts);
+        this.maxBufferTime = 
parseDuration(configuration.getString(MAX_BUFFER_TIME));
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        int threadCnt = configuration.getInteger(WRITE_THREAD_COUNT);
+        int cacheQueueMaxLength = 
configuration.getInteger(CACHE_QUEUE_MAX_LENGTH);
+        if (cacheQueueMaxLength == -1) {
+            cacheQueueMaxLength = (maxBufferSize + 1) * (threadCnt + 1);
+        }
+        LOG.info("Opening KuduAsyncSinkFunction, threadCount:{}, 
cacheQueueMaxLength:{}, maxBufferSize:{}.",
+                threadCnt, cacheQueueMaxLength, maxBufferSize);
+
+        queue = new LinkedBlockingQueue<>(cacheQueueMaxLength);
+        consumerTasks = new ArrayList<>(threadCnt);
+        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("kudu-sink-pool-%d").build();
+        threadPool = new ThreadPoolExecutor(
+                threadCnt + 1,
+                threadCnt + 1,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(4),
+                namedThreadFactory,
+                new ThreadPoolExecutor.AbortPolicy());
+
+        boolean forceInUpsertMode = 
configuration.getBoolean(SINK_FORCE_WITH_UPSERT_MODE);
+        for (int threadIndex = 0; threadIndex < threadCnt; threadIndex++) {
+            KuduWriter kuduWriter = new KuduWriter(kuduTableInfo);
+            kuduWriter.open();

Review Comment:
   Maybe multiple threads should share a single `kuduTable` object, to avoid 
repeated `openTable` calls for the same table within one task.



##########
inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/source/KuduLookupFunction.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kudu.source;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.kudu.common.KuduOptions;
+import org.apache.inlong.sort.kudu.common.KuduTableInfo;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.TimeUtils.parseDuration;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.EQUAL;
+
+/**
+ * The KuduLookupFunction is a standard user-defined table function, it can be
+ * used in tableAPI and also useful for temporal table join plan in SQL.
+ */
+public class KuduLookupFunction extends TableFunction<Row> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KuduLookupFunction.class);
+
+    /**
+     * The names of lookup key.
+     */
+    private final String[] keyNames;
+
+    /**
+     * The configuration for the tde source.
+     */
+    private final Configuration configuration;
+
+    /**
+     * The masters of kudu server.
+     */
+    private final String masters;
+
+    /**
+     * The name of kudu table.
+     */
+    private final String tableName;
+
+    /**
+     * The maximum number of retries.
+     */
+    private transient int maxRetries;
+
+    /**
+     * The cache for lookup results.
+     */
+    private transient Cache<Row, List<Row>> cache;
+
+    /**
+     * The client of kudu.
+     */
+    private transient KuduClient client;
+
+    /**
+     * The table of kudu.
+     */
+    private transient KuduTable table;
+
+    public KuduLookupFunction(
+            KuduTableInfo kuduTableInfo,
+            Configuration configuration) {
+        checkNotNull(configuration,
+                "The configuration must not be null.");
+
+        this.masters = kuduTableInfo.getMasters();
+        this.tableName = kuduTableInfo.getTableName();
+        this.keyNames = kuduTableInfo.getFieldNames();
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        maxRetries = configuration.getInteger(KuduOptions.MAX_RETRIES);
+        int maxCacheSize = 
configuration.getInteger(KuduOptions.MAX_CACHE_SIZE);
+        Duration maxCacheTime = parseDuration(configuration.getString(
+                KuduOptions.MAX_CACHE_TIME));
+        LOG.info("opening KuduLookupFunction, maxCacheSize:{}, 
maxCacheTime:{}.", maxCacheSize, maxCacheTime);
+
+        if (maxCacheSize > 0) {
+            cache =
+                    CacheBuilder.newBuilder()
+                            .maximumSize(maxCacheSize)
+                            .expireAfterWrite(maxCacheTime.toMillis(), 
TimeUnit.MILLISECONDS)
+                            .build();
+        }
+
+        this.client = new KuduClient.KuduClientBuilder(masters).build();
+
+        this.table = client.openTable(tableName);
+        LOG.info("KuduLookupFunction opened.");
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(configuration, masters, tableName);
+        result = 31 * result + Arrays.hashCode(keyNames);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KuduLookupFunction that = (KuduLookupFunction) o;
+        return Arrays.equals(keyNames, that.keyNames) && 
configuration.equals(that.configuration)
+                && masters.equals(that.masters) && 
tableName.equals(that.tableName);
+    }
+
+    private KuduPredicate predicateComparator(ColumnSchema column, Object 
value) {
+
+        KuduPredicate.ComparisonOp comparison = EQUAL;
+
+        KuduPredicate predicate;
+
+        switch (column.getType()) {
+            case STRING:
+                String data;
+                data = (String) value;
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, data);
+                break;
+            case FLOAT:
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (float) value);
+                break;
+            case INT8:
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (byte) value);
+                break;
+            case INT16:
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (short) value);
+                break;
+            case INT32:
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (int) value);
+                break;
+            case INT64:
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (long) value);
+                break;
+            case DOUBLE:
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (double) value);
+                break;
+            case BOOL:
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (boolean) value);
+                break;
+            case UNIXTIME_MICROS:
+                Long time = (Long) value;
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, time * 1000);
+                break;
+            case BINARY:
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (byte[]) value);
+                break;
+            default:
+                throw new IllegalArgumentException("Illegal var type: " + 
column.getType());
+        }
+        return predicate;
+    }
+
+    public void eval(Object... keys) throws Exception {
+        if (keys.length != keyNames.length) {
+            throw new RuntimeException("The length of lookUpKey and 
lookUpKeyVals is difference!");
+        }
+        Row keyRow = buildCacheKey(keys);
+        if (this.cache != null) {
+            ConcurrentMap<Row, List<Row>> cacheMap = this.cache.asMap();
+            int keyCount = cacheMap.size();
+            List<Row> cacheRows = this.cache.getIfPresent(keyRow);
+            if (CollectionUtils.isNotEmpty(cacheRows)) {
+                for (Row cacheRow : cacheRows) {
+                    collect(cacheRow);
+                }
+                return;
+            }
+        }
+
+        for (int retry = 1; retry <= maxRetries; retry++) {
+            try {
+                final KuduScanToken.KuduScanTokenBuilder scanTokenBuilder = 
client.newScanTokenBuilder(table);
+                final Schema kuduTableSchema = table.getSchema();
+                for (int i = 0; i < keyNames.length; i++) {
+                    String keyName = keyNames[i];
+                    Object value = keys[i];
+                    final ColumnSchema column = 
kuduTableSchema.getColumn(keyName);
+                    KuduPredicate predicate = predicateComparator(column, 
value);
+                    scanTokenBuilder.addPredicate(predicate);
+                }
+                final List<KuduScanToken> tokenList = scanTokenBuilder.build();
+                ArrayList<Row> rows = new ArrayList<>();
+                for (final KuduScanToken token : tokenList) {
+                    final List<LocatedTablet.Replica> replicas = 
token.getTablet().getReplicas();
+                    final String[] array = replicas
+                            .stream()
+                            .map(replica -> replica.getRpcHost() + ":" + 
replica.getRpcPort())
+                            .collect(Collectors.toList()).toArray(new 
String[replicas.size()]);
+                    final byte[] scanToken = token.serialize();
+                    final KuduScanner scanner = 
KuduScanToken.deserializeIntoScanner(scanToken, client);
+                    RowResultIterator rowIterator = scanner.nextRows();

Review Comment:
   We can use the `KuduScannerIterator` API, obtained through 
`scanner.iterator()`, to process the RowResults; it includes keep-alive calls 
so that the scanner does not time out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to