Ah, my bad, thanks for pointing that out Arvid!

On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote:
> Hi Rex,
>
> you still forgot
>
> 'debezium-json.schema-include' = 'true'
>
> Please reread my mail.
>
>
> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Thanks for the input, though I've certainly included a schema as is
>> reflected earlier in this thread. Including here again
>> ...
>> tableEnv.executeSql("""
>> CREATE TABLE topic_addresses (
>>   -- schema is totally the same to the MySQL "addresses" table
>>   id INT,
>>   customer_id INT,
>>   street STRING,
>>   city STRING,
>>   state STRING,
>>   zip STRING,
>>   type STRING,
>>   PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'dbserver1.inventory.addresses',
>>   'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>   'properties.group.id' = 'testGroup',
>>   'format' = 'debezium-json' -- using debezium-json as the format
>> )
>> """)
>>
>> val table = tableEnv.from("topic_addresses").select($"*")
>> ...
>>
>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Rex,
>>>
>>> the connector expects a value without a schema, but the message contains
>>> a schema. You can tell Flink that the schema is included as written in the
>>> documentation [1].
>>>
>>> CREATE TABLE topic_products (
>>>   -- schema is totally the same to the MySQL "products" table
>>>   id BIGINT,
>>>   name STRING,
>>>   description STRING,
>>>   weight DECIMAL(10, 2)) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'products_binlog',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'properties.group.id' = 'testGroup',
>>>   'format' = 'debezium-json',
>>>   'debezium-json.schema-include' = 'true')
>>>
>>> @Jark Wu <imj...@gmail.com> , it would probably be good to make the
>>> connector more robust and catch these types of misconfigurations.
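Applied to the `topic_addresses` table from this thread, the fix Arvid describes might look like the sketch below. The `AddressesSourceDdl` object is a made-up name for illustration, the backticks around `type` are a precaution that is not in the original DDL, and the option value is written as the quoted string `'true'` on the assumption that the WITH clause only accepts string literals.

```scala
// Hypothetical helper object; only the DDL text itself is taken from this thread.
object AddressesSourceDdl {

  // Source table for Debezium records that carry the
  // {"schema": ..., "payload": ...} envelope, as in the quoted stack trace.
  val ddl: String =
    """CREATE TABLE topic_addresses (
      |  id INT,
      |  customer_id INT,
      |  street STRING,
      |  city STRING,
      |  state STRING,
      |  zip STRING,
      |  `type` STRING, -- backticks are a precaution against keyword clashes
      |  PRIMARY KEY (id) NOT ENFORCED
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'dbserver1.inventory.addresses',
      |  'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
      |  'properties.group.id' = 'testGroup',
      |  'format' = 'debezium-json',
      |  'debezium-json.schema-include' = 'true' -- messages embed their schema
      |)""".stripMargin

  def main(args: Array[String]): Unit =
    // In the job this string would be passed to tableEnv.executeSql(ddl).
    println(ddl)
}
```

Printing the statement first is just a cheap way to eyeball the WITH clause before handing it to `tableEnv.executeSql`.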
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>
>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Awesome, so that took me a step further. When running I'm receiving an
>>>> error, however. FYI, my docker-compose file is based on the Debezium MySQL
>>>> tutorial, which can be found here
>>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>>
>>>> Part of the stack trace:
>>>>
>>>> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt Debezium JSON message
>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111 cool street","city":"Big City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>>> flink-jobmanager_1 |     at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1 | Caused by: java.lang.NullPointerException
>>>> flink-jobmanager_1 |     at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115) ~[flink-json-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>
>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>>
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Hi again!
>>>>>>
>>>>>> I'm testing this out locally in Docker on Flink 1.11 first to get my
>>>>>> bearings before downgrading to 1.10 and figuring out how to replace the
>>>>>> Debezium connector. However, I'm getting the following error:
>>>>>> ```
>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>>> ```
>>>>>>
>>>>>> Any suggestions for me to fix this?
>>>>>>
>>>>>> code:
>>>>>>
>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>> val blinkStreamSettings =
>>>>>>   EnvironmentSettings
>>>>>>     .newInstance()
>>>>>>     .useBlinkPlanner()
>>>>>>     .inStreamingMode()
>>>>>>     .build()
>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>>>>>>
>>>>>> // Table from Debezium mysql example docker:
>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>> // | Field       | Type                                | Null | Key | Default | Extra          |
>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>> // | id          | int(11)                             | NO   | PRI | NULL    | auto_increment |
>>>>>> // | customer_id | int(11)                             | NO   | MUL | NULL    |                |
>>>>>> // | street      | varchar(255)                        | NO   |     | NULL    |                |
>>>>>> // | city        | varchar(255)                        | NO   |     | NULL    |                |
>>>>>> // | state       | varchar(255)                        | NO   |     | NULL    |                |
>>>>>> // | zip         | varchar(255)                        | NO   |     | NULL    |                |
>>>>>> // | type        | enum('SHIPPING','BILLING','LIVING') | NO   |     | NULL    |                |
>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>
>>>>>> tableEnv.executeSql("""
>>>>>> CREATE TABLE topic_addresses (
>>>>>>   -- schema is totally the same to the MySQL "addresses" table
>>>>>>   id INT,
>>>>>>   customer_id INT,
>>>>>>   street STRING,
>>>>>>   city STRING,
>>>>>>   state STRING,
>>>>>>   zip STRING,
>>>>>>   type STRING,
>>>>>>   PRIMARY KEY (id) NOT ENFORCED
>>>>>> ) WITH (
>>>>>>   'connector' = 'kafka',
>>>>>>   'topic' = 'dbserver1.inventory.addresses',
>>>>>>   'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>>   'properties.group.id' = 'testGroup',
>>>>>>   'format' = 'debezium-json' -- using debezium-json as the format
>>>>>> )
>>>>>> """)
>>>>>>
>>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>>
>>>>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>>>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>>>>> tableEnv.executeSql("""
>>>>>> CREATE TABLE ESAddresses (
>>>>>>   id INT,
>>>>>>   customer_id INT,
>>>>>>   street STRING,
>>>>>>   city STRING,
>>>>>>   state STRING,
>>>>>>   zip STRING,
>>>>>>   type STRING,
>>>>>>   PRIMARY KEY (id) NOT ENFORCED
>>>>>> ) WITH (
>>>>>>   'connector' = 'elasticsearch-7',
>>>>>>   'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>>>   'index' = 'flinkaddresses',
>>>>>>   'format' = 'json'
>>>>>> )
>>>>>> """)
>>>>>>
>>>>>> table.executeInsert("ESAddresses").print()
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Regarding the performance difference, the proposed way will have
>>>>>>>> one more stateful operator (deduplication) than the native 1.11 CDC
>>>>>>>> support.
>>>>>>>> The overhead of the deduplication operator is similar to that of a
>>>>>>>> simple group by aggregate (max on each non-key column).
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you so much for the help!
>>>>>>>>>
>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, you'll get the full row in the payload, and you can also
>>>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>>>
>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>>>> <j...@apache.org> to the thread, who will be able to give you a
>>>>>>>>>> more complete answer and likely also some optimization tips for your
>>>>>>>>>> specific use case.
>>>>>>>>>>
>>>>>>>>>> Marta
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>>
>>>>>>>>>>> The 'after' payload comes with all data from the row, right? So
>>>>>>>>>>> essentially, for inserts and updates I can insert/replace data by pk,
>>>>>>>>>>> for null values I just delete by pk, and then I can build out the rest
>>>>>>>>>>> of my joins like normal.
>>>>>>>>>>>
>>>>>>>>>>> Are there any performance implications of doing it this way that
>>>>>>>>>>> are different from the out-of-the-box 1.11 solution?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>>
>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>>>>>>> ScanTableSource [2], which allows emitting bounded/unbounded streams
>>>>>>>>>>>> with insert, update and delete rows.
>>>>>>>>>>>>
>>>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>>>> regular JSON-encoded events before Flink 1.11; there just wasn't a
>>>>>>>>>>>> convenient way to really treat it as a "changelog". As a workaround,
>>>>>>>>>>>> what you can do in Flink 1.10 is process these messages as JSON and
>>>>>>>>>>>> extract the "after" field from the payload, and then apply
>>>>>>>>>>>> de-duplication [3] to keep only the last row.
>>>>>>>>>>>>
>>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>>
>>>>>>>>>>>> CREATE TABLE tablename (
>>>>>>>>>>>>   ...
>>>>>>>>>>>>   after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>>   'format' = 'json',
>>>>>>>>>>>>   ...
>>>>>>>>>>>> );
>>>>>>>>>>>>
>>>>>>>>>>>> Hope this helps!
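To make the "keep only the last row" idea in the workaround concrete, here is a small plain-Scala model of what gets materialized. It is not Flink code and says nothing about how the de-duplication operator is implemented. `Address`, `Change` and `materialize` are hypothetical names, the sample values are made up, and it assumes every change carries the primary key separately (for example from the Kafka message key), since a delete arrives with a null "after".

```scala
// Plain-Scala model with no Flink dependency. All names are illustrative.
final case class Address(id: Int, customerId: Int, city: String)

// One change event: the primary key plus the "after" image.
// after = None models Debezium's null "after" on a delete.
final case class Change(key: Int, after: Option[Address])

object LastRowPerKey {

  // Fold the changelog in arrival order, keeping only the latest row per key.
  def materialize(changes: Seq[Change]): Map[Int, Address] =
    changes.foldLeft(Map.empty[Int, Address]) { (state, change) =>
      change.after match {
        case Some(row) => state.updated(change.key, row) // insert or update
        case None      => state - change.key             // delete
      }
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample changelog.
    val changes = Seq(
      Change(18, Some(Address(18, 1004, "Big City"))),
      Change(18, Some(Address(18, 1004, "Small Town"))), // update replaces the row
      Change(19, Some(Address(19, 1001, "Big City"))),
      Change(19, None)                                   // delete removes the row
    )
    println(materialize(changes))
  }
}
```

The fold is the whole idea: one map entry per primary key, overwritten by later changes and dropped when the "after" image is missing.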
>>>>>>>>>>>>
>>>>>>>>>>>> Marta
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in
>>>>>>>>>>>>> 1.10?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS
>>>>>>>>>>>>> EMR, however, EMR only supports Flink 1.10.0, whereas Debezium
>>>>>>>>>>>>> Connector arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium
>>>>>>>>>>>>> connector that works with Flink 1.10.0? Could I potentially pull the
>>>>>>>>>>>>> code out for the 1.11.0 Debezium connector and compile it in my
>>>>>>>>>>>>> project using Flink 1.10.0 api?
>>>>>>>>>>>>>
>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long
>>>>>>>>>>>>> lived stateful joins / materialization using the Table API over data
>>>>>>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>>
>>>>>>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>>>>>>>>>>>
>>>>>>>>>>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng