[ https://issues.apache.org/jira/browse/BEAM-13732?focusedWorklogId=717481&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-717481 ]

ASF GitHub Bot logged work on BEAM-13732:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Jan/22 04:21
            Start Date: 29/Jan/22 04:21
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #16598:
URL: https://github.com/apache/beam/pull/16598#discussion_r794996266



##########
File path: sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package bigqueryio contains cross-language functionality for using Google Cloud BigQuery
+// (https://cloud.google.com/bigquery). These transforms only work on runners that support
+// cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK.
+//
+// To use an expansion service, it must be run as a separate process accessible
+// during pipeline construction. The address of that process must be passed to
+// the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam SDK
+// being used. For numbered releases of Beam, these expansions services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Current supported SDKs, including expansion service modules and reference
+// documentation:
+// * Java
+//    - Vendored Module: beam-sdks-java-extensions-schemaio-expansion-service
+//    - Run via Gradle: ./gradlew :sdks:java:extensions:schemaio-expansion-service:runExpansionService
+//    - Reference Class: org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider and
+//      org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+//
+// Type Conversions
+//
+// Elements are read from and written to BigQuery by first converting to a Beam schema Row type
+// before converting to BigQuery compatible types. The following table lists all BigQuery types
+// currently supported, and how they convert to Beam schema and Go types.
+//   +----------------------------+------------------+-----------------+
+//   | BigQuery Standard SQL Type | Beam Schema Type |     Go Type     |
+//   +----------------------------+------------------+-----------------+
+//   | BOOLEAN                    | BOOLEAN          | bool            |
+//   | INT64                      | INT64            | int64           |
+//   | FLOAT64                    | DOUBLE           | float64         |
+//   | BYTES                      | BYTES            | []byte          |
+//   | STRING                     | STRING           | string          |
+//   | ARRAY                      | ARRAY            | Special: slice  |
+//   | STRUCT                     | ROW              | Special: struct |
+//   +----------------------------+------------------+-----------------+
+//
+// Array types are inferred from slice fields. For example, []int64 is equivalent to BigQuery's
+// ARRAY<INT64>. Struct types are inferred from nested structs in Go.
+//
+// Additionally, BigQuery schema fields can have a mode assigned to specify whether the field is
+// Nullable, Required, or Repeated. In Go, Nullable fields are represented as pointers, whereas
+// Required fields are value types. Repeated fields are represented as slices in Go (and ARRAYs
+// in SQL, as in the table above).
+//
+// Example of BigQuery fields with modes:
+//   field1 *int64   // Nullable INT64
+//   field2 int64    // Required INT64
+//   field3 []int64  // Repeated INT64
+//
+// Note On Documentation
+//
+// This cross-language implementation relies on the behavior of external SDKs. In order to keep
+// documentation up-to-date and consistent, BigQuery functionality will not be described in detail
+// in this package. Instead, references to relevant documentation in other SDKs are included where
+// relevant.
+package bigqueryio
+
+import (
+       "fmt"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/schemaio"
+)
+
+type createDisposition string
+
+const (
+       // CreateNever specifies that new tables should not be created when writing to BigQuery.
+       CreateNever createDisposition = "Never"
+
+       // CreateIfNeeded specifies that tables should be created when writing to BigQuery, if needed.
+       CreateIfNeeded createDisposition = "IfNeeded"
+
+       readURN  = "beam:transform:org.apache.beam:schemaio_bigquery_read:v1"
+       writeURN = "beam:transform:org.apache.beam:schemaio_bigquery_write:v1"
+)
+
+// bigQueryConfig is a struct meant to match the Schema IO config for Java's BigQuery IO. This is
+// used for both reads and writes, and is meant to match the schema defined in the Java SDK method
+// org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider.configurationSchema().
+type bigQueryConfig struct {
+       Table             *string `beam:"table"`
+       Query             *string `beam:"query"`
+       QueryLocation     *string `beam:"queryLocation"`
+       CreateDisposition *string `beam:"createDisposition"`
+}
+
+// Read is a cross-language PTransform which reads from a BigQuery table and returns a PCollection
+// of the given type, which should correspond to the Schema type generated by reading from the
+// table.
+//
+// Read requires the address for an expansion service for BigQuery Read transforms and a
+// reflect.Type description of the struct to read from BigQuery. Additionally, either one Table
+// or one Query must be provided via readOptions to define a destination to read from.
+//
+// Read accepts additional parameters as readOptions. All optional parameters are predefined in this
+// package as functions that return readOption. To set an optional parameter, pass the result of
+// the corresponding function as an argument to Read.
+//
+// Example:
+//   expansionAddr := "localhost:1234"
+//   table := "project_id:dataset_id.table_id"
+//   outType := reflect.TypeOf((*Foo)(nil)).Elem()
+//   pcol := bigqueryio.Read(s, expansionAddr, outType, bigqueryio.FromTable(table))
+func Read(s beam.Scope, addr string, elmT reflect.Type, opts ...readOption) beam.PCollection {

Review comment:
       Nit: have the expansion address be an optional parameter, so we can add auto-starting the expansion service by default later. That way the documentation shows the use of the option, and until auto-start is in, the code can require the address at runtime rather than through a compile-time restriction.
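The suggestion amounts to the functional-options pattern the package already uses for FromTable. A standalone sketch of how the address could become an option follows; WithExpansionAddr and the simplified readConfig struct are hypothetical names, and Beam types are stubbed out so the sketch compiles on its own:

```go
package main

import "fmt"

// readConfig is a simplified stand-in for the transform's configuration;
// the real field names in the PR may differ.
type readConfig struct {
	table         string
	expansionAddr string // empty would mean "auto-start an expansion service" (future behavior)
}

// readOption matches the functional-option style the package already uses.
type readOption func(*readConfig)

// FromTable mirrors the existing FromTable option.
func FromTable(table string) readOption {
	return func(c *readConfig) { c.table = table }
}

// WithExpansionAddr is a hypothetical option that makes the address optional,
// instead of a required positional parameter on Read.
func WithExpansionAddr(addr string) readOption {
	return func(c *readConfig) { c.expansionAddr = addr }
}

// Read applies the options and enforces the address at runtime, not compile time.
func Read(opts ...readOption) readConfig {
	var c readConfig
	for _, opt := range opts {
		opt(&c)
	}
	if c.expansionAddr == "" {
		// Until auto-start exists, the code (not the compiler) requires it.
		panic("bigqueryio.Read: an expansion service address is currently required")
	}
	return c
}

func main() {
	c := Read(FromTable("project_id:dataset_id.table_id"), WithExpansionAddr("localhost:1234"))
	fmt.Println(c.table, c.expansionAddr)
}
```

This keeps the documented call site stable once auto-start lands: only the option becomes unnecessary, not the function signature.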

##########
File path: sdks/go/pkg/beam/io/xlang/schemaio/schemaio.go
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package schemaio contains utilities for constructing cross-language IO wrappers meant to
+// interface with the Java SDK's Schema IOs. Schema IO is an interface for any IO that operates on
+// Beam schema supported elements. Various IOs are implemented via Schema IO, and each
+// implementation requires its own IO wrapper in the Go SDK (for example, JDBC IO or BigQuery IO),
+// and those IO wrappers can make use of these utilities.
+//
+// For implementation details of Schema IO see https://s.apache.org/schemaio-development-guide.
+package schemaio
+
+import (
+       "bytes"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+)
+
+// Payload is a struct matching the expected cross-language payload of a Schema IO.
+//
+// This documentation describes the expected usage of each field, but individual IO implementations
+// are free to use this payload differently. For implementation details of those IOs, refer to
+// SchemaIOProvider implementations in the Java SDK.
+type Payload struct {
+       // Location specifies the location to find the data (for example, a URL to a database).
+       Location string `beam:"location"`
+
+       // Config is a Beam schema encoded struct containing configuration details specific to the
+       // underlying IO implementation.
+       Config []byte `beam:"config"`
+
+       // DataSchema is an optional Beam schema encoded struct representing the schema for data being
+       // read or written.
+       DataSchema *[]byte `beam:"dataSchema"`
+}
+
+// EncodeAsRow encodes a struct as a Beam schema Row, to embed within a cross-language payload.
+func EncodeAsRow(config interface{}) []byte {
+       rt := reflect.TypeOf(config)
+       enc, err := coder.RowEncoderForStruct(rt)
+       if err != nil {
+               err = errors.WithContextf(err, "getting Row encoder for type %s", rt.Name())
+               panic(err)
+       }
+       var buf bytes.Buffer
+       if err := enc(config, &buf); err != nil {
+               err = errors.WithContextf(err, "encoding type %s as Row", rt.Name())
+               panic(err)
+       }
+       return buf.Bytes()
+}

Review comment:
       Consider having this version of the function return the error, and adding a second function, MustEncodeAsRow, that calls EncodeAsRow but panics when a non-nil error is received. This gives users a choice, rather than forcing a call that can only panic when something goes wrong.
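A sketch of the suggested split follows. The real implementation would keep beam's coder.RowEncoderForStruct; encoding/gob stands in for it here so the sketch compiles outside the Beam repo:

```go
package main

import (
	"bytes"
	"encoding/gob" // stand-in for beam's coder.RowEncoderForStruct so this runs standalone
	"fmt"
)

// EncodeAsRow returns the error instead of panicking, letting callers decide
// how to handle encoding failures.
func EncodeAsRow(config interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(config); err != nil {
		return nil, fmt.Errorf("encoding %T as Row: %w", config, err)
	}
	return buf.Bytes(), nil
}

// MustEncodeAsRow wraps EncodeAsRow and panics on a non-nil error, for call
// sites (such as pipeline construction) that cannot meaningfully recover.
func MustEncodeAsRow(config interface{}) []byte {
	b, err := EncodeAsRow(config)
	if err != nil {
		panic(err)
	}
	return b
}

func main() {
	type cfg struct{ Table string }
	fmt.Println(len(MustEncodeAsRow(cfg{Table: "project_id:dataset_id.table_id"})))
}
```

This mirrors the Must-prefix convention in the Go standard library (for example, regexp.MustCompile wrapping regexp.Compile).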

##########
File path: sdks/go/pkg/beam/io/xlang/schemaio/schemaio.go
##########
@@ -0,0 +1,81 @@
+// EncodeAsRow encodes a struct as a Beam schema Row, to embed within a cross-language payload.
+func EncodeAsRow(config interface{}) []byte {

Review comment:
       Unit tests please.






Issue Time Tracking
-------------------

    Worklog Id:     (was: 717481)
    Time Spent: 1h  (was: 50m)

> [Cross-Language] Implement Go SDK wrapper for xlang BigQuery IO
> ---------------------------------------------------------------
>
>                 Key: BEAM-13732
>                 URL: https://issues.apache.org/jira/browse/BEAM-13732
>             Project: Beam
>          Issue Type: New Feature
>          Components: cross-language, sdk-go
>            Reporter: Daniel Oliveira
>            Assignee: Daniel Oliveira
>            Priority: P2
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Title says it all.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
