[ 
https://issues.apache.org/jira/browse/BEAM-10885?focusedWorklogId=489826&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-489826
 ]

ASF GitHub Bot logged work on BEAM-10885:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Sep/20 20:33
            Start Date: 23/Sep/20 20:33
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request 
#12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r493858640



##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 
1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 
2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", 
"topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 
1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 
2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);
+
+  protected abstract PCollection<Row> createRecorderDecoder(TestPipeline 
pipeline);
+
+  protected abstract PCollection<Row> createRecorderEncoder(TestPipeline 
pipeline);

Review comment:
       Could you instead have a method `protected abstract BeamKafkaTable 
getTable()` that gets overridden by each implementation?
   
   Then I think createRecorderDecoder and createRecorderEncoder can have 
concrete private implementations that are reused, like:
   ```
     @Override
     protected PCollection<Row> createRecorderEncoder(TestPipeline pipeline) {
       BeamKafkaTable table = getTable();
       return pipeline
           .apply(Create.of(ROW1, ROW2))
           .apply(table.getPTransformForInput())
           .apply(table.getPTransformForOutput());
     }
   ```
   
   That way you're testing through the public interface and everything else 
(e.g. `AvroRecorderEncoder`) can be private.
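
A minimal, dependency-free sketch of that structure, assuming nothing about the real Beam classes: `SketchTable`, `SketchTableTestBase`, and `CsvSketchTableTest` are hypothetical stand-ins for `BeamKafkaTable`, the abstract test, and one format-specific subclass. The only point is that the subclass supplies the table while the round-trip logic is written once in the base class and goes through the table's public methods.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a table: only its public encode/decode surface. */
interface SketchTable {
  byte[] encode(List<String> row);

  List<String> decode(byte[] payload);
}

/** Shared base: subclasses supply the table, the round-trip logic lives here once. */
abstract class SketchTableTestBase {
  protected abstract SketchTable getTable();

  /** Encodes then decodes every row through the table's public interface. */
  final List<List<String>> roundTrip(List<List<String>> rows) {
    SketchTable table = getTable();
    List<List<String>> result = new ArrayList<>();
    for (List<String> row : rows) {
      result.add(table.decode(table.encode(row)));
    }
    return result;
  }
}

/** One format-specific subclass; it only says which table to test. */
final class CsvSketchTableTest extends SketchTableTestBase {
  @Override
  protected SketchTable getTable() {
    return new SketchTable() {
      @Override
      public byte[] encode(List<String> row) {
        // Naive CSV for illustration only: no quoting or escaping.
        return String.join(",", row).getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public List<String> decode(byte[] payload) {
        return Arrays.asList(new String(payload, StandardCharsets.UTF_8).split(",", -1));
      }
    };
  }
}

final class TemplateMethodSketch {
  public static void main(String[] args) {
    List<List<String>> rows =
        Arrays.asList(Arrays.asList("1", "1", "1.0"), Arrays.asList("2", "2", "2.0"));
    System.out.println(new CsvSketchTableTest().roundTrip(rows));
  }
}
```

With this shape, an Avro variant would only override `getTable()`; the encoder and decoder classes themselves never need to be visible to the test.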
   

##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 
1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 
2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", 
"topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 
1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 
2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);

Review comment:
       ```suggestion
     protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
   ```

##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -184,24 +185,11 @@ private static Schema genSchema() {
             .build());
   }
 
-  private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
+  protected static class String2KvBytes extends DoFn<String, KV<byte[], 
byte[]>>

Review comment:
       It looks like this is only actually used in `BeamCsvTableTest`, so let's 
just move it there rather than making it protected.

##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java
##########
@@ -44,23 +48,25 @@
 import org.apache.kafka.common.record.TimestampType;
 
 /** This is a MockKafkaCSVTestTable. It will use a Mock Consumer. */

Review comment:
       ```suggestion
   /** This is a mock BeamKafkaTable. It will use a Mock Consumer. */
   ```

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -428,6 +432,35 @@ public static GenericRecord toGenericRecord(
     return new GenericRecordToRowFn(schema);
   }
 
+  public static Row avroBytesToRow(byte[] bytes, Schema schema) {
+    try {
+      org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+      AvroCoder<GenericRecord> coder = AvroCoder.of(avroSchema);

Review comment:
       The last option (an AvroBytesToRow DoFn) is probably the most natural for Beam

##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -123,57 +144,37 @@ public void testAllLate() {
 
   @Test
   public void testEmptyPartitionsRate() {
-    KafkaCSVTestTable table = getTable(3);
+    KafkaTestTable table = getTable(3);
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
   @Test
   public void allTheRecordsSameTimeRate() {
-    KafkaCSVTestTable table = getTable(3);
-    for (int i = 0; i < 100; i++) {
-      table.addRecord(KafkaTestRecord.create("key" + i, i + ",1,2", "topic1", 
1000));
+    KafkaTestTable table = getTable(3);
+    for (long i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, ImmutableList.of(i, 1, 
2d), 1000L));
     }
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
-  private static class PrintDoFn extends DoFn<Row, Row> {
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      System.out.println("we are here");
-      System.out.println(c.element().getValues());
-    }
-  }
-
   @Test
-  public void testCsvRecorderDecoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of("1,\"1\",1.0", "2,2,2.0"))
-            .apply(ParDo.of(new String2KvBytes()))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), 
CSVFormat.DEFAULT));
-
+  public void testRecorderDecoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecorderEncoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of(ROW1, ROW2))
-            .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genSchema(), 
CSVFormat.DEFAULT))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), 
CSVFormat.DEFAULT));
-
+  public void testRecorderEncoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
-
     pipeline.run();
   }
 
-  private static Schema genSchema() {
+  protected static Schema genSchema() {

Review comment:
       Please test more types here too. Also, I'm not sure why this is using 
JavaTypeFactory and converting to a Beam Schema; maybe it pre-dates the modern 
Schema class that has a nice builder interface. Could you change it to use 
`Schema.builder()...` or `Schema.of(...)`?

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -428,6 +432,35 @@ public static GenericRecord toGenericRecord(
     return new GenericRecordToRowFn(schema);
   }
 
+  public static Row avroBytesToRow(byte[] bytes, Schema schema) {
+    try {
+      org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+      AvroCoder<GenericRecord> coder = AvroCoder.of(avroSchema);

Review comment:
       Both of these methods will repeat the schema conversion for every Row. 
Instead both of these methods should convert the schema once and re-use 
`avroSchema` and `coder` for every instance of `bytes` or `row` that comes in. 
There are a few ways to do this, I don't really have a preference for which:
   - Return a lambda with `avroSchema` and `coder` in its closure, like 
   ```java
   return (bytes) -> {
     ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
     // coder.decode throws IOException; real code must catch or wrap it.
     GenericRecord record = coder.decode(inputStream);
     return AvroUtils.toBeamRowStrict(record, schema);
   };
   ```
   - Create an AvroBytesToRow class with a process method that re-uses 
avroSchema/coder
   - Similarly, create an AvroBytesToRow DoFn with a process method that 
re-uses avroSchema/coder
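
To make the "convert once, reuse per element" idea concrete, here is a small stand-alone sketch of the first option using only the JDK. Everything in it is a hypothetical stand-in: `Decoder` plays the role of the `AvroCoder`, `buildDecoder` plays the role of the schema conversion plus `AvroCoder.of(...)`, and a comma-separated string stands in for the schema. It is not the Beam API, just the closure pattern.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ReusableDecoderSketch {
  /** Counts how many times the (stand-in) expensive setup runs. */
  static final AtomicInteger SETUP_CALLS = new AtomicInteger();

  /** Hypothetical stand-in for the coder derived from the converted schema. */
  static final class Decoder {
    private final String[] fieldNames;

    Decoder(String[] fieldNames) {
      this.fieldNames = fieldNames;
    }

    /** Pairs each field name with the value at the same position in the payload. */
    String decode(byte[] bytes) {
      String[] values = new String(bytes, StandardCharsets.UTF_8).split(",", -1);
      if (values.length != fieldNames.length) {
        throw new IllegalArgumentException("payload does not match schema");
      }
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < fieldNames.length; i++) {
        if (i > 0) {
          sb.append(", ");
        }
        sb.append(fieldNames[i]).append('=').append(values[i]);
      }
      return sb.toString();
    }
  }

  /** Stand-in for the per-schema work that should not be repeated per element. */
  static Decoder buildDecoder(String schema) {
    SETUP_CALLS.incrementAndGet();
    return new Decoder(schema.split(","));
  }

  /** Does the setup once, then returns a function that closes over the result. */
  static Function<byte[], String> bytesToRowFn(String schema) {
    Decoder decoder = buildDecoder(schema);
    return bytes -> decoder.decode(bytes);
  }

  public static void main(String[] args) {
    Function<byte[], String> fn = bytesToRowFn("order_id,site_id,price");
    System.out.println(fn.apply("1,1,1.0".getBytes(StandardCharsets.UTF_8)));
    System.out.println(fn.apply("2,2,2.0".getBytes(StandardCharsets.UTF_8)));
    System.out.println("setup calls: " + SETUP_CALLS.get());
  }
}
```

The class and DoFn options follow the same idea, with the captured state held in fields instead of a closure.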

##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
##########
@@ -67,19 +67,25 @@
 import org.testcontainers.containers.KafkaContainer;
 
 /** This is an integration test for KafkaCSVTable. */
-public class KafkaCSVTableIT {
+public abstract class KafkaTableProviderIT {
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
   @Rule public transient KafkaContainer kafka = new KafkaContainer();
 
-  private KafkaOptions kafkaOptions;
+  protected KafkaOptions kafkaOptions;
 
-  private static final Schema TEST_TABLE_SCHEMA =
+  protected static final Schema TEST_TABLE_SCHEMA =
       Schema.builder()
           .addNullableField("order_id", Schema.FieldType.INT32)
           .addNullableField("member_id", Schema.FieldType.INT32)
           .addNullableField("item_name", Schema.FieldType.INT32)

Review comment:
       Could you add more fields to this schema to exercise more types?

##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 
1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 
2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", 
"topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 
1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 
2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);
+
+  protected abstract PCollection<Row> createRecorderDecoder(TestPipeline 
pipeline);
+
+  protected abstract PCollection<Row> createRecorderEncoder(TestPipeline 
pipeline);

Review comment:
       In that case it may make sense to get rid of `createRecorderDecoder` and 
`createRecorderEncoder` altogether and inline them in the couple of tests that 
use them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 489826)
    Time Spent: 1h 40m  (was: 1.5h)

> Add Avro support to Kafka Table Provider
> ----------------------------------------
>
>                 Key: BEAM-10885
>                 URL: https://issues.apache.org/jira/browse/BEAM-10885
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>    Affects Versions: 2.25.0
>            Reporter: Piotr Szuberski
>            Assignee: Piotr Szuberski
>            Priority: P2
>             Fix For: 2.25.0
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
