jeyhunkarimov commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632110151


##########
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java:
##########
@@ -36,27 +40,50 @@
 import com.google.pubsub.v1.ReceivedMessage;
 import com.google.pubsub.v1.Topic;
 import com.google.pubsub.v1.TopicName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** A helper class to make managing the testing topics a bit easier. */
 public class PubsubHelper {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PubsubHelper.class);
 
+    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);
+
+    private ManagedChannel channel;

Review Comment:
   `private final ManagedChannel channel ` ?



##########
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##########
@@ -0,0 +1,115 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+    private static final String PROJECT_ID = "test-project";
+
+    private static final String TOPIC_ID = "test-topic";
+
+    private static final String SUBSCRIPTION_ID = "test-subscription";
+
+    private StreamExecutionEnvironment env;
+
+    @Container
+    private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+            new PubSubEmulatorContainer(
+                    
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+    private PubsubHelper pubSubHelper;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);

Review Comment:
   Any specific reason for `parallelism=1`?



##########
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java:
##########
@@ -36,27 +40,50 @@
 import com.google.pubsub.v1.ReceivedMessage;
 import com.google.pubsub.v1.Topic;
 import com.google.pubsub.v1.TopicName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** A helper class to make managing the testing topics a bit easier. */
 public class PubsubHelper {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PubsubHelper.class);
 
+    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);
+
+    private ManagedChannel channel;
+
     private TransportChannelProvider channelProvider;
 
     private TopicAdminClient topicClient;
+
     private SubscriptionAdminClient subscriptionAdminClient;
 
+    public PubsubHelper(String endpoint) {
+        channel = 
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
+        channelProvider =
+                
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

Review Comment:
   
   `checkNonNull(channel)`
   `checkNonNull(channelProvider)` ?



##########
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##########
@@ -0,0 +1,115 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+    private static final String PROJECT_ID = "test-project";
+
+    private static final String TOPIC_ID = "test-topic";
+
+    private static final String SUBSCRIPTION_ID = "test-subscription";
+
+    private StreamExecutionEnvironment env;
+
+    @Container
+    private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+            new PubSubEmulatorContainer(
+                    
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+    private PubsubHelper pubSubHelper;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        pubSubHelper = new 
PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
+
+        pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID);
+        pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, 
PROJECT_ID, TOPIC_ID);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID);
+        pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID);
+        pubSubHelper.close();
+    }
+
+    @Test
+    void pubSubSinkV2DeliversRecords() throws Exception {

Review Comment:
   Is it possible to declare a more specific `Exception` that is expected to be 
thrown?



##########
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##########
@@ -0,0 +1,115 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+    private static final String PROJECT_ID = "test-project";
+
+    private static final String TOPIC_ID = "test-topic";
+
+    private static final String SUBSCRIPTION_ID = "test-subscription";
+
+    private StreamExecutionEnvironment env;
+
+    @Container
+    private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+            new PubSubEmulatorContainer(
+                    
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+    private PubsubHelper pubSubHelper;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        pubSubHelper = new 
PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
+
+        pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID);
+        pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, 
PROJECT_ID, TOPIC_ID);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID);
+        pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID);
+        pubSubHelper.close();
+    }
+
+    @Test
+    void pubSubSinkV2DeliversRecords() throws Exception {
+        String[] elements = new String[] {"test1", "test2", "test3"};
+        DataStreamSource<String> stream =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                new FromElementsGeneratorFunction<>(
+                                        BasicTypeInfo.STRING_TYPE_INFO, 
elements),
+                                elements.length,
+                                TypeInformation.of(String.class)),
+                        WatermarkStrategy.noWatermarks(),
+                        "DataGeneratorSource");
+
+        GcpPublisherConfig gcpPublisherConfig =
+                GcpPublisherConfig.builder()
+                        
.setCredentialsProvider(EmulatorCredentialsProvider.create())
+                        .setTransportChannelProvider(
+                                new TestChannelProvider(
+                                        
PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()))
+                        .build();
+
+        PubSubSinkV2<String> sink =
+                PubSubSinkV2.<String>builder()
+                        .setProjectId(PROJECT_ID)
+                        .setTopicId(TOPIC_ID)
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .setGcpPublisherConfig(gcpPublisherConfig)
+                        .setFailOnError(true)
+                        .build();
+
+        stream.sinkTo(sink);
+
+        env.execute("PubSubSinkV2ITTests");
+        List<ReceivedMessage> receivedMessages =
+                pubSubHelper.pullMessages(PROJECT_ID, SUBSCRIPTION_ID, 100);

Review Comment:
   Should `100` magic number also be a constant number defined once?



##########
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java:
##########
@@ -222,4 +243,34 @@ public Publisher createPublisher(String project, String 
topic) throws IOExceptio
                 .setCredentialsProvider(EmulatorCredentialsProvider.create())
                 .build();
     }
+
+    public void close() {
+        if (topicClient != null) {
+            try {
+                topicClient.shutdown();
+                topicClient.awaitTermination(SHUTDOWN_TIMEOUT.getSeconds(), 
TimeUnit.SECONDS);
+            } catch (Exception e) {

Review Comment:
   Maybe catch a more specific exception first (here and below) ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to