[
https://issues.apache.org/jira/browse/BEAM-14036?focusedWorklogId=774183&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774183
]
ASF GitHub Bot logged work on BEAM-14036:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/22 18:59
Start Date: 24/May/22 18:59
Worklog Time Spent: 10m
Work Description: chamikaramj commented on code in PR #17730:
URL: https://github.com/apache/beam/pull/17730#discussion_r880840405
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
Review Comment:
Probably also add an "@Experimental" annotation?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PubsubSchemaTransformReadConfiguration {
+
+ /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+ public static Builder builder() {
+ return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+ }
+
+ /** The expected schema of the Pub/Sub message. */
+ public abstract Schema getDataSchema();
+
+ /**
+ * The Pub/Sub topic path to write failures.
+ *
+ * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
+ * letter queue topic string.
+ */
+ @Nullable
+ public abstract String getDeadLetterQueue();
+
+ /**
+ * The expected format of the Pub/Sub message.
+ *
+ * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
+ * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
+ */
+ @Nullable
+ public abstract String getFormat();
Review Comment:
I don't see the format, protoClass, or thriftClass attributes in the original Read config.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
Review Comment:
Please add the correct @Nullable annotations instead of suppressing the nullness warnings.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PubsubSchemaTransformReadConfiguration {
+
+ /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+ public static Builder builder() {
+ return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+ }
+
+ /** The expected schema of the Pub/Sub message. */
+ public abstract Schema getDataSchema();
+
+ /**
+ * The Pub/Sub topic path to write failures.
+ *
+ * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
Review Comment:
We should probably refer to PubsubIO.Read.withDeadLetterTopic() instead.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PubsubSchemaTransformReadConfiguration {
+
+ /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+ public static Builder builder() {
+ return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+ }
+
+ /** The expected schema of the Pub/Sub message. */
+ public abstract Schema getDataSchema();
+
+ /**
+ * The Pub/Sub topic path to write failures.
+ *
+ * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
+ * letter queue topic string.
+ */
+ @Nullable
+ public abstract String getDeadLetterQueue();
Review Comment:
Please rename this to getDeadLetterTopic to be consistent with the Pub/Sub read transform.
https://github.com/apache/beam/blob/d9436c41b3235a08bf8da693a562599b08039bf8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L865
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from Pub/Sub.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PubsubSchemaTransformReadConfiguration {
+
+ /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+ public static Builder builder() {
+ return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+ }
+
+ /** The expected schema of the Pub/Sub message. */
+ public abstract Schema getDataSchema();
+
+ /**
+ * The Pub/Sub topic path to write failures.
+ *
+ * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
+ * letter queue topic string.
+ */
+ @Nullable
+ public abstract String getDeadLetterQueue();
+
+ /**
+ * The expected format of the Pub/Sub message.
+ *
+ * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
+ * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
+ */
+ @Nullable
+ public abstract String getFormat();
+
+ /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */
+ @Nullable
+ public abstract String getProtoClass();
+
+ /**
+ * The subscription from which to read Pub/Sub messages.
+ *
+ * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of
+ * the subscription string.
+ */
+ @Nullable
+ public abstract String getSubscription();
+
+ /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
+ @Nullable
+ public abstract String getThriftClass();
+
+ /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
+ @Nullable
+ public abstract String getThriftProtocolFactoryClass();
+
+ /**
+ * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
+ * attributes, specifies the name of the attribute that contains the timestamp.
+ */
+ @Nullable
+ public abstract String getTimestampAttribute();
Review Comment:
Also add idAttribute ?
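For illustration only, the naming and attribute suggestions in this review (getDeadLetterTopic in place of getDeadLetterQueue, plus an id attribute) might look roughly like the sketch below. It is a dependency-free stand-in, not the AutoValue class from the PR: the class name `PubsubReadConfigSketch`, its hand-written builder, and the getter name `getIdAttribute` are assumptions made for this example, and the actual change would presumably stay on the `@AutoValue` class and its generated builder.

```java
/** Hypothetical stand-in for the reviewed configuration; not Beam code and not AutoValue. */
final class PubsubReadConfigSketch {
  // Every field may be null, mirroring the @Nullable getters in the reviewed class.
  private final String subscription;
  private final String deadLetterTopic;
  private final String idAttribute;
  private final String timestampAttribute;

  private PubsubReadConfigSketch(Builder builder) {
    this.subscription = builder.subscription;
    this.deadLetterTopic = builder.deadLetterTopic;
    this.idAttribute = builder.idAttribute;
    this.timestampAttribute = builder.timestampAttribute;
  }

  static Builder builder() {
    return new Builder();
  }

  String getSubscription() {
    return subscription;
  }

  /** Named after the read transform's dead letter topic, replacing getDeadLetterQueue. */
  String getDeadLetterTopic() {
    return deadLetterTopic;
  }

  /** Assumed getter for the suggested idAttribute. */
  String getIdAttribute() {
    return idAttribute;
  }

  String getTimestampAttribute() {
    return timestampAttribute;
  }

  /** Hand-written builder standing in for the one AutoValue would generate. */
  static final class Builder {
    private String subscription;
    private String deadLetterTopic;
    private String idAttribute;
    private String timestampAttribute;

    Builder setSubscription(String value) {
      this.subscription = value;
      return this;
    }

    Builder setDeadLetterTopic(String value) {
      this.deadLetterTopic = value;
      return this;
    }

    Builder setIdAttribute(String value) {
      this.idAttribute = value;
      return this;
    }

    Builder setTimestampAttribute(String value) {
      this.timestampAttribute = value;
      return this;
    }

    PubsubReadConfigSketch build() {
      return new PubsubReadConfigSketch(this);
    }
  }

  public static void main(String[] args) {
    // Placeholder resource paths and attribute names, chosen only for the example.
    PubsubReadConfigSketch config =
        builder()
            .setSubscription("projects/example-project/subscriptions/example-subscription")
            .setDeadLetterTopic("projects/example-project/topics/example-dead-letter")
            .setIdAttribute("message_id")
            .build();
    System.out.println(config.getDeadLetterTopic());
    System.out.println(config.getIdAttribute());
  }
}
```

Unset properties simply stay null in this sketch, which is why the nullness annotations requested earlier in the review matter for the real class.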
Issue Time Tracking
-------------------
Worklog Id: (was: 774183)
Time Spent: 1h (was: 50m)
> Convert Pub/Sub Schema IO to SchemaTransform
> --------------------------------------------
>
> Key: BEAM-14036
> URL: https://issues.apache.org/jira/browse/BEAM-14036
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Lara Schmidt
> Assignee: Damon Douglas
> Priority: P2
> Time Spent: 1h
> Remaining Estimate: 0h
>
> The output of this task is to refactor
> [sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider.java|https://github.com/apache/beam/blob/3c62cc906f487d7cd93430bf35d6ac2cd4cc6d3c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider.java]
> to extend
> [TypedSchemaTransformProvider|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java].
> This task will be separated into *+four+* PRs to focus on Read and Write
> SchemaTransforms and their respective Configurations. See
> [BigQuerySchemaTransformReadProvider|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java]
> and
> [BigQuerySchemaTransformWriteConfiguration|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java]
> as an example design pattern.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)