[ https://issues.apache.org/jira/browse/BEAM-14471?focusedWorklogId=773087&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-773087 ]
ASF GitHub Bot logged work on BEAM-14471: ----------------------------------------- Author: ASF GitHub Bot Created on: 21/May/22 01:53 Start Date: 21/May/22 01:53 Worklog Time Spent: 10m Work Description: ihji commented on code in PR #17674: URL: https://github.com/apache/beam/pull/17674#discussion_r878624245 ########## examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.multilang; + +import org.apache.beam.examples.common.ExampleUtils; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.python.PythonExternalTransform; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation.Required; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.util.PythonCallableSource; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +/** + * An example that counts words in Shakespeare and utilizes a Python external transform. + * + * <p>This class, {@link PythonDataframeWordCount}, uses Python DataframeTransform to count words + * from the input text file. The Python expansion service provided by --expansionService must allow + * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which can be done by + * passing --fully_qualified_name_glob commandline option when launching the expansion service). + * + * <p>Note that, for using Dataflow Runner, you should specify the following two additional + * arguments: + * + * <pre>{@code + * --experiments=use_runner_v2 + * --sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest + * }</pre> + */ +public class PythonDataframeWordCount { + + // Extract the words and create the rows for counting. 
+ static class ExtractWordsFn extends DoFn<String, Row> { + public static final Schema SCHEMA = + Schema.of( + Schema.Field.of("word", Schema.FieldType.STRING), + Schema.Field.of("count", Schema.FieldType.INT32)); + private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines"); + private final Distribution lineLenDist = + Metrics.distribution(ExtractWordsFn.class, "lineLenDistro"); + + @ProcessElement + public void processElement(@Element String element, OutputReceiver<Row> receiver) { + lineLenDist.update(element.length()); + if (element.trim().isEmpty()) { + emptyLines.inc(); + } + + // Split the line into words. + String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + receiver.output( + Row.withSchema(SCHEMA) + .withFieldValue("word", word) + .withFieldValue("count", 1) + .build()); + } + } + } + } + + /** A SimpleFunction that converts a counted row into a printable string. */ + public static class FormatAsTextFn extends SimpleFunction<Row, String> { + @Override + public String apply(Row input) { + return input.getString("word") + ": " + input.getInt32("count"); + } + } + + /** Options supported by {@link PythonDataframeWordCount}. */ + public interface WordCountOptions extends PipelineOptions { + + /** + * By default, this example reads from a public dataset containing the text of King Lear. Set + * this option to choose a different input file or glob. + */ + @Description("Path of the file to read from") + @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") + String getInputFile(); + + void setInputFile(String value); + + /** Set this required option to specify where to write the output. */ + @Description("Path of the file to write to") + @Required + String getOutput(); + + void setOutput(String value); + + /** Set this required option to specify Python expansion service URL. 
*/ + @Description("URL of Python expansion service") + @Required Review Comment: It's now optional, but users still need to specify it until we release a new SDK. Issue Time Tracking ------------------- Worklog Id: (was: 773087) Time Spent: 5h 20m (was: 5h 10m) > Adding testcases and examples for xlang Python DataframeTransform > ------------------------------------------------------------------ > > Key: BEAM-14471 > URL: https://issues.apache.org/jira/browse/BEAM-14471 > Project: Beam > Issue Type: Improvement > Components: cross-language, testing > Reporter: Heejong Lee > Assignee: Heejong Lee > Priority: P2 > Time Spent: 5h 20m > Remaining Estimate: 0h > > Adding testcases and examples for xlang Python DataframeTransform -- This message was sent by Atlassian Jira (v8.20.7#820007)