dianfu commented on a change in pull request #12841:
URL: https://github.com/apache/flink/pull/12841#discussion_r451938393



##########
File path: flink-python/src/main/java/org/apache/flink/python/env/ProcessEnvironment.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Map;
+
+/**
+ * A {@link PythonEnvironment} for executing UDFs in Process.
+ */
+@Internal
+public class ProcessEnvironment implements PythonEnvironment {

Review comment:
       rename to ProcessPythonEnvironment?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -320,23 +307,20 @@ private void checkInvokeFinishBundleByCount() throws Exception {
         */
        private void checkInvokeFinishBundleByTime() throws Exception {
                long now = getProcessingTimeService().getCurrentProcessingTime();
-               if (now - lastFinishBundleTime >= maxBundleTimeMills) {
+               if (now - lastFinishBundleTime >= maxBundleTimeMills && elementCount > 0) {
                        invokeFinishBundle();
                }
        }
 
-       private void invokeFinishBundle() throws Exception {
-               if (bundleStarted.compareAndSet(true, false)) {
-                       pythonFunctionRunner.finishBundle();
-
-                       emitResults();
-                       elementCount = 0;
-                       lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
-                       // callback only after current bundle was fully finalized
-                       if (bundleFinishedCallback != null) {
-                               bundleFinishedCallback.run();
-                               bundleFinishedCallback = null;
-                       }
+       protected void invokeFinishBundle() throws Exception {

Review comment:
       There are many places calling this method without checking **elementCount**. What about wrapping the logic in an **if** check, so there is no need to check it everywhere this method is called?
   ```
   if (elementCount > 0) {
       // finish the bundle, emit the results, reset the counters, ...
   }
   ```
   
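   For illustration, a minimal sketch of the wrapped method, reconstructed from the removed body shown in the diff above:
   ```
   protected void invokeFinishBundle() throws Exception {
       if (elementCount > 0) {
           pythonFunctionRunner.finishBundle();
           emitResults();
           elementCount = 0;
           lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
           // callback only after the current bundle was fully finalized
           if (bundleFinishedCallback != null) {
               bundleFinishedCallback.run();
               bundleFinishedCallback = null;
           }
       }
   }
   ```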

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
##########
@@ -90,4 +90,10 @@ public void bufferInput(CRow input) {
        public Row getFunctionInput(CRow element) {
                return Row.project(element.row(), userDefinedFunctionInputOffsets);
        }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public TypeSerializer<Row> getInputTypeSerializer() {

Review comment:
       ditto

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
##########
@@ -76,53 +70,74 @@ public RowDataArrowPythonScalarFunctionOperator(
        @Override
        public void open() throws Exception {
                super.open();
-               allocator = ArrowUtils.getRootAllocator().newChildAllocator("reader", 0, Long.MAX_VALUE);
-               reader = new ArrowStreamReader(bais, allocator);
+               maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);
+               arrowSerializer = new RowDataArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
+               arrowSerializer.open(bais, baos);
+               currentBatchCount = 0;
        }
 
        @Override
-       public void close() throws Exception {
-               try {
-                       super.close();
-               } finally {
-                       reader.close();
-                       allocator.close();
+       public void processElement(StreamRecord<RowData> element) throws Exception {
+               RowData value = element.getValue();
+               bufferInput(value);
+               arrowSerializer.dump(getFunctionInput(value));
+               currentBatchCount++;
+               if (currentBatchCount >= maxArrowBatchSize) {
+                       invokeCurrentBatch();
                }
+               checkInvokeFinishBundleByCount();
+               emitResults();
+       }
+
+       @Override
+       protected void invokeFinishBundle() throws Exception {
+               invokeCurrentBatch();
+               super.invokeFinishBundle();
        }
 
        @Override
-       public PythonFunctionRunner<RowData> createPythonFunctionRunner(
-               FnDataReceiver<byte[]> resultReceiver,
-               PythonEnvironmentManager pythonEnvironmentManager,
-               Map<String, String> jobOptions) {
-               return new RowDataArrowPythonScalarFunctionRunner(
-                       getRuntimeContext().getTaskName(),
-                       resultReceiver,
-                       scalarFunctions,
-                       pythonEnvironmentManager,
-                       userDefinedFunctionInputType,
-                       userDefinedFunctionOutputType,
-                       getPythonConfig().getMaxArrowBatchSize(),
-                       jobOptions,
-                       getFlinkMetricContainer());
+       public void endInput() throws Exception {
+               invokeCurrentBatch();
+               super.endInput();
+       }
+
+       @Override
+       public void dispose() throws Exception {

Review comment:
       Should also add a dispose() method in ArrowPythonScalarFunctionOperator.
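   A rough sketch of what that could look like (hypothetical: it assumes the operator holds an arrowSerializer with a close() method, as in the RowData variant):
   ```
   @Override
   public void dispose() throws Exception {
       super.dispose();
       if (arrowSerializer != null) {
           arrowSerializer.close();
       }
   }
   ```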

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
##########
@@ -157,6 +157,21 @@
         */
        protected transient TypeSerializer<Row> forwardedInputSerializer;
 
+       /**
+        * The TypeSerializer for the input elements.
+        */
+       protected transient TypeSerializer<Row> inputTypeSerializer;

Review comment:
       private

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonStatelessFunctionRunner.java
##########
@@ -158,24 +156,6 @@ public ExecutableStage createExecutableStage() throws Exception {
                        components, createPythonExecutionEnvironment(), input, sideInputs, userStates, timers, transforms, outputs, createValueOnlyWireCoderSetting());
        }
 
-       public FlinkFnApi.UserDefinedFunction getUserDefinedFunctionProto(PythonFunctionInfo pythonFunctionInfo) {
-               FlinkFnApi.UserDefinedFunction.Builder builder = FlinkFnApi.UserDefinedFunction.newBuilder();
-               builder.setPayload(ByteString.copyFrom(pythonFunctionInfo.getPythonFunction().getSerializedPythonFunction()));
-               for (Object input : pythonFunctionInfo.getInputs()) {
-                       FlinkFnApi.UserDefinedFunction.Input.Builder inputProto =
-                               FlinkFnApi.UserDefinedFunction.Input.newBuilder();
-                       if (input instanceof PythonFunctionInfo) {
-                               inputProto.setUdf(getUserDefinedFunctionProto((PythonFunctionInfo) input));
-                       } else if (input instanceof Integer) {
-                               inputProto.setInputOffset((Integer) input);
-                       } else {
-                               inputProto.setInputConstant(ByteString.copyFrom((byte[]) input));
-                       }
-                       builder.addInputs(inputProto);
-               }
-               return builder.build();
-       }
-

Review comment:
       getInputType/getOutputType aren't used any more and can be removed.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrowSerializer.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+public abstract class ArrowSerializer<T> {

Review comment:
       Mark as @Internal, the same for the other classes.
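   For example:
   ```
   @Internal
   public abstract class ArrowSerializer<T> {
   ```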

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
##########
@@ -76,53 +70,74 @@ public RowDataArrowPythonScalarFunctionOperator(
        @Override
        public void open() throws Exception {
                super.open();
-               allocator = ArrowUtils.getRootAllocator().newChildAllocator("reader", 0, Long.MAX_VALUE);
-               reader = new ArrowStreamReader(bais, allocator);
+               maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);

Review comment:
       The logic for maxArrowBatchSize is different between RowDataArrowPythonScalarFunctionOperator and ArrowPythonScalarFunctionOperator; they should be aligned.
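   For reference, the capping used in the RowData variant, which both operators could presumably share:
   ```
   maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);
   ```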

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java
##########
@@ -212,7 +181,54 @@ public void close() throws Exception {
        }
 
        @Override
-       public void startBundle() {
+       public void process(byte[] data) throws Exception {
+               checkInvokeStartBundle();
+               mainInputReceiver.accept(WindowedValue.valueInGlobalWindow(data));
+       }
+
+       @Override
+       public Tuple2<byte[], Integer> pollResult() throws Exception {
+               byte[] result = resultBuffer.poll();
+               if (result == null) {
+                       return null;
+               } else {
+                       this.resultTuple.f0 = result;
+                       this.resultTuple.f1 = result.length;
+                       return this.resultTuple;
+               }
+       }
+
+       @Override
+       public void flush() throws Exception {
+               if (bundleStarted) {
+                       finishBundle();
+                       bundleStarted = false;
+               }
+       }
+
+       public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) throws Exception {
+               return DefaultJobBundleFactory.create(
+                       JobInfo.create(taskName, taskName, environmentManager.createRetrievalToken(), pipelineOptions));
+       }
+
+       /**
+        * Creates a specification which specifies the portability Python execution environment.
+        * It's used by Beam's portability framework to create the actual Python execution environment.
+        */
+       protected RunnerApi.Environment createPythonExecutionEnvironment() throws Exception {

Review comment:
       private

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrowSerializer.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+public abstract class ArrowSerializer<T> {
+
+       /**
+        * The input RowType.
+        */
+       protected final RowType inputType;
+
+       /**
+        * The output RowType.
+        */
+       protected final RowType outputType;
+
+       /**
+        * Allocator which is used for byte buffer allocation.
+        */
+       private transient BufferAllocator allocatorReader;

Review comment:
       rename to allocator

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowDataPythonScalarFunctionOperator.java
##########
@@ -98,6 +99,12 @@ public RowData getFunctionInput(RowData element) {
                return udfInputProjection.apply(element);
        }
 
+       @Override
+       @SuppressWarnings("unchecked")
+       public TypeSerializer<RowData> getInputTypeSerializer() {

Review comment:
       Move to RowDataPythonScalarFunctionOperator, as this method is not necessary for RowDataArrowPythonScalarFunctionOperator.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -296,19 +286,16 @@ private void reserveMemoryForPythonWorker() throws MemoryReservationException {
                }
        }
 
-       /**
-        * Checks whether to invoke startBundle.
-        */
-       private void checkInvokeStartBundle() throws Exception {
-               if (bundleStarted.compareAndSet(false, true)) {
-                       pythonFunctionRunner.startBundle();
+       protected void emitResults() throws Exception {
+               while ((resultTuple = pythonFunctionRunner.pollResult()) != null) {
+                       emitResult();

Review comment:
       What about making **resultTuple** a local variable? e.g.
   ```
   emitResult(resultTuple);
   ```
   It would make the code more readable. What do you think?
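   A minimal sketch of how that could look (assuming emitResult is changed to take the polled tuple as a parameter):
   ```
   protected void emitResults() throws Exception {
       Tuple2<byte[], Integer> resultTuple;
       while ((resultTuple = pythonFunctionRunner.pollResult()) != null) {
           emitResult(resultTuple);
       }
   }
   ```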

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrowSerializer.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+public abstract class ArrowSerializer<T> {
+
+       /**
+        * The input RowType.
+        */
+       protected final RowType inputType;
+
+       /**
+        * The output RowType.
+        */
+       protected final RowType outputType;
+
+       /**
+        * Allocator which is used for byte buffer allocation.
+        */
+       private transient BufferAllocator allocatorReader;
+
+       /**
+        * Reader which is responsible for deserializing the Arrow format data to the Flink rows.
+        */
+       private transient ArrowReader<T> arrowReader;
+
+       /**
+        * Reader which is responsible for converting the execution result from
+        * byte array to arrow format.
+        */
+       private transient ArrowStreamReader arrowStreamReader;
+
+       /**
+        * Container that holds a set of vectors for the input elements
+        * to be sent to the Python worker.
+        */
+       protected transient VectorSchemaRoot rootWriter;

Review comment:
       private

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrowSerializer.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+public abstract class ArrowSerializer<T> {

Review comment:
       What about moving it to package **org.apache.flink.table.runtime.arrow.serializers**?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
##########
@@ -157,6 +157,21 @@
         */
        protected transient TypeSerializer<Row> forwardedInputSerializer;
 
+       /**
+        * The TypeSerializer for the input elements.
+        */
+       protected transient TypeSerializer<Row> inputTypeSerializer;
+
+       /**
+        * Reusable OutputStream used to hold the serialized input elements.
+        */
+       protected transient ByteArrayOutputStreamWithPos baos;
+
+       /**
+        * OutputStream wrapper.
+        */
+       protected transient DataOutputViewStreamWrapper baosWrapper;

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

