[ 
https://issues.apache.org/jira/browse/BEAM-14471?focusedWorklogId=773087&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-773087
 ]

ASF GitHub Bot logged work on BEAM-14471:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/May/22 01:53
            Start Date: 21/May/22 01:53
    Worklog Time Spent: 10m 
      Work Description: ihji commented on code in PR #17674:
URL: https://github.com/apache/beam/pull/17674#discussion_r878624245


##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilang;
+
+import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An example that counts words in Shakespeare and utilizes a Python external transform.
+ *
+ * <p>This class, {@link PythonDataframeWordCount}, uses Python DataframeTransform to count words
+ * from the input text file. The Python expansion service provided by --expansionService must allow
+ * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which can be done by
+ * passing --fully_qualified_name_glob commandline option when launching the expansion service).
+ *
+ * <p>Note that, for using Dataflow Runner, you should specify the following two additional
+ * arguments:
+ *
+ * <pre>{@code
+ * --experiments=use_runner_v2
+ * --sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest
+ * }</pre>
+ */
+public class PythonDataframeWordCount {
+
+  // Extract the words and create the rows for counting.
+  static class ExtractWordsFn extends DoFn<String, Row> {
+    public static final Schema SCHEMA =
+        Schema.of(
+            Schema.Field.of("word", Schema.FieldType.STRING),
+            Schema.Field.of("count", Schema.FieldType.INT32));
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, 
"emptyLines");
+    private final Distribution lineLenDist =
+        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<Row> 
receiver) {
+      lineLenDist.update(element.length());
+      if (element.trim().isEmpty()) {
+        emptyLines.inc();
+      }
+
+      // Split the line into words.
+      String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          receiver.output(
+              Row.withSchema(SCHEMA)
+                  .withFieldValue("word", word)
+                  .withFieldValue("count", 1)
+                  .build());
+        }
+      }
+    }
+  }
+
+  /** A SimpleFunction that converts a counted row into a printable string. */
+  public static class FormatAsTextFn extends SimpleFunction<Row, String> {
+    @Override
+    public String apply(Row input) {
+      return input.getString("word") + ": " + input.getInt32("count");
+    }
+  }
+
+  /** Options supported by {@link PythonDataframeWordCount}. */
+  public interface WordCountOptions extends PipelineOptions {
+
+    /**
+     * By default, this example reads from a public dataset containing the text of King Lear. Set
+     * this option to choose a different input file or glob.
+     */
+    @Description("Path of the file to read from")
+    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
+    String getInputFile();
+
+    void setInputFile(String value);
+
+    /** Set this required option to specify where to write the output. */
+    @Description("Path of the file to write to")
+    @Required
+    String getOutput();
+
+    void setOutput(String value);
+
+    /** Set this required option to specify Python expansion service URL. */
+    @Description("URL of Python expansion service")
+    @Required

Review Comment:
   It's now optional but users need to specify it until we release a new SDK.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 773087)
    Time Spent: 5h 20m  (was: 5h 10m)

> Adding testcases and examples for xlang Python DataframeTransform 
> ------------------------------------------------------------------
>
>                 Key: BEAM-14471
>                 URL: https://issues.apache.org/jira/browse/BEAM-14471
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language, testing
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: P2
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Adding testcases and examples for xlang Python DataframeTransform 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to