[ https://issues.apache.org/jira/browse/BEAM-13990?focusedWorklogId=763931&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763931 ]
ASF GitHub Bot logged work on BEAM-13990: ----------------------------------------- Author: ASF GitHub Bot Created on: 28/Apr/22 23:31 Start Date: 28/Apr/22 23:31 Worklog Time Spent: 10m Work Description: yirutang commented on code in PR #17404: URL: https://github.com/apache/beam/pull/17404#discussion_r861374699 ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java: ########## @@ -84,10 +143,10 @@ public static class SchemaDoesntMatchException extends SchemaConversionException .put("NUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding. .put("BIGNUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding. .put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding. - .put("DATE", Type.TYPE_STRING) // Pass through the JSON encoding. + .put("DATE", Type.TYPE_INT32) .put("TIME", Type.TYPE_STRING) // Pass through the JSON encoding. .put("DATETIME", Type.TYPE_STRING) // Pass through the JSON encoding. Review Comment: TIME and DATETIME can be Int64 so that the encoding size is smaller. Shouldn't block this PR: https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java#L334 ########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +/** Unit 
tests for {@link TableRowToStorageApiProto}. */ +public class TableRowToStorageApiProtoIT { + + private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class); + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT"); + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = + "table_row_to_storage_api_proto_" + System.nanoTime(); + + private static final TableSchema BASE_TABLE_SCHEMA = + new TableSchema() + .setFields( + ImmutableList.<TableFieldSchema>builder() + .add(new TableFieldSchema().setType("STRING").setName("stringValue")) + .add(new TableFieldSchema().setType("BYTES").setName("bytesValue")) + .add(new TableFieldSchema().setType("INT64").setName("int64Value")) + .add(new TableFieldSchema().setType("INTEGER").setName("intValue")) + .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value")) + .add(new TableFieldSchema().setType("FLOAT").setName("floatValue")) + .add(new TableFieldSchema().setType("BOOL").setName("boolValue")) + .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue")) + .add(new TableFieldSchema().setType("TIME").setName("timeValue")) + .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue")) + .add(new TableFieldSchema().setType("DATE").setName("dateValue")) + .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue")) + .add( + new TableFieldSchema() + .setType("STRING") + .setMode("REPEATED") + .setName("arrayValue")) + .build()); + + private static final TableRow BASE_TABLE_ROW = + new TableRow() + .set("stringValue", "string") + .set( + "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) + .set("int64Value", "42") + .set("intValue", "43") + .set("float64Value", "2.8168") + 
.set("floatValue", "2.817") + .set("boolValue", "true") + .set("booleanValue", "true") + .set("timestampValue", "1970-01-01T00:00:00.000043Z") + .set("timeValue", "00:52:07.123456") + .set("datetimeValue", "2019-08-16T00:52:07.123456") + .set("dateValue", "2019-08-16") + .set("numericValue", "23.4") + .set( + "arrayValue", + Arrays.asList("hello", "goodbye", null)); // null in arrayValue should be removed + + private static final TableRow BASE_TABLE_ROW_JODA_TIME = + new TableRow() + .set("stringValue", "string") + .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8)) + .set("int64Value", 42) + .set("intValue", 43) + .set("float64Value", 2.8168f) + .set("floatValue", 2.817f) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.000043Z")) + .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456")) + .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456")) + .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16")) + .set("numericValue", new BigDecimal("23.4")) + .set("arrayValue", ImmutableList.of("hello", "goodbye")); + + private static final TableRow BASE_TABLE_ROW_JAVA_TIME = + new TableRow() + .set("stringValue", "string") + .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8)) + .set("int64Value", 42) + .set("intValue", 43) + .set("float64Value", 2.8168f) + .set("floatValue", 2.817f) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z")) + .set("timeValue", LocalTime.parse("00:52:07.123456")) + .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456")) + .set("dateValue", LocalDate.parse("2019-08-16")) + .set("numericValue", new BigDecimal("23.4")) Review Comment: int and double timestamp value test? 
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import 
org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +/** Unit tests for {@link TableRowToStorageApiProto}. */ +public class TableRowToStorageApiProtoIT { + + private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class); + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT"); + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = + "table_row_to_storage_api_proto_" + System.nanoTime(); + + private static final TableSchema BASE_TABLE_SCHEMA = + new TableSchema() + .setFields( + ImmutableList.<TableFieldSchema>builder() + .add(new TableFieldSchema().setType("STRING").setName("stringValue")) + .add(new TableFieldSchema().setType("BYTES").setName("bytesValue")) + .add(new TableFieldSchema().setType("INT64").setName("int64Value")) + .add(new TableFieldSchema().setType("INTEGER").setName("intValue")) + .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value")) + .add(new TableFieldSchema().setType("FLOAT").setName("floatValue")) + .add(new TableFieldSchema().setType("BOOL").setName("boolValue")) + .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue")) + .add(new TableFieldSchema().setType("TIME").setName("timeValue")) + .add(new 
TableFieldSchema().setType("DATETIME").setName("datetimeValue")) + .add(new TableFieldSchema().setType("DATE").setName("dateValue")) + .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue")) + .add( + new TableFieldSchema() + .setType("STRING") + .setMode("REPEATED") + .setName("arrayValue")) + .build()); + + private static final TableRow BASE_TABLE_ROW = + new TableRow() + .set("stringValue", "string") + .set( + "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) + .set("int64Value", "42") + .set("intValue", "43") + .set("float64Value", "2.8168") + .set("floatValue", "2.817") + .set("boolValue", "true") + .set("booleanValue", "true") + .set("timestampValue", "1970-01-01T00:00:00.000043Z") + .set("timeValue", "00:52:07.123456") + .set("datetimeValue", "2019-08-16T00:52:07.123456") + .set("dateValue", "2019-08-16") + .set("numericValue", "23.4") + .set( + "arrayValue", + Arrays.asList("hello", "goodbye", null)); // null in arrayValue should be removed + + private static final TableRow BASE_TABLE_ROW_JODA_TIME = + new TableRow() + .set("stringValue", "string") + .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8)) + .set("int64Value", 42) + .set("intValue", 43) + .set("float64Value", 2.8168f) + .set("floatValue", 2.817f) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.000043Z")) + .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456")) + .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456")) + .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16")) + .set("numericValue", new BigDecimal("23.4")) + .set("arrayValue", ImmutableList.of("hello", "goodbye")); + + private static final TableRow BASE_TABLE_ROW_JAVA_TIME = + new TableRow() + .set("stringValue", "string") + .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8)) + .set("int64Value", 42) + .set("intValue", 43) + 
.set("float64Value", 2.8168f) + .set("floatValue", 2.817f) + .set("boolValue", true) + .set("booleanValue", true) + .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z")) + .set("timeValue", LocalTime.parse("00:52:07.123456")) + .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456")) + .set("dateValue", LocalDate.parse("2019-08-16")) + .set("numericValue", new BigDecimal("23.4")) Review Comment: bignumeric test? ########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java: ########## @@ -523,14 +523,6 @@ public void testTimePartitioning() throws Exception { testTimePartitioning(method); } - @Test - public void testTimePartitioningStorageApi() throws Exception { Review Comment: Why remove this? ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java: ########## @@ -218,125 +288,157 @@ private static void fieldDescriptorFromTableField( } @Nullable + @SuppressWarnings({"nullness"}) private static Object messageValueFromFieldValue( - FieldDescriptor fieldDescriptor, @Nullable Object bqValue, boolean ignoreUnknownValues) + SchemaInformation schemaInformation, + FieldDescriptor fieldDescriptor, + @Nullable Object bqValue, + boolean ignoreUnknownValues) throws SchemaConversionException { if (bqValue == null) { if (fieldDescriptor.isOptional()) { return null; } else if (fieldDescriptor.isRepeated()) { return Collections.emptyList(); - } - { + } else { throw new IllegalArgumentException( "Received null value for non-nullable field " + fieldDescriptor.getName()); } } - return toProtoValue( - fieldDescriptor, bqValue, fieldDescriptor.isRepeated(), ignoreUnknownValues); - } - - private static final Map<FieldDescriptor.Type, Function<String, Object>> - JSON_PROTO_STRING_PARSERS = - ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder() - .put(FieldDescriptor.Type.INT32, Integer::valueOf) - 
.put(FieldDescriptor.Type.INT64, Long::valueOf) - .put(FieldDescriptor.Type.FLOAT, Float::valueOf) - .put(FieldDescriptor.Type.DOUBLE, Double::valueOf) - .put(FieldDescriptor.Type.BOOL, Boolean::valueOf) - .put(FieldDescriptor.Type.STRING, str -> str) - .put( - FieldDescriptor.Type.BYTES, - b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64))) - .build(); - @Nullable - @SuppressWarnings({"nullness"}) - @VisibleForTesting - static Object toProtoValue( - FieldDescriptor fieldDescriptor, - Object jsonBQValue, - boolean isRepeated, - boolean ignoreUnknownValues) - throws SchemaConversionException { - if (isRepeated) { - List<Object> listValue = (List<Object>) jsonBQValue; + if (fieldDescriptor.isRepeated()) { + List<Object> listValue = (List<Object>) bqValue; List<Object> protoList = Lists.newArrayListWithCapacity(listValue.size()); - for (Object o : listValue) { - protoList.add(toProtoValue(fieldDescriptor, o, false, ignoreUnknownValues)); + for (@Nullable Object o : listValue) { + if (o != null) { // repeated field cannot contain null. + protoList.add( + singularFieldToProtoValue( + schemaInformation, fieldDescriptor, o, ignoreUnknownValues)); + } } return protoList; } - - if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) { - if (jsonBQValue instanceof TableRow) { - TableRow tableRow = (TableRow) jsonBQValue; - return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues); - } else if (jsonBQValue instanceof AbstractMap) { - // This will handle nested rows. 
- AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue); - return messageFromMap(fieldDescriptor.getMessageType(), map, ignoreUnknownValues); - } else { - throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map."); - } - } - @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue); - if (scalarValue == null) { - return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated, ignoreUnknownValues); - } else { - return scalarValue; - } + return singularFieldToProtoValue( + schemaInformation, fieldDescriptor, bqValue, ignoreUnknownValues); } @VisibleForTesting @Nullable - static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) { - if (jsonBQValue instanceof String) { - Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType()); - if (mapper == null) { - throw new UnsupportedOperationException( - "Converting BigQuery type '" - + jsonBQValue.getClass() - + "' to '" - + fieldDescriptor - + "' is not supported"); - } - return mapper.apply((String) jsonBQValue); - } - - switch (fieldDescriptor.getType()) { - case BOOL: - if (jsonBQValue instanceof Boolean) { - return jsonBQValue; + static Object singularFieldToProtoValue( + SchemaInformation schemaInformation, + FieldDescriptor fieldDescriptor, + Object value, + boolean ignoreUnknownValues) + throws SchemaConversionException { + switch (schemaInformation.getType()) { + case "INT64": + case "INTEGER": + if (value instanceof String) { + return Long.valueOf((String) value); + } else if (value instanceof Integer || value instanceof Long) { + return ((Number) value).longValue(); } break; - case BYTES: + case "FLOAT64": + case "FLOAT": + if (value instanceof String) { + return Double.valueOf((String) value); + } else if (value instanceof Double || value instanceof Float) { + return ((Number) value).doubleValue(); + } break; - case INT64: - if (jsonBQValue instanceof Integer) { - return 
Long.valueOf((Integer) jsonBQValue); - } else if (jsonBQValue instanceof Long) { - return jsonBQValue; + case "BOOLEAN": + case "BOOL": + if (value instanceof String) { + return Boolean.valueOf((String) value); + } else if (value instanceof Boolean) { + return value; } break; - case INT32: - if (jsonBQValue instanceof Integer) { - return jsonBQValue; + case "BYTES": + if (value instanceof String) { + return ByteString.copyFrom(BaseEncoding.base64().decode((String) value)); + } else if (value instanceof byte[]) { + return ByteString.copyFrom((byte[]) value); + } else if (value instanceof ByteString) { + return value; } break; - case STRING: + case "TIMESTAMP": + if (value instanceof String) { + try { + return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value)); + } catch (DateTimeParseException e) { + return ChronoUnit.MICROS.between( + Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value))); + } + } else if (value instanceof Instant) { + return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value); + } else if (value instanceof org.joda.time.Instant) { + // joda instant precision is millisecond + return ((org.joda.time.Instant) value).getMillis() * 1000L; + } else if (value instanceof Integer || value instanceof Long) { + return ((Number) value).longValue(); + } else if (value instanceof Double || value instanceof Float) { + // assume value represents number of seconds since epoch + return BigDecimal.valueOf(((Number) value).doubleValue()) + .scaleByPowerOfTen(6) + .setScale(0, RoundingMode.HALF_UP) + .longValue(); + } break; - case DOUBLE: - if (jsonBQValue instanceof Double) { - return jsonBQValue; - } else if (jsonBQValue instanceof Float) { - return Double.valueOf((Float) jsonBQValue); + case "DATE": + if (value instanceof String) { + return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue(); + } else if (value instanceof LocalDate) { + return ((Long) ((LocalDate) value).toEpochDay()).intValue(); + } else if 
(value instanceof org.joda.time.LocalDate) { + return Days.daysBetween( + org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(), + (org.joda.time.LocalDate) value) + .getDays(); + } + break; + case "NUMERIC": + case "BIGNUMERIC": + if (value instanceof String) { + return value; + } else if (value instanceof BigDecimal) { + return ((BigDecimal) value).toPlainString(); Review Comment: We do provide an encoding here: https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigDecimalByteStringEncoder.java so that they can be passed in more efficiently. Not blocking this PR. Issue Time Tracking ------------------- Worklog Id: (was: 763931) Remaining Estimate: 100.5h (was: 100h 40m) Time Spent: 19.5h (was: 19h 20m) > BigQueryIO cannot write to DATE and TIMESTAMP columns when using Storage > Write API > ----------------------------------------------------------------------------------- > > Key: BEAM-13990 > URL: https://issues.apache.org/jira/browse/BEAM-13990 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp > Affects Versions: 2.36.0 > Reporter: Du Liu > Assignee: Du Liu > Priority: P2 > Original Estimate: 120h > Time Spent: 19.5h > Remaining Estimate: 100.5h > > When using the Storage Write API with BigQueryIO, DATE and TIMESTAMP values are > currently converted to String type in the protobuf message. This is incorrect: > according to the Storage Write API [documentation|#data_type_conversions], DATE > should be converted to int32 and TIMESTAMP should be converted to int64. 
> Here's the error message: > INFO: Stream finished with error > com.google.api.gax.rpc.InvalidArgumentException: > io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched > with BigQuery field at D6cbe536b_4dab_4292_8fda_ff2932dded49.datevalue, the > proto field type string, BigQuery field type DATE Entity > I have included an integration test here: > [https://github.com/liu-du/beam/commit/b56823d1d213adf6ca5564ce1d244cc4ae8f0816] > > The problem is that DATE and TIMESTAMP are converted to String in the protobuf > message here: > [https://github.com/apache/beam/blob/a78fec72d0d9198eef75144a7bdaf93ada5abf9b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L69] > > The Storage Write API rejects the request because it expects int32/int64 > values. > > I've opened a PR here: https://github.com/apache/beam/pull/16926 -- This message was sent by Atlassian Jira (v8.20.7#820007)