Airblader commented on a change in pull request #460: URL: https://github.com/apache/flink-web/pull/460#discussion_r702610547
##########
File path: css/flink.css
##########

@@ -249,6 +249,19 @@ img.illu {margin:40px auto 60px;display:block;}
 .committer-avatar {
   width: 50px;
 }
+
+.note {
+  border-left: 2px solid #ccc;
+  padding: 4px 0 4px 8px;
+  margin-bottom: 32px;
+  color: lightblue;

Review comment:
   This lightblue text on a white background is barely readable to me. It also fails accessibility color-contrast standards.

   ![screenshot-2021-09-06_08-10-15](https://user-images.githubusercontent.com/2392216/132168704-05a2ce9b-0189-48b7-9367-c5a929b02d83.png)

##########
File path: _posts/2021-09-06-connector-table-sql-api-part1.md
##########

@@ -0,0 +1,244 @@

---
layout: post
title: "Implementing a Custom Source Connector for Table API and SQL - Part One "
date: 2021-09-06T00:00:00.000Z
authors:
- Ingo Buerk:
  name: "Ingo Buerk"
- Daisy Tsang:
  name: "Daisy Tsang"
excerpt:
---

{% toc %}

# Introduction

Apache Flink is a data processing engine that aims to keep [state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/) locally in order to do computations efficiently. However, Flink does not "own" the data but relies on external systems to ingest and persist data. Connecting to external data input (**sources**) and external data storage (**sinks**) is usually summarized under the term **connectors** in Flink.

Since connectors are such important components, Flink ships with [connectors for some popular systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/). But sometimes you may need to read in an uncommon data format, and what Flink provides out of the box is not enough. This is why Flink also provides extension points for building custom connectors if you want to connect to a system that is not supported by an existing connector.

Once you have a source and a sink defined for Flink, you can use its declarative APIs (in the form of the [Table API and SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/)) to execute queries for data analysis.

The **Table API** provides more programmatic access while **SQL** is a more universal query language. It is named Table API because of its relational functions on tables: how to obtain a table, how to output a table, and how to perform query operations on the table.

In this two-part tutorial, you will explore some of these APIs and concepts by implementing your own custom source connector for reading in data from an email inbox. You will then use Flink to process emails through the [IMAP protocol](https://en.wikipedia.org/wiki/Internet_Message_Access_Protocol).

Part one will focus on building a custom source connector and [part two](/2021/09/06/connector-table-sql-api-part2) will focus on integrating it.

# Goals

Part one of this tutorial will teach you how to build and run a custom source connector to be used with Table API and SQL, two high-level abstractions in Flink.

You are encouraged to follow along with the code in this [repository](https://github.com/Airblader/blog-imap). It provides a boilerplate project that also comes with a bundled [docker-compose](https://docs.docker.com/compose/) setup that lets you easily run the connector. You can then try it out with Flink's SQL client.

# Prerequisites

This tutorial assumes that you have some familiarity with Java and object-oriented programming.
It would also be useful to have [docker-compose](https://docs.docker.com/compose/install/) installed on your system in order to use the script included in the repository that builds and runs the connector.

# Understand the infrastructure required for a connector

In order to create a connector which works with Flink, you need:

1. A _factory class_ (a blueprint for creating other objects from string properties) that tells Flink with which identifier (in this case, "imap") our connector can be addressed, which configuration options it exposes, and how the connector can be instantiated. Since Flink uses the Java Service Provider Interface (SPI) to discover factories located in different modules, you will also need to add some configuration details.

2. The _table source_ object as a specific instance of the connector during the planning stage. It is responsible for back and forth communication with the optimizer during the planning stage and is like another factory for creating the connector's runtime implementation. There are also more advanced features, such as [abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html), that can be implemented to improve connector performance.

3. A _runtime implementation_ from the connector obtained during the planning stage. The runtime logic is implemented in Flink's core connector interfaces and does the actual work of producing rows of dynamic table data. The runtime instances are shipped to the Flink cluster.

Let us look at this sequence (factory class → table source → runtime implementation) in reverse order.

# Establish the runtime implementation of the connector

You first need to have a source connector which can be used in Flink's runtime system, defining how data goes in and how it can be executed in the cluster. There are a few different interfaces available for implementing the actual source of the data and making it discoverable in Flink.

For complex connectors, you may want to implement the [Source interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Source.html) which gives you a lot of control. For simpler use cases, you can use the [SourceFunction interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html). There are already a few different implementations of the SourceFunction interface for common use cases, such as the [FromElementsFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html) class and the [RichSourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html) class. You will use the latter.

<div class="note">
  <h5>Hint</h5>
  <p>The Source interface is the new abstraction whereas the SourceFunction interface is the older one.
  All connectors will eventually implement the Source interface.
  </p>
</div>

`RichSourceFunction` is a base class for implementing a data source that has access to context information and some lifecycle methods. There is a `run()` method inherited from the `SourceFunction` interface that you need to implement. It is invoked once and can be used to produce the data either once for a bounded result or within a loop for an unbounded stream.
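In sketch form, those two modes of `run()` could be shaped as follows. This is illustrative only; the fetch helpers are hypothetical stand-ins, not part of this tutorial's connector:

```java
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

import java.util.Collections;
import java.util.List;

// Illustrative sketch only (not this tutorial's connector): how a single
// run() can serve both modes. The fetch helpers are hypothetical stand-ins
// for real data access.
public class BoundedVsUnboundedSketch extends RichSourceFunction<RowData> {
    private final boolean bounded;
    private volatile boolean running;

    public BoundedVsUnboundedSketch(boolean bounded) {
        this.bounded = bounded;
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        // Emit what already exists; for a bounded source, returning
        // afterwards finishes the source.
        for (RowData row : fetchExistingEmails()) {
            ctx.collect(row);
        }
        if (bounded) {
            return;
        }
        // Unbounded: keep polling until cancel() flips the flag.
        running = true;
        while (running) {
            fetchNewEmails().forEach(ctx::collect);
            Thread.sleep(250);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Hypothetical placeholders; a real implementation would talk to a server.
    private List<RowData> fetchExistingEmails() { return Collections.emptyList(); }
    private List<RowData> fetchNewEmails() { return Collections.emptyList(); }
}
```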
For example, to create a bounded data source, you could implement this method so that it reads all existing emails and then closes. To create an unbounded source, you could instead watch only for new emails coming in while the source is active. You can also combine these behaviors and expose them through configuration options.

When you first create the class and implement the interface, it should look something like this:

```java
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

public class ImapSource extends RichSourceFunction<RowData> {
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {}

    @Override
    public void cancel() {}
}
```

Note that internal data structures (`RowData`) are used because that is required by the table runtime.

In the `run()` method, you get access to a [context](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html) object inherited from the SourceFunction interface, which is a bridge to Flink and allows you to output data. Since the source does not produce any data yet, the next step is to make it produce some static data in order to test that the data flows correctly:

```java
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class ImapSource extends RichSourceFunction<RowData> {
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        ctx.collect(GenericRowData.of(
            StringData.fromString("Subject 1"),
            StringData.fromString("Hello, World!")
        ));
    }

    @Override
    public void cancel() {}
}
```

You do not need to implement the `cancel()` method yet because the source finishes instantly.

# Create and configure a dynamic table source for the data stream

[Dynamic tables](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/) are the core concept of Flink's Table API and SQL support for streaming data and, as the name suggests, change over time. You can imagine a data stream being logically converted into a table that is constantly changing. For this tutorial, the emails that will be read in will be interpreted as a (source) table that is queryable. It can be viewed as a specific instance of a connector class.

You will now implement a [DynamicTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/DynamicTableSource.html) interface. There are two types of dynamic table sources: [ScanTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/ScanTableSource.html) and [LookupTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/LookupTableSource.html). Scan sources read the entire table on the external system while lookup sources look for specific rows based on keys. The former fits the use case of this tutorial.
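For contrast, a lookup source hands the planner a keyed lookup function instead of a whole-table scan. A rough sketch, under hypothetical names that are not used further in this tutorial:

```java
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.TableFunction;

// Sketch of the lookup-style source, for contrast with the scan source
// built below. MyLookupFunction is hypothetical; its eval(...) would be
// called with the lookup key values and emit matching rows via collect().
public class LookupSourceSketch implements LookupTableSource {

    public static class MyLookupFunction extends TableFunction<RowData> {
        public void eval(Object... keys) {
            // look up rows for the given keys and call collect(row)
        }
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        return TableFunctionProvider.of(new MyLookupFunction());
    }

    @Override
    public DynamicTableSource copy() {
        return new LookupSourceSketch();
    }

    @Override
    public String asSummaryString() {
        return "Lookup Source (sketch)";
    }
}
```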
This is what a scan table source implementation would look like:

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;

public class ImapTableSource implements ScanTableSource {
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
        boolean bounded = true;
        final ImapSource source = new ImapSource();
        return SourceFunctionProvider.of(source, bounded);
    }

    @Override
    public DynamicTableSource copy() {
        return new ImapTableSource();
    }

    @Override
    public String asSummaryString() {
        return "IMAP Table Source";
    }
}
```

[ChangelogMode](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/ChangelogMode.html) informs Flink of the kinds of changes the planner can expect during runtime: for example, whether the source produces only new rows, whether it also updates existing ones, or whether it can remove previously produced rows. Our source will only produce (`insertOnly()`) new rows.

[ScanRuntimeProvider](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/ScanTableSource.ScanRuntimeProvider.html) allows Flink to create the actual runtime implementation you established previously (for reading the data). Flink even provides utilities like [SourceFunctionProvider](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/SourceFunctionProvider.html) to wrap it into an instance of [SourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html), which is one of the base runtime interfaces.

You will also need to indicate whether the source is bounded or not. Currently, the source is declared bounded, but you will have to change this later.

# Create a factory class for the connector so it can be discovered by Flink

You now have a working source connector, but in order to use it in Table API or SQL, it needs to be discoverable by Flink. You also need to define how the connector is addressable from a SQL statement when creating a source table.

You need to implement a [Factory](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/Factory.html), which is a base interface that creates object instances from a list of key-value pairs in Flink's Table API and SQL. A factory is uniquely identified by its class name and `factoryIdentifier()`. For this tutorial, you will implement the more specific [DynamicTableSourceFactory](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/DynamicTableSourceFactory.html), which allows you to configure a dynamic table connector as well as create `DynamicTableSource` instances.
```java
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class ImapSourceFactory implements DynamicTableSourceFactory {
    @Override
    public String factoryIdentifier() {
        return "imap";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context ctx) {
        final FactoryUtil.TableFactoryHelper factoryHelper = FactoryUtil.createTableFactoryHelper(this, ctx);
        factoryHelper.validate();

        return new ImapTableSource();
    }
}
```

There are currently no configuration options, but they can be added and also validated within the `createDynamicTableSource()` function. There is a small helper utility, [TableFactoryHelper](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html), that Flink offers which ensures that required options are set and that no unknown options are provided.

Finally, you need to register your factory for Java's Service Provider Interface (SPI). Classes that implement this interface can be discovered and should be added to the file `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory` with the fully qualified class name of your factory:

```java
// if you created your class in the package org.example.acme, it should be named the following:
org.example.acme.ImapTableSourceFactory
```

Review comment:
```suggestion
org.example.acme.ImapSourceFactory
```

##########
File path: _posts/2021-09-06-connector-table-sql-api-part2.md
##########

@@ -0,0 +1,518 @@

---
layout: post
title: "Implementing a custom source connector for Table API and SQL - Part Two "
date: 2021-09-06T00:00:00.000Z
authors:
- Ingo Buerk:
  name: "Ingo Buerk"
- Daisy Tsang:
  name: "Daisy Tsang"
excerpt:
---

{% toc %}

# Introduction

In [part one](/2021/09/06/connector-table-sql-api-part1) of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol, filter out emails, and execute [Flink SQL on the Ververica Platform](https://www.ververica.com/apache-flink-sql-on-ververica-platform).

# Goals

Part two of the tutorial will teach you how to:

- integrate a source connector which connects to a mailbox using the IMAP protocol
- use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java library that can send and receive email via the IMAP protocol
- write [Flink SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/overview/) and execute the queries in the Ververica Platform

You are encouraged to follow along with the code in this [repository](https://github.com/Airblader/blog-imap). It provides a boilerplate project that also comes with a bundled [docker-compose](https://docs.docker.com/compose/) setup that lets you easily run the connector. You can then try it out with Flink's SQL client.
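If you prefer the programmatic route, the statements you will later run in the SQL client can equivalently be issued from a small Table API program. A sketch, assuming the packaged connector is on the classpath and the GreenMail test server from the docker-compose setup is reachable under these host, port, and credential values:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: the SQL-client statements from later in this post, issued
// programmatically instead of interactively.
public class SqlClientEquivalent {
    public static void main(String[] args) {
        final TableEnvironment env = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        env.executeSql(
            "CREATE TABLE T (subject STRING) WITH ("
                + " 'connector' = 'imap',"
                + " 'host' = 'greenmail', 'port' = '3143',"
                + " 'user' = 'alice', 'password' = 'alice')");

        // Runs continuously and prints each new email subject as a row.
        env.executeSql("SELECT * FROM T").print();
    }
}
```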
# Prerequisites

This tutorial assumes that you have:

- followed the steps outlined in [part one](/2021/09/06/connector-table-sql-api-part1) of this tutorial
- some familiarity with Java and object-oriented programming

# Understand how to fetch emails via the IMAP protocol

Now that you have a working source connector that can run on Flink, it is time to connect to an email server via [IMAP](https://en.wikipedia.org/wiki/Internet_Message_Access_Protocol) (an Internet protocol that allows email clients to retrieve messages from a mail server) so that Flink can process emails instead of static test data.

You will use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java library that can be used to send and receive email via IMAP. For simplicity, authentication will use a plain username and password.

This tutorial will focus more on how to implement a connector for Flink. If you want to learn more about the details of how IMAP or Jakarta Mail work, you are encouraged to explore a more extensive implementation at this [repository](https://github.com/TNG/flink-connector-email).

In order to fetch emails, you will need to connect to the email server, register a listener for new emails and collect them whenever they arrive, and enter a loop to keep the connector running.

# Add configuration options - server information and credentials

In order to connect to your IMAP server, you will need at least the following:

- hostname (of the mail server)
- port number
- username
- password

You will start by creating a class to encapsulate the configuration options. You will make use of [Lombok](https://projectlombok.org) to help with some boilerplate code. By adding the `@Data` and `@SuperBuilder` annotations, Lombok will generate getters, `equals()`/`hashCode()`, and a builder for all the fields of the immutable class.

```java
import lombok.Data;
import lombok.experimental.SuperBuilder;
import javax.annotation.Nullable;
import java.io.Serializable;

@Data
@SuperBuilder(toBuilder = true)
public class ImapSourceOptions implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String host;
    private final @Nullable Integer port;
    private final @Nullable String user;
    private final @Nullable String password;
}
```

Now you can add an instance of this class to the `ImapSource` and `ImapTableSource` classes previously created (in part one) so it can be used there. Take note of the column names with which the table has been created. This will help later.

<div class="note">
  <h5>Hint</h5>
  <p>The column names would be "subject" and "content" with the SQL executed in part one:</p>
  <pre><code class="language-sql">CREATE TABLE T (subject STRING, content STRING) WITH ('connector' = 'imap');</code></pre>
</div>

```java
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
import java.util.List;
import java.util.stream.Collectors;

public class ImapSource extends RichSourceFunction<RowData> {
    private final ImapSourceOptions options;
    private final List<String> columnNames;

    public ImapSource(
        ImapSourceOptions options,
        List<String> columnNames
    ) {
        this.options = options;
        this.columnNames = columnNames.stream()
            .map(String::toUpperCase)
            .collect(Collectors.toList());
    }

    // ...
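    // run() and cancel() remain as implemented in part one; run() is reworked later in this post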
}
```

```java
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import java.util.List;

public class ImapTableSource implements ScanTableSource {

    private final ImapSourceOptions options;
    private final List<String> columnNames;

    public ImapTableSource(
        ImapSourceOptions options,
        List<String> columnNames
    ) {
        this.options = options;
        this.columnNames = columnNames;
    }

    // …

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
        final ImapSource source = new ImapSource(options, columnNames);
        return SourceFunctionProvider.of(source, true);
    }

    @Override
    public DynamicTableSource copy() {
        return new ImapTableSource(options, columnNames);
    }

    // …
}
```

Finally, in the `ImapSourceFactory` class, you need to create a `ConfigOption<>` for the hostname, port number, username, and password, and then report them to Flink. Host, user, and password are mandatory and can be added to `requiredOptions()`; the port is optional and can be added to `optionalOptions()` instead.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

import java.util.HashSet;
import java.util.Set;

public class ImapSourceFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOST = ConfigOptions.key("host").stringType().noDefaultValue();
    public static final ConfigOption<Integer> PORT = ConfigOptions.key("port").intType().noDefaultValue();
    public static final ConfigOption<String> USER = ConfigOptions.key("user").stringType().noDefaultValue();
    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue();

    // …

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOST);
        options.add(USER);
        options.add(PASSWORD);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PORT);
        return options;
    }
    // …
}
```

Now take a look at the `createDynamicTableSource()` function in the `ImapSourceFactory` class. Recall the small helper utility, [TableFactoryHelper](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html), that Flink offers and that you used in part one to ensure that required options are set and that no unknown options are provided. You can now use it to automatically make sure that the required options (hostname, username, and password) are provided when creating a table using this connector; the helper will throw an exception if a required option is missing. You can also use it to access the provided options (`getOptions()`), convert them into an instance of the `ImapSourceOptions` class created earlier, and pass that to the table source:

```java
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.catalog.Column;

public class ImapSourceFactory implements DynamicTableSourceFactory {

    // ...
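    // factoryIdentifier() and the HOST, PORT, USER, and PASSWORD option definitions from above are elided here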
    @Override
    public DynamicTableSource createDynamicTableSource(Context ctx) {
        final FactoryUtil.TableFactoryHelper factoryHelper = FactoryUtil.createTableFactoryHelper(this, ctx);
        factoryHelper.validate();

        final ImapSourceOptions options = ImapSourceOptions.builder()
            .host(factoryHelper.getOptions().get(HOST))
            .port(factoryHelper.getOptions().get(PORT))
            .user(factoryHelper.getOptions().get(USER))
            .password(factoryHelper.getOptions().get(PASSWORD))
            .build();

        final List<String> columnNames = ctx.getCatalogTable().getResolvedSchema().getColumns().stream()
            .filter(Column::isPhysical)
            .map(Column::getName)
            .collect(Collectors.toList());

        return new ImapTableSource(options, columnNames);
    }
}
```

<div class="note">
  <h5>Hint</h5>
  <p>
    Ideally, you would use connector <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/#metadata">metadata</a> instead of column names. You can refer again to the accompanying <a href="https://github.com/TNG/flink-connector-email">repository</a> which does implement this using metadata fields.
  </p>
</div>

To test these new configuration options, run:

```sh
$ cd testing/
$ ./build_and_run.sh
```

Once you see the [Flink SQL client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/) start up, execute the following statements to create a table with your connector:

```sql
CREATE TABLE T (subject STRING, content STRING) WITH ('connector' = 'imap');

SELECT * FROM T;
```

This time it will fail because the required options are not provided:

```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: One or more required options are missing.

Missing required options are:

host
password
user
```

# Connect to the source email server

Now that you have configured the required options to connect to the email server, it is time to actually connect to the server.

Going back to the `ImapSource` class, you first need to convert the options given to the table source into a [Properties](https://docs.oracle.com/javase/tutorial/essential/environment/properties.html) object, which is what you can pass to the Jakarta library. You can also set various other properties here (e.g., enabling SSL).

The specific properties that the Jakarta library understands are documented [here](https://jakarta.ee/specifications/mail/1.6/apidocs/index.html?com/sun/mail/imap/package-summary.html).
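For instance, enabling SSL comes down to one additional, standard Jakarta Mail property. A sketch (not part of the tutorial's code, which follows below):

```java
import java.util.Properties;

// Sketch only: like the tutorial's getSessionProperties() method below,
// but with implicit SSL switched on via the standard Jakarta Mail
// "mail.imap.ssl.enable" property.
public final class SslSessionPropertiesSketch {
    static Properties sslSessionProperties(String host, Integer port) {
        final Properties props = new Properties();
        props.put("mail.store.protocol", "imap");
        props.put("mail.imap.ssl.enable", true);
        props.put("mail.imap.host", host);
        if (port != null) {
            props.put("mail.imap.port", port);
        }
        return props;
    }
}
```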
```java
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
import java.util.Properties;

public class ImapSource extends RichSourceFunction<RowData> {
    // …

    private Properties getSessionProperties() {
        Properties props = new Properties();
        props.put("mail.store.protocol", "imap");
        props.put("mail.imap.auth", true);
        props.put("mail.imap.host", options.getHost());
        if (options.getPort() != null) {
            props.put("mail.imap.port", options.getPort());
        }

        return props;
    }
}
```

Now create a method (`connect()`) which sets up the connection:

```java
import jakarta.mail.*;
import com.sun.mail.imap.IMAPFolder;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

public class ImapSource extends RichSourceFunction<RowData> {
    // …

    private transient Store store;
    private transient IMAPFolder folder;

    private void connect() throws Exception {
        final Session session = Session.getInstance(getSessionProperties(), null);
        store = session.getStore();
        store.connect(options.getUser(), options.getPassword());

        final Folder genericFolder = store.getFolder("INBOX");
        folder = (IMAPFolder) genericFolder;

        if (!folder.isOpen()) {
            folder.open(Folder.READ_ONLY);
        }
    }
}
```

You can now use this method to connect to the mail server when the source is created. Create a loop to keep the source running, polling the message count to keep the connection active. Lastly, implement methods to cancel and close the connection:

```java
import jakarta.mail.*;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.RowData;

public class ImapSource extends RichSourceFunction<RowData> {
    private transient volatile boolean running = false;

    // …

    @Override
    public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
        connect();
        running = true;

        // TODO: Listen for new messages

        while (running) {
            // Trigger some IMAP request to force the server to send a notification
            folder.getMessageCount();
            Thread.sleep(250);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (folder != null) {
            folder.close();
        }

        if (store != null) {
            store.close();
        }
    }
}
```

Every loop iteration triggers a request to the server. This is crucial: it ensures that the server keeps sending notifications. A more sophisticated approach would be to make use of the IMAP IDLE extension.

Note that since the source is not checkpointable, no state fault tolerance will be possible.

## Collect incoming emails

Now you need to listen for new emails arriving in the inbox folder and collect them. To begin, hardcode the schema and only return the email's subject. Fortunately, Jakarta provides a simple hook (`addMessageCountListener()`) to get notified when new messages arrive on the server.
You can use this in place of the “TODO” comment above:

```java
import jakarta.mail.*;
import jakarta.mail.event.MessageCountAdapter;
import jakarta.mail.event.MessageCountEvent;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
```

Review comment:
```suggestion
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
```

##########
File path: _posts/2021-09-06-connector-table-sql-api-part1.md
##########

@@ -0,0 +1,244 @@
Finally, you need to register your factory for Java's Service Provider Interface (SPI). Classes that implement this interface can be discovered and should be added to the file `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory` with the fully qualified class name of your factory:

```java
// if you created your class in the package org.example.acme, it should be named the following:
org.example.acme.ImapTableSourceFactory
```

Review comment:
   However, I'm kind of confused now. In my original Google Doc, `ImapTableSourceFactory` was the name of the class. Somehow in the process of this PR this changed, but at least not intentionally from my side. Technically, `ImapTableSourceFactory` is the more appropriate name since it's a factory for the table source, not the source. I would kind of prefer changing `ImapSourceFactory` to `ImapTableSourceFactory` everywhere again?

##########
File path: _posts/2021-09-06-connector-table-sql-api-part2.md
##########

@@ -0,0 +1,518 @@
Jakarta provides a simple hook (`addMessageCountListener()`) to get notified when new messages arrive on the server. You can use this in place of the “TODO” comment above:

```java
import jakarta.mail.*;
import jakarta.mail.event.MessageCountAdapter;
import jakarta.mail.event.MessageCountEvent;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

public class ImapSource extends RichSourceFunction<RowData> {
    @Override
    public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
        // …

        folder.addMessageCountListener(new MessageCountAdapter() {
            @Override
            public void messagesAdded(MessageCountEvent e) {
                collectMessages(ctx, e.getMessages());
            }
        });

        // …
    }

    private void collectMessages(SourceFunction.SourceContext<RowData> ctx, Message[] messages) {
        for (Message message : messages) {
            try {
                ctx.collect(GenericRowData.of(StringData.fromString(message.getSubject())));
            } catch (MessagingException ignored) {}
        }
    }
}
```

Now build the project again and start up the SQL client:

```sh
$ cd testing/
$ ./build_and_run.sh
```

This time, you will connect to a [GreenMail server](https://greenmail-mail-test.github.io/greenmail/) which is started as part of the [setup](https://github.com/Airblader/blog-imap/blob/master/testing/docker-compose.yaml):

```sql
CREATE TABLE T (
    subject STRING
) WITH (
    'connector' = 'imap',
    'host' = 'greenmail',
    'port' = '3143',
    'user' = 'alice',
    'password' = 'alice'
);

SELECT * FROM T;
```

The query above should now run continuously, but no rows will be produced since nothing has arrived at the test server yet. You need to first send an email to the server. If you have [mailx](https://pubs.opengroup.org/onlinepubs/9699919799/utilities/mailx.html) installed, you can do so by executing in your terminal:

```sh
$ echo "This is the email body" | mailx -Sv15-compat \
    -s"Email Subject" \
    -Smta="smtp://alice:alice@localhost:3025" \
    al...@acme.org
```

The email with the subject “Email Subject” should now appear as a row in your output. Your source connector is working!

However, since you are still hard-coding the schema produced by the source, defining the table with a different schema will produce errors. You want to be able to define which fields of an email interest you and then produce the data accordingly. To do this, you will use the list of column names from earlier and then look at it when you collect the emails.

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;

public class ImapSource extends RichSourceFunction<RowData> {

    private void collectMessages(SourceFunction.SourceContext<RowData> ctx, Message[] messages) {
        for (Message message : messages) {
            try {
                collectMessage(ctx, message);
            } catch (MessagingException ignored) {}
        }
    }

    private void collectMessage(SourceFunction.SourceContext<RowData> ctx, Message message)
            throws MessagingException {
        final RowData row = new GenericRowData(columnNames.size());
```

Review comment:
   Sorry, this was my mistake – this _needs_ to be changed as otherwise it's an error.

```suggestion
        final GenericRowData row = new GenericRowData(columnNames.size());
```