Airblader commented on a change in pull request #460:
URL: https://github.com/apache/flink-web/pull/460#discussion_r703485086



##########
File path: _posts/2021-09-06-connector-table-sql-api-part2.md
##########
@@ -0,0 +1,519 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part Two"
+date: 2021-09-06T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+In [part one](/2021/09/06/connector-table-sql-api-part1) of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol, filter out emails, and execute [Flink SQL on the Ververica Platform](https://www.ververica.com/apache-flink-sql-on-ververica-platform).
+
+# Goals
+
+Part two of the tutorial will teach you how to: 
+
+- integrate a source connector which connects to a mailbox using the IMAP protocol
+- use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java library that can send and receive email via the IMAP protocol
+- write [Flink SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/overview/) and execute the queries on the Ververica Platform
+
+You are encouraged to follow along with the code in this [repository](https://github.com/Airblader/blog-imap). It provides a boilerplate project with a bundled [docker-compose](https://docs.docker.com/compose/) setup that lets you easily run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have:
+
+- followed the steps outlined in [part one](/2021/09/06/connector-table-sql-api-part1) of this tutorial
+- some familiarity with Java and object-oriented programming
+
+
+# Understand how to fetch emails via the IMAP protocol
+
+Now that you have a working source connector that can run on Flink, it is time to connect to an email server via [IMAP](https://en.wikipedia.org/wiki/Internet_Message_Access_Protocol) (an Internet protocol that allows email clients to retrieve messages from a mail server) so that Flink can process emails instead of static test data.
+
+You will use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java library that can be used to send and receive email via IMAP. For simplicity, authentication will use a plain username and password.
+
+This tutorial focuses on how to implement a connector for Flink. If you want to learn more about how IMAP or Jakarta Mail work, you are encouraged to explore a more extensive implementation in this [repository](https://github.com/TNG/flink-connector-email).
+
+In order to fetch emails, you will need to connect to the email server, register a listener for new emails and collect them whenever they arrive, and enter a loop to keep the connector running.
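Before bringing in Flink or Jakarta Mail, the three steps can be sketched with the JDK alone. This is a minimal sketch under stated assumptions. `MailLoopSketch` and `onNewMessage` are hypothetical names. The queue stands in for messages that a Jakarta Mail listener would deliver. The `running` flag follows the usual `run`/`cancel` pattern of a Flink `SourceFunction`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class MailLoopSketch {
    // Hypothetical buffer: a mail listener would push new messages here.
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    // Set to false by cancel(); volatile so the looping thread sees the change.
    private volatile boolean running = true;

    // Called by the (hypothetical) listener whenever the server reports new mail.
    public void onNewMessage(String subject) {
        pending.offer(subject);
    }

    // Keeps the connector alive and forwards each message until cancelled.
    public void run(Consumer<String> collector) {
        while (running) {
            try {
                // Wait briefly so that cancel() is noticed even when no mail arrives.
                String next = pending.poll(100, TimeUnit.MILLISECONDS);
                if (next != null) {
                    collector.accept(next);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop looping.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Stops the loop; in Flink this would be triggered from SourceFunction#cancel.
    public void cancel() {
        running = false;
    }
}
```

The loop polls with a short timeout instead of blocking indefinitely. That way, `cancel()` takes effect even while the mailbox is quiet.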
+
+
+# Add configuration options - server information and credentials
+
+In order to connect to your IMAP server, you will need at least the following:
+
+- hostname (of the mail server)
+- port number
+- username
+- password
+
+You will start by creating a class to encapsulate the configuration options. You will make use of [Lombok](https://projectlombok.org) to reduce boilerplate code: with the `@Data` and `@SuperBuilder` annotations, Lombok generates getters, `equals`, `hashCode`, `toString`, and a builder for all the fields of the immutable class.
+
+```java
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+@Data
+@SuperBuilder(toBuilder = true)
+public class ImapSourceOptions implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String host;
+    private final @Nullable Integer port;
+    private final @Nullable String user;
+    private final @Nullable String password;
+}
+```
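For readers unfamiliar with Lombok, here is a hand-written approximation of what the two annotations provide. It is a simplified sketch, not Lombok's actual output. `ImapSourceOptionsPlain` is a hypothetical name, and `toString` is omitted for brevity. The builder is a plain, non-inheritable one, whereas `@SuperBuilder` generates self-typed builders that also work with subclasses.

```java
import java.io.Serializable;
import java.util.Objects;

// Simplified, hand-written stand-in for the Lombok-annotated options class.
public class ImapSourceOptionsPlain implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String host;
    private final Integer port;     // nullable
    private final String user;      // nullable
    private final String password;  // nullable

    private ImapSourceOptionsPlain(Builder b) {
        this.host = b.host;
        this.port = b.port;
        this.user = b.user;
        this.password = b.password;
    }

    public static Builder builder() { return new Builder(); }

    // Counterpart of toBuilder = true: copies this instance into a new builder.
    public Builder toBuilder() {
        return new Builder().host(host).port(port).user(user).password(password);
    }

    // Getters, as generated by @Data for every field.
    public String getHost() { return host; }
    public Integer getPort() { return port; }
    public String getUser() { return user; }
    public String getPassword() { return password; }

    // Value-based equality over all fields, as generated by @Data.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ImapSourceOptionsPlain)) return false;
        ImapSourceOptionsPlain other = (ImapSourceOptionsPlain) o;
        return Objects.equals(host, other.host) && Objects.equals(port, other.port)
            && Objects.equals(user, other.user) && Objects.equals(password, other.password);
    }

    @Override
    public int hashCode() { return Objects.hash(host, port, user, password); }

    public static final class Builder {
        private String host;
        private Integer port;
        private String user;
        private String password;

        public Builder host(String host) { this.host = host; return this; }
        public Builder port(Integer port) { this.port = port; return this; }
        public Builder user(String user) { this.user = user; return this; }
        public Builder password(String password) { this.password = password; return this; }

        public ImapSourceOptionsPlain build() { return new ImapSourceOptionsPlain(this); }
    }
}
```

Call sites look the same with the Lombok version, for example `ImapSourceOptions.builder().host("localhost").port(3143).build()`.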
+
+Now you can add an instance of this class to the `ImapSource` and `ImapTableSource` classes you created in part one so that they can use it. Both classes also take the names of the columns the table was created with; take note of them, as they will be needed later.
+
+
+<div class="note">
+  <h5>Hint</h5>
+  <p>The column names would be "subject" and "content" with the SQL executed in part one:</p>
+  <pre><code class="language-sql">CREATE TABLE T (subject STRING, content STRING) WITH ('connector' = 'imap');</code></pre>
+</div>
+
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ImapSource extends RichSourceFunction<RowData> {
+    private final ImapSourceOptions options;
+    private final List<String> columnNames;
+
+    public ImapSource(
+        ImapSourceOptions options, 
+        List<String> columnNames
+    ) {
+        this.options = options;
+        this.columnNames = columnNames.stream()
+            .map(String::toUpperCase)
+            .collect(Collectors.toList());
+    }
+
+    // ...
+}
+```
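Upper-casing the column names once in the constructor lets the source match columns case-insensitively later, so `subject`, `Subject` and `SUBJECT` in the `CREATE TABLE` statement are treated alike. The following minimal, JDK-only sketch shows how the names might later decide the order of a row's fields. `RowOrderSketch`, `toFieldValues` and the `SUBJECT`/`CONTENT` mapping are assumptions for illustration. A real Flink source would likely wrap such values in `GenericRowData` and `StringData`.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class RowOrderSketch {
    private final List<String> columnNames;

    public RowOrderSketch(List<String> columnNames) {
        // Same idea as the ImapSource constructor, but Locale.ROOT keeps the
        // result independent of the JVM's default locale.
        this.columnNames = columnNames.stream()
            .map(name -> name.toUpperCase(Locale.ROOT))
            .collect(Collectors.toList());
    }

    // Returns one value per declared column, in declaration order.
    public Object[] toFieldValues(String subject, String content) {
        Object[] values = new Object[columnNames.size()];
        for (int i = 0; i < values.length; i++) {
            switch (columnNames.get(i)) {
                case "SUBJECT":
                    values[i] = subject;
                    break;
                case "CONTENT":
                    values[i] = content;
                    break;
                default:
                    throw new IllegalArgumentException(
                        "Unsupported column: " + columnNames.get(i));
            }
        }
        return values;
    }
}
```

Because the values follow the declared column order, a table declared as `(content STRING, subject STRING)` still receives each value in the right position.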
+
+```java
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import java.util.List;
+
+public class ImapTableSource implements ScanTableSource {
+
+    private final ImapSourceOptions options;
+    private final List<String> columnNames;
+
+    public ImapTableSource(
+        ImapSourceOptions options,
+        List<String> columnNames
+    ) {
+        this.options = options;
+        this.columnNames = columnNames;
+    }
+
+    // …
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
+        final ImapSource source = new ImapSource(options, columnNames);
+        return SourceFunctionProvider.of(source, true);

Review comment:
       I guess this is actually wrong here; the source should be unbounded at this point. I'll fix it and point it out.




-- 
This is an automated message from the Apache Git Service.