[ https://issues.apache.org/jira/browse/BEAM-13093?focusedWorklogId=716033&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-716033 ]
ASF GitHub Bot logged work on BEAM-13093: ----------------------------------------- Author: ASF GitHub Bot Created on: 26/Jan/22 21:34 Start Date: 26/Jan/22 21:34 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #15767: URL: https://github.com/apache/beam/pull/15767#discussion_r793064847 ########## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java ########## @@ -83,62 +84,198 @@ * (https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA") for further * details. */ -@RunWith(JUnit4.class) -public class ValidateRunnerXlangTest implements Serializable { - @Rule public transient TestPipeline testPipeline = TestPipeline.create(); - private PipelineResult pipelineResult; +public class ValidateRunnerXlangTest { + static class ValidateRunnerXlangTestBase implements Serializable { + @Rule public transient TestPipeline testPipeline = TestPipeline.create(); + private PipelineResult pipelineResult; - // URNs for core cross-language transforms. - // See https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA for further - // details. 
- private static final String TEST_PREFIX_URN = "beam:transforms:xlang:test:prefix"; - private static final String TEST_MULTI_URN = "beam:transforms:xlang:test:multi"; - private static final String TEST_GBK_URN = "beam:transforms:xlang:test:gbk"; - private static final String TEST_CGBK_URN = "beam:transforms:xlang:test:cgbk"; - private static final String TEST_COMGL_URN = "beam:transforms:xlang:test:comgl"; - private static final String TEST_COMPK_URN = "beam:transforms:xlang:test:compk"; - private static final String TEST_FLATTEN_URN = "beam:transforms:xlang:test:flatten"; - private static final String TEST_PARTITION_URN = "beam:transforms:xlang:test:partition"; - private static final String TEST_PYTHON_BS4_URN = "beam:transforms:xlang:test:python_bs4"; + // URNs for core cross-language transforms. + // See https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA for + // further + // details. + private static final String TEST_PREFIX_URN = "beam:transforms:xlang:test:prefix"; + private static final String TEST_MULTI_URN = "beam:transforms:xlang:test:multi"; + private static final String TEST_GBK_URN = "beam:transforms:xlang:test:gbk"; + private static final String TEST_CGBK_URN = "beam:transforms:xlang:test:cgbk"; + private static final String TEST_COMGL_URN = "beam:transforms:xlang:test:comgl"; + private static final String TEST_COMPK_URN = "beam:transforms:xlang:test:compk"; + private static final String TEST_FLATTEN_URN = "beam:transforms:xlang:test:flatten"; + private static final String TEST_PARTITION_URN = "beam:transforms:xlang:test:partition"; + private static final String TEST_PYTHON_BS4_URN = "beam:transforms:xlang:test:python_bs4"; - private static String expansionAddr; - private static String expansionJar; + private static String expansionAddr; + private static String expansionJar; - @BeforeClass - public static void setUpClass() { - expansionAddr = - String.format("localhost:%s", 
Integer.valueOf(System.getProperty("expansionPort"))); - expansionJar = System.getProperty("expansionJar"); - } + @BeforeClass + public static void setUpClass() { + expansionAddr = + String.format("localhost:%s", Integer.valueOf(System.getProperty("expansionPort"))); + expansionJar = System.getProperty("expansionJar"); + } - @Before - public void setUp() { - ExperimentalOptions.addExperiment( - testPipeline.getOptions().as(ExperimentalOptions.class), "jar_packages=" + expansionJar); - waitForReady(); - } + @Before + public void setUp() { + ExperimentalOptions.addExperiment( + testPipeline.getOptions().as(ExperimentalOptions.class), "jar_packages=" + expansionJar); + waitForReady(); + } - @After - public void tearDown() { - pipelineResult = testPipeline.run(); - pipelineResult.waitUntilFinish(); - assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE)); - } + @After + public void tearDown() { + pipelineResult = testPipeline.run(); + pipelineResult.waitUntilFinish(); + assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE)); + } - private void waitForReady() { - try { - ManagedChannel channel = ManagedChannelBuilder.forTarget(expansionAddr).build(); - ConnectivityState state = channel.getState(true); - for (int retry = 0; retry < 30 && state != ConnectivityState.READY; retry++) { - Thread.sleep(500); - state = channel.getState(true); + private void waitForReady() { + try { + ManagedChannel channel = ManagedChannelBuilder.forTarget(expansionAddr).build(); + ConnectivityState state = channel.getState(true); + for (int retry = 0; retry < 30 && state != ConnectivityState.READY; retry++) { + Thread.sleep(500); + state = channel.getState(true); + } + channel.shutdownNow(); + } catch (InterruptedException e) { + throw new RuntimeException("interrupted."); } - channel.shutdownNow(); - } catch (InterruptedException e) { - throw new RuntimeException("interrupted."); } - } + private byte[] toStringPayloadBytes(String data) throws IOException 
{ + Row configRow = + Row.withSchema(Schema.of(Field.of("data", FieldType.STRING))) + .withFieldValue("data", data) + .build(); + + ByteString.Output outputStream = ByteString.newOutput(); + try { + RowCoder.of(configRow.getSchema()).encode(configRow, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), false)) + .setPayload(outputStream.toByteString()) + .build(); + return payload.toByteArray(); + } + + protected void singleInputOutputTest(Pipeline pipeline) throws IOException { + PCollection<String> col = + pipeline + .apply(Create.of("1", "2", "3")) + .apply(External.of(TEST_PREFIX_URN, toStringPayloadBytes("0"), expansionAddr)); + PAssert.that(col).containsInAnyOrder("01", "02", "03"); + } + + protected void multiInputOutputWithSideInputTest(Pipeline pipeline) { + PCollection<String> main1 = pipeline.apply("createMain1", Create.of("a", "bb")); + PCollection<String> main2 = pipeline.apply("createMain2", Create.of("x", "yy", "zzz")); + PCollection<String> side = pipeline.apply("createSide", Create.of("s")); + PCollectionTuple pTuple = + PCollectionTuple.of("main1", main1) + .and("main2", main2) + .and("side", side) + .apply(External.of(TEST_MULTI_URN, new byte[] {}, expansionAddr).withMultiOutputs()); Review comment: Probably we should simplify the API so that users don't have to keep passing "new byte[] {}" for empty input ? 
(in a separate PR) ########## File path: .github/PULL_REQUEST_TEMPLATE.md ########## @@ -156,9 +156,15 @@ See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on </a> </td> <td> - <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/"> - <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon"> - </a> + <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_PythonUsingJava_Dataflow/lastCompletedBuild/"> Review comment: To clarify, are we splitting the Python SQL and non-SQL tests into different test suites here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 716033) Time Spent: 14h 40m (was: 14.5h) > Enable JavaUsingPython CrossLanguageValidateRunner test for dataflow runner v2 > ------------------------------------------------------------------------------ > > Key: BEAM-13093 > URL: https://issues.apache.org/jira/browse/BEAM-13093 > Project: Beam > Issue Type: Improvement > Components: cross-language, testing > Reporter: Heejong Lee > Assignee: Heejong Lee > Priority: P2 > Time Spent: 14h 40m > Remaining Estimate: 0h > > Enable JavaUsingPython CrossLanguageValidateRunner test for dataflow runner v2 -- This message was sent by Atlassian Jira (v8.20.1#820001)