[ 
https://issues.apache.org/jira/browse/BEAM-13093?focusedWorklogId=716100&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-716100
 ]

ASF GitHub Bot logged work on BEAM-13093:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Jan/22 23:37
            Start Date: 26/Jan/22 23:37
    Worklog Time Spent: 10m 
      Work Description: ihji commented on a change in pull request #15767:
URL: https://github.com/apache/beam/pull/15767#discussion_r793139264



##########
File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
##########
@@ -83,62 +84,198 @@
  * (https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA) for further
  * details.
  */
-@RunWith(JUnit4.class)
-public class ValidateRunnerXlangTest implements Serializable {
-  @Rule public transient TestPipeline testPipeline = TestPipeline.create();
-  private PipelineResult pipelineResult;
+public class ValidateRunnerXlangTest {
+  static class ValidateRunnerXlangTestBase implements Serializable {
+    @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+    private PipelineResult pipelineResult;
 
-  // URNs for core cross-language transforms.
-  // See 
https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA 
for further
-  // details.
-  private static final String TEST_PREFIX_URN = 
"beam:transforms:xlang:test:prefix";
-  private static final String TEST_MULTI_URN = 
"beam:transforms:xlang:test:multi";
-  private static final String TEST_GBK_URN = "beam:transforms:xlang:test:gbk";
-  private static final String TEST_CGBK_URN = 
"beam:transforms:xlang:test:cgbk";
-  private static final String TEST_COMGL_URN = 
"beam:transforms:xlang:test:comgl";
-  private static final String TEST_COMPK_URN = 
"beam:transforms:xlang:test:compk";
-  private static final String TEST_FLATTEN_URN = 
"beam:transforms:xlang:test:flatten";
-  private static final String TEST_PARTITION_URN = 
"beam:transforms:xlang:test:partition";
-  private static final String TEST_PYTHON_BS4_URN = 
"beam:transforms:xlang:test:python_bs4";
+    // URNs for core cross-language transforms.
+    // See 
https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA 
for
+    // further
+    // details.
+    private static final String TEST_PREFIX_URN = 
"beam:transforms:xlang:test:prefix";
+    private static final String TEST_MULTI_URN = 
"beam:transforms:xlang:test:multi";
+    private static final String TEST_GBK_URN = 
"beam:transforms:xlang:test:gbk";
+    private static final String TEST_CGBK_URN = 
"beam:transforms:xlang:test:cgbk";
+    private static final String TEST_COMGL_URN = 
"beam:transforms:xlang:test:comgl";
+    private static final String TEST_COMPK_URN = 
"beam:transforms:xlang:test:compk";
+    private static final String TEST_FLATTEN_URN = 
"beam:transforms:xlang:test:flatten";
+    private static final String TEST_PARTITION_URN = 
"beam:transforms:xlang:test:partition";
+    private static final String TEST_PYTHON_BS4_URN = 
"beam:transforms:xlang:test:python_bs4";
 
-  private static String expansionAddr;
-  private static String expansionJar;
+    private static String expansionAddr;
+    private static String expansionJar;
 
-  @BeforeClass
-  public static void setUpClass() {
-    expansionAddr =
-        String.format("localhost:%s", 
Integer.valueOf(System.getProperty("expansionPort")));
-    expansionJar = System.getProperty("expansionJar");
-  }
+    @BeforeClass
+    public static void setUpClass() {
+      expansionAddr =
+          String.format("localhost:%s", 
Integer.valueOf(System.getProperty("expansionPort")));
+      expansionJar = System.getProperty("expansionJar");
+    }
 
-  @Before
-  public void setUp() {
-    ExperimentalOptions.addExperiment(
-        testPipeline.getOptions().as(ExperimentalOptions.class), 
"jar_packages=" + expansionJar);
-    waitForReady();
-  }
+    @Before
+    public void setUp() {
+      ExperimentalOptions.addExperiment(
+          testPipeline.getOptions().as(ExperimentalOptions.class), 
"jar_packages=" + expansionJar);
+      waitForReady();
+    }
 
-  @After
-  public void tearDown() {
-    pipelineResult = testPipeline.run();
-    pipelineResult.waitUntilFinish();
-    assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE));
-  }
+    @After
+    public void tearDown() {
+      pipelineResult = testPipeline.run();
+      pipelineResult.waitUntilFinish();
+      assertThat(pipelineResult.getState(), 
equalTo(PipelineResult.State.DONE));
+    }
 
-  private void waitForReady() {
-    try {
-      ManagedChannel channel = 
ManagedChannelBuilder.forTarget(expansionAddr).build();
-      ConnectivityState state = channel.getState(true);
-      for (int retry = 0; retry < 30 && state != ConnectivityState.READY; 
retry++) {
-        Thread.sleep(500);
-        state = channel.getState(true);
+    private void waitForReady() {
+      try {
+        ManagedChannel channel = 
ManagedChannelBuilder.forTarget(expansionAddr).build();
+        ConnectivityState state = channel.getState(true);
+        for (int retry = 0; retry < 30 && state != ConnectivityState.READY; 
retry++) {
+          Thread.sleep(500);
+          state = channel.getState(true);
+        }
+        channel.shutdownNow();
+      } catch (InterruptedException e) {
+        throw new RuntimeException("interrupted.");
       }
-      channel.shutdownNow();
-    } catch (InterruptedException e) {
-      throw new RuntimeException("interrupted.");
     }
-  }
 
+    private byte[] toStringPayloadBytes(String data) throws IOException {
+      Row configRow =
+          Row.withSchema(Schema.of(Field.of("data", FieldType.STRING)))
+              .withFieldValue("data", data)
+              .build();
+
+      ByteString.Output outputStream = ByteString.newOutput();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              
.setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), false))
+              .setPayload(outputStream.toByteString())
+              .build();
+      return payload.toByteArray();
+    }
+
+    protected void singleInputOutputTest(Pipeline pipeline) throws IOException 
{
+      PCollection<String> col =
+          pipeline
+              .apply(Create.of("1", "2", "3"))
+              .apply(External.of(TEST_PREFIX_URN, toStringPayloadBytes("0"), 
expansionAddr));
+      PAssert.that(col).containsInAnyOrder("01", "02", "03");
+    }
+
+    protected void multiInputOutputWithSideInputTest(Pipeline pipeline) {
+      PCollection<String> main1 = pipeline.apply("createMain1", Create.of("a", 
"bb"));
+      PCollection<String> main2 = pipeline.apply("createMain2", Create.of("x", 
"yy", "zzz"));
+      PCollection<String> side = pipeline.apply("createSide", Create.of("s"));
+      PCollectionTuple pTuple =
+          PCollectionTuple.of("main1", main1)
+              .and("main2", main2)
+              .and("side", side)
+              .apply(External.of(TEST_MULTI_URN, new byte[] {}, 
expansionAddr).withMultiOutputs());

Review comment:
       Ack.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 716100)
    Time Spent: 15h  (was: 14h 50m)

> Enable JavaUsingPython CrossLanguageValidateRunner test for dataflow runner v2
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-13093
>                 URL: https://issues.apache.org/jira/browse/BEAM-13093
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language, testing
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: P2
>          Time Spent: 15h
>  Remaining Estimate: 0h
>
> Enable JavaUsingPython CrossLanguageValidateRunner test for dataflow runner v2



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to