morazow commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1631112122
########## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ########## @@ -0,0 +1,115 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +public class PubSubSinkV2ITTests extends TestLogger { Review Comment: It could be dropped in JUnit 5, which would also make it consistent ########## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java: ########## @@ -222,4 +240,33 @@ public Publisher createPublisher(String project, String topic) throws IOExceptio .setCredentialsProvider(EmulatorCredentialsProvider.create()) .build(); } + + public void close() { + if (topicClient != null) { + try { + topicClient.shutdown(); + topicClient.awaitTermination(5, TimeUnit.SECONDS); Review Comment: Maybe extract this timeout to a default constant? If possible, use the Duration type. ########## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ########## @@ -0,0 +1,115 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import 
org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +public class PubSubSinkV2ITTests extends TestLogger { Review Comment: ```suggestion class PubSubSinkV2ITTests extends TestLogger { ``` ########## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ########## @@ -0,0 +1,115 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. 
*/ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +public class PubSubSinkV2ITTests extends TestLogger { + + private static final String PROJECT_ID = "test-project"; + + private static final String TOPIC_ID = "test-topic"; + + private static final String SUBSCRIPTION_ID = "test-subscription"; + + private StreamExecutionEnvironment env; + + @Container + private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = + new PubSubEmulatorContainer( + DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR)); + + private PubsubHelper pubSubHelper; + + @BeforeEach + void setUp() throws IOException { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + pubSubHelper = new PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()); + + pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID); + pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, PROJECT_ID, TOPIC_ID); + } + + @AfterEach + void tearDown() throws IOException { + pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID); + pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID); + pubSubHelper.close(); + } + + @Test + void pubSubSinkV2DeliversRecords() throws Exception { + String[] elements = new String[] {"test1", "test2", "test3"}; + DataStreamSource<String> stream = + env.fromSource( + new DataGeneratorSource<>( + new FromElementsGeneratorFunction<>( + BasicTypeInfo.STRING_TYPE_INFO, elements), + elements.length, + TypeInformation.of(String.class)), + WatermarkStrategy.noWatermarks(), + "DataGeneratorSource"); + + GcpPublisherConfig gcpPublisherConfig = + GcpPublisherConfig.builder() + .setCredentialsProvider(EmulatorCredentialsProvider.create()) + .setTransportChannelProvider( + new TestChannelProvider( + PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint())) + .build(); + + PubSubSinkV2<String> sink = + PubSubSinkV2.<String>builder() + .setProjectId(PROJECT_ID) + .setTopicId(TOPIC_ID) + 
.setSerializationSchema(new SimpleStringSchema()) + .setGcpPublisherConfig(gcpPublisherConfig) + .setFailOnError(true) + .build(); + + stream.sinkTo(sink); + + env.execute("PubSubSinkV2ITTests"); + List<ReceivedMessage> receivedMessages = + pubSubHelper.pullMessages(PROJECT_ID, SUBSCRIPTION_ID, 100); + + Assertions.assertThat(receivedMessages).hasSize(elements.length); Review Comment: ```suggestion Assertions.assertThat(receivedMessages).hasSameSizeAs(elements); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org