This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c3fca6d6b5dea09b9581438c89fa5188ecd8f2d2 Author: WangMinChao <33626973+mincw...@users.noreply.github.com> AuthorDate: Wed Dec 15 20:16:48 2021 +0800 [HUDI-3024] Add explicit write handler for flink (#4329) Co-authored-by: wangminchao <wangminc...@asinking.com> --- .../hudi/execution/ExplicitWriteHandler.java | 65 ++++++++++++++++++++++ .../hudi/execution/FlinkLazyInsertIterable.java | 24 ++++---- .../apache/hudi/io/ExplicitWriteHandleFactory.java | 6 +- .../java/org/apache/hudi/io/FlinkAppendHandle.java | 5 ++ 4 files changed, 86 insertions(+), 14 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java new file mode 100644 index 0000000..46eff58 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; +import org.apache.hudi.io.HoodieWriteHandle; + +import java.util.ArrayList; +import java.util.List; + +/** + * Consumes stream of hoodie records from in-memory queue and writes to one explicit create handle. + */ +public class ExplicitWriteHandler<T extends HoodieRecordPayload> + extends BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> { + + private final List<WriteStatus> statuses = new ArrayList<>(); + + private HoodieWriteHandle handle; + + public ExplicitWriteHandler(HoodieWriteHandle handle) { + this.handle = handle; + } + + @Override + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) { + final HoodieRecord insertPayload = payload.record; + handle.write(insertPayload, payload.insertValue, payload.exception); + } + + @Override + public void finish() { + closeOpenHandle(); + assert statuses.size() > 0; + } + + @Override + public List<WriteStatus> getResult() { + return statuses; + } + + private void closeOpenHandle() { + statuses.addAll(handle.close()); + } +} + diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index b0674b2..78b3cb1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -27,7 +27,8 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import 
org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.WriteHandleFactory; +import org.apache.hudi.io.ExplicitWriteHandleFactory; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; @@ -36,15 +37,6 @@ import java.util.Iterator; import java.util.List; public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> { - public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, - boolean areRecordsSorted, - HoodieWriteConfig config, - String instantTime, - HoodieTable hoodieTable, - String idPrefix, - TaskContextSupplier taskContextSupplier) { - super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier); - } public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean areRecordsSorted, @@ -53,7 +45,7 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood HoodieTable hoodieTable, String idPrefix, TaskContextSupplier taskContextSupplier, - WriteHandleFactory writeHandleFactory) { + ExplicitWriteHandleFactory writeHandleFactory) { super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory); } @@ -64,8 +56,8 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood null; try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); - bufferedIteratorExecutor = - new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema, hoodieConfig)); + bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), + Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig)); final List<WriteStatus> result = 
bufferedIteratorExecutor.execute(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); return result; @@ -77,4 +69,10 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood } } } + + @SuppressWarnings("rawtypes") + private ExplicitWriteHandler getExplicitInsertHandler() { + HoodieWriteHandle handle = ((ExplicitWriteHandleFactory) writeHandleFactory).getWriteHandle(); + return new ExplicitWriteHandler(handle); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java index 092e945..e598a03 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java @@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable; */ public class ExplicitWriteHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> { - private HoodieWriteHandle<T, I, K, O> writeHandle; + private final HoodieWriteHandle<T, I, K, O> writeHandle; public ExplicitWriteHandleFactory(HoodieWriteHandle<T, I, K, O> writeHandle) { this.writeHandle = writeHandle; @@ -41,4 +41,8 @@ public class ExplicitWriteHandleFactory<T extends HoodieRecordPayload, I, K, O> String fileIdPrefix, TaskContextSupplier taskContextSupplier) { return writeHandle; } + + public HoodieWriteHandle<T, I, K, O> getWriteHandle() { + return writeHandle; + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 1872637..b514896 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -75,6 +75,11 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> } @Override + public boolean canWrite(HoodieRecord record) { + return true; + } + + @Override protected boolean needsUpdateLocation() { return false; }