[ 
https://issues.apache.org/jira/browse/BEAM-13990?focusedWorklogId=763931&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763931
 ]

ASF GitHub Bot logged work on BEAM-13990:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Apr/22 23:31
            Start Date: 28/Apr/22 23:31
    Worklog Time Spent: 10m 
      Work Description: yirutang commented on code in PR #17404:
URL: https://github.com/apache/beam/pull/17404#discussion_r861374699


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -84,10 +143,10 @@ public static class SchemaDoesntMatchException extends SchemaConversionException
           .put("NUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
           .put("BIGNUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
           .put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding.
-          .put("DATE", Type.TYPE_STRING) // Pass through the JSON encoding.
+          .put("DATE", Type.TYPE_INT32)
           .put("TIME", Type.TYPE_STRING) // Pass through the JSON encoding.
           .put("DATETIME", Type.TYPE_STRING) // Pass through the JSON encoding.

Review Comment:
   TIME and DATETIME can be Int64 so that the encoding size is smaller. 
Shouldn't block this PR:
   
https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java#L334
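
To illustrate why an int64 encoding is smaller than the string form, here is a minimal sketch of packing a TIME value into a single long. The class name `PackedTimeSketch` and its methods are hypothetical, and the bit layout (20 bits of microseconds, then 6 bits each for second and minute, then 5 bits for hour) is an assumption about the packed civil-time format, not something taken from this PR. The linked `JsonToProtoMessage` remains the authoritative reference.

```java
import java.time.LocalTime;

/** Hypothetical sketch of a packed int64 TIME encoding; the bit layout is an assumption. */
final class PackedTimeSketch {
  // Assumed layout, low to high bits: 20 bits micros, 6 bits second, 6 bits minute, 5 bits hour.
  private static final int SECOND_SHIFT = 20;
  private static final int MINUTE_SHIFT = 26;
  private static final int HOUR_SHIFT = 32;

  /** Packs a LocalTime into one long, truncating to microsecond precision. */
  static long encodeTimeMicros(LocalTime time) {
    long micros = time.getNano() / 1_000L;
    return ((long) time.getHour() << HOUR_SHIFT)
        | ((long) time.getMinute() << MINUTE_SHIFT)
        | ((long) time.getSecond() << SECOND_SHIFT)
        | micros;
  }

  /** Reverses encodeTimeMicros by masking out each field. */
  static LocalTime decodeTimeMicros(long packed) {
    int hour = (int) ((packed >>> HOUR_SHIFT) & 0x1F);
    int minute = (int) ((packed >>> MINUTE_SHIFT) & 0x3F);
    int second = (int) ((packed >>> SECOND_SHIFT) & 0x3F);
    int micros = (int) (packed & 0xFFFFF);
    return LocalTime.of(hour, minute, second, micros * 1_000);
  }

  public static void main(String[] args) {
    LocalTime time = LocalTime.parse("00:52:07.123456");
    long packed = encodeTimeMicros(time);
    System.out.println(packed + " -> " + decodeTimeMicros(packed));
  }
}
```

The packed value is a fixed 8 bytes before varint encoding, whereas the string "00:52:07.123456" takes 15 bytes plus a length prefix.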
   



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class);
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT");
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID =
+      "table_row_to_storage_api_proto_" + System.nanoTime();
+
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRING")
+                          .setMode("REPEATED")
+                          .setName("arrayValue"))
+                  .build());
+
+  private static final TableRow BASE_TABLE_ROW =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", "2.8168")
+          .set("floatValue", "2.817")
+          .set("boolValue", "true")
+          .set("booleanValue", "true")
+          .set("timestampValue", "1970-01-01T00:00:00.000043Z")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set(
+              "arrayValue",
+              Arrays.asList("hello", "goodbye", null)); // null in arrayValue should be removed
+
+  private static final TableRow BASE_TABLE_ROW_JODA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("arrayValue", ImmutableList.of("hello", "goodbye"));
+
+  private static final TableRow BASE_TABLE_ROW_JAVA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))

Review Comment:
   int and double timestamp value test?
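
As a starting point for such a test, the sketch below mirrors the numeric TIMESTAMP branches of `singularFieldToProtoValue` as they appear in this PR's diff: integral values pass through as microseconds since the epoch, and floating-point values are treated as seconds since the epoch and scaled to microseconds with half-up rounding. The class and method names are hypothetical, and the logic is reproduced standalone so that the expected values can be checked without a BigQuery dependency.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical standalone copy of the numeric TIMESTAMP handling shown in the diff. */
final class NumericTimestampSketch {
  /** Integral input is taken as epoch microseconds; floating-point input as epoch seconds. */
  static long toEpochMicros(Number value) {
    if (value instanceof Integer || value instanceof Long) {
      return value.longValue();
    } else if (value instanceof Double || value instanceof Float) {
      return BigDecimal.valueOf(value.doubleValue())
          .scaleByPowerOfTen(6)
          .setScale(0, RoundingMode.HALF_UP)
          .longValue();
    }
    throw new IllegalArgumentException("Unsupported timestamp type: " + value.getClass());
  }

  public static void main(String[] args) {
    System.out.println(toEpochMicros(43L)); // integral: already microseconds
    System.out.println(toEpochMicros(1.5)); // floating point: seconds scaled by 10^6
  }
}
```

A test row could then set `timestampValue` to `43L` or `0.000043` and expect the same stored instant as the string "1970-01-01T00:00:00.000043Z".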



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java:
##########
@@ -0,0 +1,308 @@
+  private static final TableRow BASE_TABLE_ROW_JAVA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))

Review Comment:
   bignumeric test?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -523,14 +523,6 @@ public void testTimePartitioning() throws Exception {
     testTimePartitioning(method);
   }
 
-  @Test
-  public void testTimePartitioningStorageApi() throws Exception {

Review Comment:
   Why remove this?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -218,125 +288,157 @@ private static void fieldDescriptorFromTableField(
   }
 
   @Nullable
+  @SuppressWarnings({"nullness"})
   private static Object messageValueFromFieldValue(
-      FieldDescriptor fieldDescriptor, @Nullable Object bqValue, boolean ignoreUnknownValues)
+      SchemaInformation schemaInformation,
+      FieldDescriptor fieldDescriptor,
+      @Nullable Object bqValue,
+      boolean ignoreUnknownValues)
       throws SchemaConversionException {
     if (bqValue == null) {
       if (fieldDescriptor.isOptional()) {
         return null;
       } else if (fieldDescriptor.isRepeated()) {
         return Collections.emptyList();
-      }
-      {
+      } else {
         throw new IllegalArgumentException(
             "Received null value for non-nullable field " + fieldDescriptor.getName());
       }
     }
-    return toProtoValue(
-        fieldDescriptor, bqValue, fieldDescriptor.isRepeated(), ignoreUnknownValues);
-  }
-
-  private static final Map<FieldDescriptor.Type, Function<String, Object>>
-      JSON_PROTO_STRING_PARSERS =
-          ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
-              .put(FieldDescriptor.Type.INT32, Integer::valueOf)
-              .put(FieldDescriptor.Type.INT64, Long::valueOf)
-              .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
-              .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
-              .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
-              .put(FieldDescriptor.Type.STRING, str -> str)
-              .put(
-                  FieldDescriptor.Type.BYTES,
-                  b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
-              .build();
 
-  @Nullable
-  @SuppressWarnings({"nullness"})
-  @VisibleForTesting
-  static Object toProtoValue(
-      FieldDescriptor fieldDescriptor,
-      Object jsonBQValue,
-      boolean isRepeated,
-      boolean ignoreUnknownValues)
-      throws SchemaConversionException {
-    if (isRepeated) {
-      List<Object> listValue = (List<Object>) jsonBQValue;
+    if (fieldDescriptor.isRepeated()) {
+      List<Object> listValue = (List<Object>) bqValue;
       List<Object> protoList = Lists.newArrayListWithCapacity(listValue.size());
-      for (Object o : listValue) {
-        protoList.add(toProtoValue(fieldDescriptor, o, false, ignoreUnknownValues));
+      for (@Nullable Object o : listValue) {
+        if (o != null) { // repeated field cannot contain null.
+          protoList.add(
+              singularFieldToProtoValue(
+                  schemaInformation, fieldDescriptor, o, ignoreUnknownValues));
+        }
       }
       return protoList;
     }
-
-    if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
-      if (jsonBQValue instanceof TableRow) {
-        TableRow tableRow = (TableRow) jsonBQValue;
-        return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues);
-      } else if (jsonBQValue instanceof AbstractMap) {
-        // This will handle nested rows.
-        AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue);
-        return messageFromMap(fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
-      } else {
-        throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map.");
-      }
-    }
-    @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue);
-    if (scalarValue == null) {
-      return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated, ignoreUnknownValues);
-    } else {
-      return scalarValue;
-    }
+    return singularFieldToProtoValue(
+        schemaInformation, fieldDescriptor, bqValue, ignoreUnknownValues);
   }
 
   @VisibleForTesting
   @Nullable
-  static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
-    if (jsonBQValue instanceof String) {
-      Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
-      if (mapper == null) {
-        throw new UnsupportedOperationException(
-            "Converting BigQuery type '"
-                + jsonBQValue.getClass()
-                + "' to '"
-                + fieldDescriptor
-                + "' is not supported");
-      }
-      return mapper.apply((String) jsonBQValue);
-    }
-
-    switch (fieldDescriptor.getType()) {
-      case BOOL:
-        if (jsonBQValue instanceof Boolean) {
-          return jsonBQValue;
+  static Object singularFieldToProtoValue(
+      SchemaInformation schemaInformation,
+      FieldDescriptor fieldDescriptor,
+      Object value,
+      boolean ignoreUnknownValues)
+      throws SchemaConversionException {
+    switch (schemaInformation.getType()) {
+      case "INT64":
+      case "INTEGER":
+        if (value instanceof String) {
+          return Long.valueOf((String) value);
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).longValue();
         }
         break;
-      case BYTES:
+      case "FLOAT64":
+      case "FLOAT":
+        if (value instanceof String) {
+          return Double.valueOf((String) value);
+        } else if (value instanceof Double || value instanceof Float) {
+          return ((Number) value).doubleValue();
+        }
         break;
-      case INT64:
-        if (jsonBQValue instanceof Integer) {
-          return Long.valueOf((Integer) jsonBQValue);
-        } else if (jsonBQValue instanceof Long) {
-          return jsonBQValue;
+      case "BOOLEAN":
+      case "BOOL":
+        if (value instanceof String) {
+          return Boolean.valueOf((String) value);
+        } else if (value instanceof Boolean) {
+          return value;
         }
         break;
-      case INT32:
-        if (jsonBQValue instanceof Integer) {
-          return jsonBQValue;
+      case "BYTES":
+        if (value instanceof String) {
+          return ByteString.copyFrom(BaseEncoding.base64().decode((String) value));
+        } else if (value instanceof byte[]) {
+          return ByteString.copyFrom((byte[]) value);
+        } else if (value instanceof ByteString) {
+          return value;
         }
         break;
-      case STRING:
+      case "TIMESTAMP":
+        if (value instanceof String) {
+          try {
+            return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value));
+          } catch (DateTimeParseException e) {
+            return ChronoUnit.MICROS.between(
+                Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value)));
+          }
+        } else if (value instanceof Instant) {
+          return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
+        } else if (value instanceof org.joda.time.Instant) {
+          // joda instant precision is millisecond
+          return ((org.joda.time.Instant) value).getMillis() * 1000L;
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).longValue();
+        } else if (value instanceof Double || value instanceof Float) {
+          // assume value represents number of seconds since epoch
+          return BigDecimal.valueOf(((Number) value).doubleValue())
+              .scaleByPowerOfTen(6)
+              .setScale(0, RoundingMode.HALF_UP)
+              .longValue();
+        }
         break;
-      case DOUBLE:
-        if (jsonBQValue instanceof Double) {
-          return jsonBQValue;
-        } else if (jsonBQValue instanceof Float) {
-          return Double.valueOf((Float) jsonBQValue);
+      case "DATE":
+        if (value instanceof String) {
+          return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue();
+        } else if (value instanceof LocalDate) {
+          return ((Long) ((LocalDate) value).toEpochDay()).intValue();
+        } else if (value instanceof org.joda.time.LocalDate) {
+          return Days.daysBetween(
+                  org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(),
+                  (org.joda.time.LocalDate) value)
+              .getDays();
+        }
+        break;
+      case "NUMERIC":
+      case "BIGNUMERIC":
+        if (value instanceof String) {
+          return value;
+        } else if (value instanceof BigDecimal) {
+          return ((BigDecimal) value).toPlainString();

Review Comment:
   We do provide encoding here:
   
https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigDecimalByteStringEncoder.java
   
   So that they can be passed in more efficiently. Not blocking this PR.
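
For context on what a byte encoding of NUMERIC could look like, here is a minimal sketch. It assumes the value is serialized as a scaled integer (scale 9 for NUMERIC) in two's complement, little-endian byte order; that layout is an assumption here, and the class `NumericBytesSketch` is hypothetical. The linked `BigDecimalByteStringEncoder` is the authoritative implementation and may differ in details such as range checks.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/** Hypothetical sketch of a scaled-integer byte encoding for NUMERIC; layout is an assumption. */
final class NumericBytesSketch {
  private static final int NUMERIC_SCALE = 9; // NUMERIC carries 9 fractional digits.

  /** Encodes value as little-endian two's complement bytes of value * 10^9. */
  static byte[] encode(BigDecimal value) {
    // setScale without a rounding mode throws ArithmeticException if digits would be lost.
    byte[] bytes = value.setScale(NUMERIC_SCALE).unscaledValue().toByteArray();
    reverse(bytes); // BigInteger emits big-endian; flip to little-endian.
    return bytes;
  }

  /** Decodes bytes produced by encode back into a BigDecimal with scale 9. */
  static BigDecimal decode(byte[] littleEndian) {
    byte[] bytes = littleEndian.clone();
    reverse(bytes);
    return new BigDecimal(new BigInteger(bytes), NUMERIC_SCALE);
  }

  private static void reverse(byte[] bytes) {
    for (int i = 0, j = bytes.length - 1; i < j; i++, j--) {
      byte tmp = bytes[i];
      bytes[i] = bytes[j];
      bytes[j] = tmp;
    }
  }

  public static void main(String[] args) {
    byte[] encoded = encode(new BigDecimal("23.4"));
    System.out.println(encoded.length + " bytes -> " + decode(encoded));
  }
}
```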





Issue Time Tracking
-------------------

            Worklog Id:     (was: 763931)
    Remaining Estimate: 100.5h  (was: 100h 40m)
            Time Spent: 19.5h  (was: 19h 20m)

> BigQueryIO cannot write to DATE and TIMESTAMP columns when using Storage 
> Write API 
> -----------------------------------------------------------------------------------
>
>                 Key: BEAM-13990
>                 URL: https://issues.apache.org/jira/browse/BEAM-13990
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>    Affects Versions: 2.36.0
>            Reporter: Du Liu
>            Assignee: Du Liu
>            Priority: P2
>   Original Estimate: 120h
>          Time Spent: 19.5h
>  Remaining Estimate: 100.5h
>
> When using the Storage Write API with BigQueryIO, DATE and TIMESTAMP values are 
> currently converted to String type in the protobuf message. This is incorrect: 
> according to the Storage Write API [documentation|#data_type_conversions], DATE 
> should be converted to int32 and TIMESTAMP should be converted to int64.
> Here's the error message: 
> INFO: Stream finished with error 
> com.google.api.gax.rpc.InvalidArgumentException: 
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched 
> with BigQuery field at D6cbe536b_4dab_4292_8fda_ff2932dded49.datevalue, the 
> proto field type string, BigQuery field type DATE Entity
> I have included an integration test here: 
> [https://github.com/liu-du/beam/commit/b56823d1d213adf6ca5564ce1d244cc4ae8f0816]
>  
> The problem is that DATE and TIMESTAMP are converted to String in the protobuf 
> message here: 
> [https://github.com/apache/beam/blob/a78fec72d0d9198eef75144a7bdaf93ada5abf9b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L69]
>  
> The Storage Write API rejects the request because it expects int32/int64 
> values. 
>  
> I've opened a PR here: https://github.com/apache/beam/pull/16926
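
The conversions the description asks for can be sketched with `java.time` alone: DATE becomes an int32 count of days since 1970-01-01, and TIMESTAMP becomes an int64 count of microseconds since the epoch. The class and method names below are hypothetical; the arithmetic follows the string-handling branches shown in the diff above, except that `Math.toIntExact` is swapped in for the boxed `intValue()` cast so that an overflow fails loudly.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

/** Hypothetical sketch of the DATE -> int32 and TIMESTAMP -> int64 conversions. */
final class DateTimestampSketch {
  /** Days since 1970-01-01 for an ISO-8601 date string such as "2019-08-16". */
  static int dateToEpochDays(String isoDate) {
    return Math.toIntExact(LocalDate.parse(isoDate).toEpochDay());
  }

  /** Microseconds since the epoch for an ISO-8601 instant string ending in "Z". */
  static long timestampToEpochMicros(String isoInstant) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(isoInstant));
  }

  public static void main(String[] args) {
    System.out.println(dateToEpochDays("2019-08-16"));
    System.out.println(timestampToEpochMicros("1970-01-01T00:00:00.000043Z"));
  }
}
```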



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
