[ https://issues.apache.org/jira/browse/BEAM-14483?focusedWorklogId=774700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774700 ]
ASF GitHub Bot logged work on BEAM-14483: ----------------------------------------- Author: ASF GitHub Bot Created on: 25/May/22 17:15 Start Date: 25/May/22 17:15 Worklog Time Spent: 10m Work Description: robertwb commented on code in PR #17696: URL: https://github.com/apache/beam/pull/17696#discussion_r881915006 ########## sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java: ########## @@ -72,6 +73,9 @@ private @Nullable Object @NonNull [] argsArray; private @Nullable Row providedKwargsRow; + Map<String, Coder<?>> outputCoders; + private static final String PYTHON_MAIN_OUTPUT_KEY = "0"; Review Comment: I'm wary of hard-coding this--what if it changes? If there is one and only one output, should we be insensitive to the name? ########## sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/PythonMap.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.python.transforms; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.python.PythonExternalTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.PythonCallableSource; +import org.apache.beam.sdk.values.PCollection; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class PythonMap<InputT, OutputT> + extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { + + private PythonCallableSource pythonFunction; + private @Nullable String expansionService; + private Coder<?> outputCoder; + private static final String PYTHON_MAP_FN_TRANSFORM = "apache_beam.Map"; + private static final String PYTHON_FLATMAP_FN_TRANSFORM = "apache_beam.FlatMap"; + private String pythonTransform; + + private PythonMap( + PythonCallableSource pythonFunction, Coder<?> outputCoder, String pythonTransform) { + this.pythonFunction = pythonFunction; + this.outputCoder = outputCoder; + this.pythonTransform = pythonTransform; + } + + public static <InputT, OutputT> PythonMap<InputT, OutputT> viaMapFn( + PythonCallableSource pythonFunction, Coder<?> outputCoder) { Review Comment: Is it possible for outputCoder to be optional (if it can be inferred)? ########## sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/PythonMap.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.python.transforms; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.python.PythonExternalTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.PythonCallableSource; +import org.apache.beam.sdk.values.PCollection; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class PythonMap<InputT, OutputT> + extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { + + private PythonCallableSource pythonFunction; + private @Nullable String expansionService; + private Coder<?> outputCoder; + private static final String PYTHON_MAP_FN_TRANSFORM = "apache_beam.Map"; + private static final String PYTHON_FLATMAP_FN_TRANSFORM = "apache_beam.FlatMap"; + private String pythonTransform; + + private PythonMap( + PythonCallableSource pythonFunction, Coder<?> outputCoder, String pythonTransform) { + this.pythonFunction = pythonFunction; + this.outputCoder = outputCoder; + this.pythonTransform = pythonTransform; + } + + public static <InputT, OutputT> PythonMap<InputT, OutputT> viaMapFn( + PythonCallableSource pythonFunction, Coder<?> outputCoder) { Review Comment: Is there value in the (presumably end-user) constructing a PythonCallableSource vs. just accepting a String here? Issue Time Tracking ------------------- Worklog Id: (was: 774700) Time Spent: 2h (was: 1h 50m) > Add Java cross-language transforms for invoking Python Map and FlatMap > ---------------------------------------------------------------------- > > Key: BEAM-14483 > URL: https://issues.apache.org/jira/browse/BEAM-14483 > Project: Beam > Issue Type: New Feature > Components: cross-language > Reporter: Chamikara Madhusanka Jayalath > Assignee: Chamikara Madhusanka Jayalath > Priority: P1 > Time Spent: 2h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.7#820007)