[ 
https://issues.apache.org/jira/browse/BEAM-14036?focusedWorklogId=774183&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774183
 ]

ASF GitHub Bot logged work on BEAM-14036:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/22 18:59
            Start Date: 24/May/22 18:59
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on code in PR #17730:
URL: https://github.com/apache/beam/pull/17730#discussion_r880840405


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam

Review Comment:
   Probably also add an "@Experimental" tag?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PubsubSchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+  }
+
+  /** The expected schema of the Pub/Sub message. */
+  public abstract Schema getDataSchema();
+
+  /**
+   * The Pub/Sub topic path to write failures.
+   *
+   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
+   * letter queue topic string.
+   */
+  @Nullable
+  public abstract String getDeadLetterQueue();
+
+  /**
+   * The expected format of the Pub/Sub message.
+   *
+   * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
+   * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
+   */
+  @Nullable
+  public abstract String getFormat();

Review Comment:
   I don't see the format, protoClass, or thriftClass attributes in the original Read config.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)

Review Comment:
   Please add correct nullable tags instead of suppressing.
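
A minimal sketch of what this request could look like, written without Beam or AutoValue on the classpath. The `Nullable` annotation and the `ReadConfigSketch` class are hypothetical stand-ins (the PR would presumably use a real nullness annotation and the AutoValue-generated class), and a plain String stands in for Beam's Schema. Only the optional getter is marked nullable, so no class-level "nullness" suppression is needed.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Objects;

public class ReadConfigSketch {
  /** Hypothetical stand-in for a type-use nullable annotation; not a Beam type. */
  @Target(ElementType.TYPE_USE)
  @interface Nullable {}

  // Required value: a String stands in for Beam's Schema in this sketch.
  private final String dataSchema;
  // Optional value: explicitly marked nullable rather than suppressed.
  private final @Nullable String timestampAttribute;

  public ReadConfigSketch(String dataSchema, @Nullable String timestampAttribute) {
    this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema");
    this.timestampAttribute = timestampAttribute;
  }

  /** Never returns null, so it carries no annotation. */
  public String getDataSchema() {
    return dataSchema;
  }

  /** May return null; callers are expected to check. */
  public @Nullable String getTimestampAttribute() {
    return timestampAttribute;
  }

  /** Example of a caller handling the nullable value explicitly. */
  public String timestampAttributeOrDefault(String fallback) {
    return timestampAttribute != null ? timestampAttribute : fallback;
  }
}
```

With annotations in place, a nullness checker can flag unchecked dereferences of the optional getters instead of being silenced for the whole class.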



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PubsubSchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+  }
+
+  /** The expected schema of the Pub/Sub message. */
+  public abstract Schema getDataSchema();
+
+  /**
+   * The Pub/Sub topic path to write failures.
+   *
+   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead

Review Comment:
   Probably we should refer to PubsubIO.Read.withDeadLetterTopic() instead.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PubsubSchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+  }
+
+  /** The expected schema of the Pub/Sub message. */
+  public abstract Schema getDataSchema();
+
+  /**
+   * The Pub/Sub topic path to write failures.
+   *
+   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
+   * letter queue topic string.
+   */
+  @Nullable
+  public abstract String getDeadLetterQueue();

Review Comment:
   Please change to getDeadLetterTopic to be consistent with the Pub/Sub read transform.
   
https://github.com/apache/beam/blob/d9436c41b3235a08bf8da693a562599b08039bf8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L865
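
A rough sketch of the suggested rename, using a hand-written stand-in class rather than the AutoValue-generated one. `DeadLetterTopicSketch` and its path check are hypothetical; the regular expression is a simplified assumption about the `projects/<project>/topics/<topic>` form and does not replace the parsing done by PubsubIO.PubsubTopic#fromPath.

```java
import java.util.regex.Pattern;

public class DeadLetterTopicSketch {
  // Simplified, assumed check for "projects/<project>/topics/<topic>".
  private static final Pattern TOPIC_PATH = Pattern.compile("projects/[^/]+/topics/[^/]+");

  // Optional: null means no dead letter topic is configured.
  private final String deadLetterTopic;

  public DeadLetterTopicSketch(String deadLetterTopic) {
    if (deadLetterTopic != null && !TOPIC_PATH.matcher(deadLetterTopic).matches()) {
      throw new IllegalArgumentException("Not a topic path: " + deadLetterTopic);
    }
    this.deadLetterTopic = deadLetterTopic;
  }

  /** Getter named "Topic" rather than "Queue" to match the read transform's wording. */
  public String getDeadLetterTopic() {
    return deadLetterTopic;
  }
}
```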



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PubsubSchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+  }
+
+  /** The expected schema of the Pub/Sub message. */
+  public abstract Schema getDataSchema();
+
+  /**
+   * The Pub/Sub topic path to write failures.
+   *
+   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
+   * letter queue topic string.
+   */
+  @Nullable
+  public abstract String getDeadLetterQueue();
+
+  /**
+   * The expected format of the Pub/Sub message.
+   *
+   * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
+   * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
+   */
+  @Nullable
+  public abstract String getFormat();
+
+  /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */
+  @Nullable
+  public abstract String getProtoClass();
+
+  /**
+   * The subscription from which to read Pub/Sub messages.
+   *
+   * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of
+   * the subscription string.
+   */
+  @Nullable
+  public abstract String getSubscription();
+
+  /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
+  @Nullable
+  public abstract String getThriftClass();
+
+  /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
+  @Nullable
+  public abstract String getThriftProtocolFactoryClass();
+
+  /**
+   * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
+   * attributes, specifies the name of the attribute that contains the timestamp.
+   */
+  @Nullable
+  public abstract String getTimestampAttribute();

Review Comment:
   Also add idAttribute?
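
A hedged sketch of how an idAttribute property might sit next to timestampAttribute. The class and builder below are hand-written, hypothetical stand-ins for what AutoValue would generate; only the property names come from this review.

```java
public class IdAttributeSketch {
  // Both attributes are optional; null means "not configured".
  private final String timestampAttribute;
  private final String idAttribute;

  private IdAttributeSketch(Builder builder) {
    this.timestampAttribute = builder.timestampAttribute;
    this.idAttribute = builder.idAttribute;
  }

  /** Name of the message attribute that carries the record timestamp, or null. */
  public String getTimestampAttribute() {
    return timestampAttribute;
  }

  /** Name of the message attribute that carries a unique record id, or null. */
  public String getIdAttribute() {
    return idAttribute;
  }

  public static Builder builder() {
    return new Builder();
  }

  /** Hand-written builder mirroring the shape of an AutoValue builder. */
  public static final class Builder {
    private String timestampAttribute;
    private String idAttribute;

    public Builder setTimestampAttribute(String value) {
      this.timestampAttribute = value;
      return this;
    }

    public Builder setIdAttribute(String value) {
      this.idAttribute = value;
      return this;
    }

    public IdAttributeSketch build() {
      return new IdAttributeSketch(this);
    }
  }
}
```

A caller would then set both attributes through the builder, for example `IdAttributeSketch.builder().setIdAttribute("message_id").build()`, where "message_id" is an illustrative attribute name.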





Issue Time Tracking
-------------------

    Worklog Id:     (was: 774183)
    Time Spent: 1h  (was: 50m)

> Convert Pub/Sub Schema IO to SchemaTransform
> --------------------------------------------
>
>                 Key: BEAM-14036
>                 URL: https://issues.apache.org/jira/browse/BEAM-14036
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Lara Schmidt
>            Assignee: Damon Douglas
>            Priority: P2
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> The output of this task is to refactor [sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider.java|https://github.com/apache/beam/blob/3c62cc906f487d7cd93430bf35d6ac2cd4cc6d3c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider.java] to extend [TypedSchemaTransformProvider|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java].
> This task will be separated into *+four+* PRs to focus on Read and Write SchemaTransforms and their respective Configurations. See [BigQuerySchemaTransformReadProvider|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java] and [BigQuerySchemaTransformWriteConfiguration|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java] as an example design pattern.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
