This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c880b7aa4d [Improve][Connector-V2]Support multi-table sink feature for 
email (#7368)
c880b7aa4d is described below

commit c880b7aa4dd6198d3b6efa1ef0780a2ce9b5ff4d
Author: corgy-w <73771213+corg...@users.noreply.github.com>
AuthorDate: Tue Aug 20 10:36:13 2024 +0800

    [Improve][Connector-V2]Support multi-table sink feature for email (#7368)
---
 docs/en/connector-v2/sink/Email.md                 |  33 +++--
 docs/zh/connector-v2/sink/Email.md                 |  31 +++--
 .../seatunnel/email/config/EmailConfig.java        |  12 +-
 .../seatunnel/email/config/EmailSinkConfig.java    |  46 +++----
 .../connectors/seatunnel/email/sink/EmailSink.java |  38 +++---
 .../seatunnel/email/sink/EmailSinkFactory.java     |  11 ++
 .../seatunnel/email/sink/EmailSinkWriter.java      |  58 ++++----
 .../connector-email-e2e/pom.xml                    |  37 ++++++
 .../e2e/connector/email/EmailWithMultiIT.java      | 146 +++++++++++++++++++++
 .../src/test/resources/fake_to_email.conf          |  63 +++++++++
 .../src/test/resources/fake_to_email_test.conf     |  62 +++++++++
 .../src/test/resources/fake_to_multiemailsink.conf |  83 ++++++++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 13 files changed, 520 insertions(+), 101 deletions(-)

diff --git a/docs/en/connector-v2/sink/Email.md 
b/docs/en/connector-v2/sink/Email.md
index f2bca2783d..444be57292 100644
--- a/docs/en/connector-v2/sink/Email.md
+++ b/docs/en/connector-v2/sink/Email.md
@@ -14,25 +14,26 @@ The tested email version is 1.5.6.
 
 ## Options
 
-|           name           |  type  | required | default value |
-|--------------------------|--------|----------|---------------|
-| email_from_address       | string | yes      | -             |
-| email_to_address         | string | yes      | -             |
-| email_host               | string | yes      | -             |
-| email_transport_protocol | string | yes      | -             |
-| email_smtp_auth          | string | yes      | -             |
-| email_authorization_code | string | yes      | -             |
-| email_message_headline   | string | yes      | -             |
-| email_message_content    | string | yes      | -             |
-| common-options           |        | no       | -             |
+|           name           |  type   | required | default value |
+|--------------------------|---------|----------|---------------|
+| email_from_address       | string  | yes      | -             |
+| email_to_address         | string  | yes      | -             |
+| email_host               | string  | yes      | -             |
+| email_transport_protocol | string  | yes      | -             |
+| email_smtp_auth          | boolean | yes      | -             |
+| email_smtp_port          | int     | no       | 465           |
+| email_authorization_code | string  | no       | -             |
+| email_message_headline   | string  | yes      | -             |
+| email_message_content    | string  | yes      | -             |
+| common-options           |         | no       | -             |
 
 ### email_from_address [string]
 
-Sender Email Address .
+Sender Email Address.
 
 ### email_to_address [string]
 
-Address to receive mail.
+Address to receive mail. Supports multiple email addresses, separated by commas (,).
 
 ### email_host [string]
 
@@ -42,10 +43,14 @@ SMTP server to connect to.
 
 The protocol to load the session .
 
-### email_smtp_auth [string]
+### email_smtp_auth [boolean]
 
 Whether to authenticate the customer.
 
+### email_smtp_port [int]
+
+The port of the SMTP server to connect to.
+
 ### email_authorization_code [string]
 
 authorization code,You can obtain the authorization code from the mailbox 
Settings.
diff --git a/docs/zh/connector-v2/sink/Email.md 
b/docs/zh/connector-v2/sink/Email.md
index cc3999c580..a254dc4608 100644
--- a/docs/zh/connector-v2/sink/Email.md
+++ b/docs/zh/connector-v2/sink/Email.md
@@ -16,17 +16,18 @@
 
 ## 选项
 
-|            名称            |   类型   | 是否必须 | 默认值 |
-|--------------------------|--------|------|-----|
-| email_from_address       | string | 是    | -   |
-| email_to_address         | string | 是    | -   |
-| email_host               | string | 是    | -   |
-| email_transport_protocol | string | 是    | -   |
-| email_smtp_auth          | string | 是    | -   |
-| email_authorization_code | string | 是    | -   |
-| email_message_headline   | string | 是    | -   |
-| email_message_content    | string | 是    | -   |
-| common-options           |        | 否    | -   |
+|            名称            |   类型    | 是否必须 | 默认值 |
+|--------------------------|---------|------|-----|
+| email_from_address       | string  | 是    | -   |
+| email_to_address         | string  | 是    | -   |
+| email_host               | string  | 是    | -   |
+| email_transport_protocol | string  | 是    | -   |
+| email_smtp_auth          | boolean | 是    | -   |
+| email_smtp_port          | int     | 否    | 465 |
+| email_authorization_code | string  | 否    | -   |
+| email_message_headline   | string  | 是    | -   |
+| email_message_content    | string  | 是    | -   |
+| common-options           |         | 否    | -   |
 
 ### email_from_address [string]
 
@@ -34,7 +35,7 @@
 
 ### email_to_address [string]
 
-接收邮件的地址
+接收邮件的地址,支持多个邮箱地址,以逗号(,)分隔。
 
 ### email_host [string]
 
@@ -44,10 +45,14 @@
 
 加载会话的协议
 
-### email_smtp_auth [string]
+### email_smtp_auth [boolean]
 
 是否对客户进行认证
 
+### email_smtp_port [int]
+
+选择用于身份验证的端口。
+
 ### email_authorization_code [string]
 
 授权码,您可以从邮箱设置中获取授权码
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java
index 03825804d8..406de844df 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.configuration.Options;
 
 public class EmailConfig {
 
+    public static final String CONNECTOR_IDENTITY = "EmailSink";
+
     public static final Option<String> EMAIL_FROM_ADDRESS =
             Options.key("email_from_address")
                     .stringType()
@@ -61,9 +63,15 @@ public class EmailConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The protocol used to send the message");
-    public static final Option<String> EMAIL_SMTP_AUTH =
+    public static final Option<Boolean> EMAIL_SMTP_AUTH =
             Options.key("email_smtp_auth")
-                    .stringType()
+                    .booleanType()
                     .noDefaultValue()
                     .withDescription("Whether to use SMTP authentication");
+
+    public static final Option<Integer> EMAIL_SMTP_PORT =
+            Options.key("email_smtp_port")
+                    .intType()
+                    .defaultValue(465)
+                    .withDescription("Select port for authentication.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
index f876fab37c..8455a2ed2c 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
@@ -17,22 +17,25 @@
 
 package org.apache.seatunnel.connectors.seatunnel.email.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Data;
 import lombok.NonNull;
 
+import java.io.Serializable;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_AUTHORIZATION_CODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_FROM_ADDRESS;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_MESSAGE_CONTENT;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_MESSAGE_HEADLINE;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_SMTP_AUTH;
+import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_SMTP_PORT;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_TO_ADDRESS;
 import static 
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_TRANSPORT_PROTOCOL;
 
 @Data
-public class EmailSinkConfig {
+public class EmailSinkConfig implements Serializable {
     private String emailFromAddress;
     private String emailToAddress;
     private String emailAuthorizationCode;
@@ -40,32 +43,19 @@ public class EmailSinkConfig {
     private String emailMessageContent;
     private String emailHost;
     private String emailTransportProtocol;
-    private String emailSmtpAuth;
+    private Boolean emailSmtpAuth;
+    private Integer emailSmtpPort;
 
-    public EmailSinkConfig(@NonNull Config pluginConfig) {
-        if (pluginConfig.hasPath(EMAIL_FROM_ADDRESS.key())) {
-            this.emailFromAddress = 
pluginConfig.getString(EMAIL_FROM_ADDRESS.key());
-        }
-        if (pluginConfig.hasPath(EMAIL_TO_ADDRESS.key())) {
-            this.emailToAddress = 
pluginConfig.getString(EMAIL_TO_ADDRESS.key());
-        }
-        if (pluginConfig.hasPath(EMAIL_AUTHORIZATION_CODE.key())) {
-            this.emailAuthorizationCode = 
pluginConfig.getString(EMAIL_AUTHORIZATION_CODE.key());
-        }
-        if (pluginConfig.hasPath(EMAIL_MESSAGE_HEADLINE.key())) {
-            this.emailMessageHeadline = 
pluginConfig.getString(EMAIL_MESSAGE_HEADLINE.key());
-        }
-        if (pluginConfig.hasPath(EMAIL_MESSAGE_CONTENT.key())) {
-            this.emailMessageContent = 
pluginConfig.getString(EMAIL_MESSAGE_CONTENT.key());
-        }
-        if (pluginConfig.hasPath(EMAIL_HOST.key())) {
-            this.emailHost = pluginConfig.getString(EMAIL_HOST.key());
-        }
-        if (pluginConfig.hasPath(EMAIL_TRANSPORT_PROTOCOL.key())) {
-            this.emailTransportProtocol = 
pluginConfig.getString(EMAIL_TRANSPORT_PROTOCOL.key());
-        }
-        if (pluginConfig.hasPath(EMAIL_SMTP_AUTH.key())) {
-            this.emailSmtpAuth = pluginConfig.getString(EMAIL_SMTP_AUTH.key());
-        }
+    public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
+        super();
+        this.emailFromAddress = pluginConfig.get(EMAIL_FROM_ADDRESS);
+        this.emailToAddress = pluginConfig.get(EMAIL_TO_ADDRESS);
+        this.emailAuthorizationCode = 
pluginConfig.get(EMAIL_AUTHORIZATION_CODE);
+        this.emailMessageHeadline = pluginConfig.get(EMAIL_MESSAGE_HEADLINE);
+        this.emailMessageContent = pluginConfig.get(EMAIL_MESSAGE_CONTENT);
+        this.emailHost = pluginConfig.get(EMAIL_HOST);
+        this.emailTransportProtocol = 
pluginConfig.get(EMAIL_TRANSPORT_PROTOCOL);
+        this.emailSmtpAuth = pluginConfig.get(EMAIL_SMTP_AUTH);
+        this.emailSmtpPort = pluginConfig.get(EMAIL_SMTP_PORT);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
index c1b8ffdd37..0a3df90a12 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
@@ -17,40 +17,38 @@
 
 package org.apache.seatunnel.connectors.seatunnel.email.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig;
+import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
 
-@AutoService(SeaTunnelSink.class)
-public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
 
-    private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    private ReadonlyConfig readonlyConfig;
+    private CatalogTable catalogTable;
+    private EmailSinkConfig pluginConfig;
+
+    public EmailSink(ReadonlyConfig config, CatalogTable table) {
+        this.readonlyConfig = config;
+        this.catalogTable = table;
+        this.pluginConfig = new EmailSinkConfig(config);
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
 
     @Override
-    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
+    public EmailSinkWriter createWriter(SinkWriter.Context context) {
         return new EmailSinkWriter(seaTunnelRowType, pluginConfig);
     }
 
     @Override
     public String getPluginName() {
-        return "EmailSink";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) {
-        this.pluginConfig = pluginConfig;
+        return EmailConfig.CONNECTOR_IDENTITY;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java
index 243985ff6f..8db3e42cac 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java
@@ -18,8 +18,12 @@
 package org.apache.seatunnel.connectors.seatunnel.email.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 
 import com.google.auto.service.AutoService;
 
@@ -39,6 +43,12 @@ public class EmailSinkFactory implements TableSinkFactory {
         return "EmailSink";
     }
 
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new EmailSink(context.getOptions(), catalogTable);
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
@@ -51,6 +61,7 @@ public class EmailSinkFactory implements TableSinkFactory {
                         EMAIL_AUTHORIZATION_CODE,
                         EMAIL_MESSAGE_HEADLINE,
                         EMAIL_MESSAGE_CONTENT)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
index ebcb5f9041..f7fe04c068 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.email.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
@@ -33,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
 import javax.activation.DataHandler;
 import javax.activation.DataSource;
 import javax.activation.FileDataSource;
+import javax.mail.Address;
 import javax.mail.Authenticator;
 import javax.mail.BodyPart;
 import javax.mail.Message;
@@ -51,15 +51,16 @@ import java.io.IOException;
 import java.util.Properties;
 
 @Slf4j
-public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
 
     private final SeaTunnelRowType seaTunnelRowType;
-    private EmailSinkConfig config;
+    private final EmailSinkConfig config;
     private StringBuffer stringBuffer;
 
-    public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, Config 
pluginConfig) {
+    public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, EmailSinkConfig 
pluginConfig) {
         this.seaTunnelRowType = seaTunnelRowType;
-        this.config = new EmailSinkConfig(pluginConfig);
+        this.config = pluginConfig;
         this.stringBuffer = new StringBuffer();
     }
 
@@ -78,29 +79,32 @@ public class EmailSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
     public void close() {
         createFile();
         Properties properties = new Properties();
-
         properties.setProperty("mail.host", config.getEmailHost());
-
         properties.setProperty("mail.transport.protocol", 
config.getEmailTransportProtocol());
-
-        properties.setProperty("mail.smtp.auth", config.getEmailSmtpAuth());
+        properties.setProperty("mail.smtp.auth", 
config.getEmailSmtpAuth().toString());
+        properties.setProperty("mail.smtp.port", 
config.getEmailSmtpPort().toString());
 
         try {
             MailSSLSocketFactory sf = new MailSSLSocketFactory();
             sf.setTrustAllHosts(true);
-            properties.put("mail.smtp.ssl.enable", "true");
             properties.put("mail.smtp.ssl.socketFactory", sf);
-            Session session =
-                    Session.getDefaultInstance(
-                            properties,
-                            new Authenticator() {
-                                @Override
-                                protected PasswordAuthentication 
getPasswordAuthentication() {
-                                    return new PasswordAuthentication(
-                                            config.getEmailFromAddress(),
-                                            
config.getEmailAuthorizationCode());
-                                }
-                            });
+            Session session;
+            if (config.getEmailSmtpAuth()) {
+                properties.put("mail.smtp.ssl.enable", "true");
+                session =
+                        Session.getDefaultInstance(
+                                properties,
+                                new Authenticator() {
+                                    @Override
+                                    protected PasswordAuthentication 
getPasswordAuthentication() {
+                                        return new PasswordAuthentication(
+                                                config.getEmailFromAddress(),
+                                                
config.getEmailAuthorizationCode());
+                                    }
+                                });
+            } else {
+                session = Session.getDefaultInstance(properties);
+            }
             // Create the default MimeMessage object
             MimeMessage message = new MimeMessage(session);
 
@@ -108,8 +112,14 @@ public class EmailSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
             message.setFrom(new InternetAddress(config.getEmailFromAddress()));
 
             // Set the recipient email address
-            message.addRecipient(
-                    Message.RecipientType.TO, new 
InternetAddress(config.getEmailToAddress()));
+            String[] emailAddresses = config.getEmailToAddress().split(",");
+            Address[] addresses = new Address[emailAddresses.length];
+            for (int i = 0; i < emailAddresses.length; i++) {
+                addresses[i] = new InternetAddress(emailAddresses[i]);
+            }
+            if (addresses.length > 0) {
+                message.setRecipients(Message.RecipientType.TO, addresses);
+            }
 
             // Setting the Email subject
             message.setSubject(config.getEmailMessageHeadline());
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml
new file mode 100644
index 0000000000..7a6552989a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-email-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Email</name>
+
+    <dependencies>
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-email</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
new file mode 100644
index 0000000000..2822231874
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.email;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.mail.Flags;
+import javax.mail.Folder;
+import javax.mail.Message;
+import javax.mail.Session;
+import javax.mail.Store;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+@Slf4j
+public class EmailWithMultiIT extends TestSuiteBase implements TestResource {
+    private static final String IMAGE = "greenmail/standalone";
+    private static final String HOST = "email-e2e";
+    private static final int STMP_PORT = 3025;
+    private static final int IMAP_PORT = 3143;
+
+    private GenericContainer<?> smtpContainer;
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        this.smtpContainer =
+                new GenericContainer<>(DockerImageName.parse(IMAGE))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withExposedPorts(STMP_PORT, IMAP_PORT)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(LoggerFactory.getLogger("email-service")));
+        Startables.deepStart(Stream.of(smtpContainer)).join();
+        log.info("SMTP container started");
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (smtpContainer != null) {
+            smtpContainer.stop();
+        }
+    }
+
+    @TestTemplate
+    public void testEmailSink(TestContainer container) throws Exception {
+        Container.ExecResult textWriteResult = 
container.executeJob("/fake_to_email.conf");
+        testEMailSuccess(1, "receive...@example.com", 
"receive...@example.com");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.FLINK},
+            disabledReason = "Currently FLINK do not support multi-table")
+    public void testMultipleTableEmailSink(TestContainer container) throws 
Exception {
+        Container.ExecResult textWriteResult = 
container.executeJob("/fake_to_multiemailsink.conf");
+        testEMailSuccess(2, "receive...@example.com", 
"receive...@example.com");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+    }
+
+    private Session setupImap() {
+        log.info("in setupImap");
+        Properties props = new Properties();
+        props.setProperty("mail.store.protocol", "imap");
+        props.put("mail.imap.host", smtpContainer.getHost());
+        props.put("mail.imap.port", smtpContainer.getMappedPort(IMAP_PORT));
+        props.put("mail.imap.localaddress", smtpContainer.getHost());
+        return Session.getInstance(props, null);
+    }
+
+    private void testEMailSuccess(int receivedNum, String... users) throws 
Exception {
+        Session sessionIMAP = setupImap();
+        for (String user : users) {
+            Store store = sessionIMAP.getStore("imap");
+            store.connect(
+                    smtpContainer.getHost(), 
smtpContainer.getMappedPort(IMAP_PORT), user, "");
+            if (store.isConnected()) {
+                log.info("IMAP is connected");
+                Folder folder = store.getFolder("INBOX");
+                if (folder != null) {
+                    // Open the folder in read/write mode
+                    folder.open(Folder.READ_WRITE);
+
+                    Message[] messages = folder.getMessages();
+                    int unreadCount = 0;
+
+                    for (Message message : messages) {
+                        // Process only unread mail
+                        if (!message.isSet(Flags.Flag.SEEN)) {
+                            unreadCount++;
+                            // Mark as read
+                            message.setFlag(Flags.Flag.SEEN, true);
+                        }
+                    }
+
+                    log.info("mail messages.length: {}", unreadCount);
+                    Assertions.assertEquals(receivedNum, unreadCount);
+                }
+            } else {
+                log.info("IMAP is not connected");
+            }
+        }
+    }
+
+    @Disabled("Email authentication address and authentication information 
need to be configured")
+    public void testOwnEmailSink(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult textReadResult = 
container.executeJob("/fake_to_email_test.conf");
+        Assertions.assertEquals(0, textReadResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf
new file mode 100644
index 0000000000..d69b83fd28
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+      {
+        row.num = 100
+        schema = {
+          table = "test.table1"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "age"
+              type = "int"
+            }
+          ]
+        }
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  EmailSink {
+    email_from_address = "sen...@example.com"
+    email_to_address = "receive...@example.com,receive...@example.com"
+    email_host = "email-e2e"
+    email_transport_protocol = "smtp"
+    email_smtp_auth = "false"
+    email_smtp_port = 3025
+    email_authorization_code=""
+    email_message_headline = "test-title"
+    email_message_content = "test-content"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf
new file mode 100644
index 0000000000..b3657cd7a7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+      {
+        row.num = 100
+        schema = {
+          table = "test.table1"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "age"
+              type = "int"
+            }
+          ]
+        }
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+    EmailSink {
+      email_from_address = "xxxxx...@qq.com"
+      email_to_address = "xxxxxx...@qq.com"
+      email_host="smtp.qq.com"
+      email_transport_protocol="smtp"
+      email_smtp_auth="true"
+      email_authorization_code="you authorization code"
+      email_message_headline="test-title"
+      email_message_content="test-content"
+   }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf
new file mode 100644
index 0000000000..974ad1cbb2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+      {
+        row.num = 100
+        schema = {
+          table = "test.table1"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "age"
+              type = "int"
+            }
+          ]
+        }
+      },
+      {
+        row.num = 100
+        schema = {
+          table = "test.table2"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            },
+            {
+              name = "age"
+              type = "int"
+            }
+          ]
+        }
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  EmailSink {
+    email_from_address = "sen...@example.com"
+    email_to_address = "receive...@example.com,receive...@example.com"
+    email_host = "email-e2e"
+    email_transport_protocol = "smtp"
+    email_smtp_auth = false
+    email_smtp_port = 3025
+    email_authorization_code=""
+    email_message_headline = "test-title"
+    email_message_content = "test-content"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index ed36310474..4933ab0205 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -77,6 +77,7 @@
         <module>connector-milvus-e2e</module>
         <module>connector-activemq-e2e</module>
         <module>connector-sls-e2e</module>
+        <module>connector-email-e2e</module>
     </modules>
 
     <dependencies>


Reply via email to