Hi, getting so close, but I ran into another issue: Flink successfully reads changes from Debezium/Kafka and writes them to Elasticsearch, but there's a problem with deletions. When I DELETE a row from MySQL, the deletion makes it all the way to Elasticsearch, which is great, but then the taskmanager suddenly dies with a NullPointerException. INSERTs and UPDATEs do not have this problem. This seems very odd. Any help would be much appreciated. Thanks!
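One extra data point in case it helps narrow things down: the Debezium connector is registered essentially as in the tutorial's register-mysql.json. Below is a sketch of that registration (values are from memory and may not match my setup exactly, so treat it as approximate). The last line, "tombstones.on.delete": "false", is not from the tutorial; it's a workaround I'm only guessing at, on the hunch that the NPE is triggered by the tombstone record Debezium emits after a DELETE (a Kafka message with a null value) rather than by the delete event itself. I haven't verified that hunch.

  {
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory",
      "tombstones.on.delete": "false"
    }
  }

The taskmanager and jobmanager output from the failed run follows.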
flink-taskmanager_1 | 2020-08-31 23:30:33,684 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
flink-taskmanager_1 | java.lang.NullPointerException: null
flink-taskmanager_1 |     at java.lang.String.<init>(String.java:566) ~[?:1.8.0_265]
flink-taskmanager_1 |     at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-taskmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 |     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 | 2020-08-31 23:30:33,720 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8).
flink-taskmanager_1 | 2020-08-31 23:30:33,728 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) 2b79917cb528f37fad7f636740d2fdd8.
flink-jobmanager_1 | 2020-08-31 23:30:33,770 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
flink-jobmanager_1 | java.lang.NullPointerException: null
flink-jobmanager_1 |     at java.lang.String.<init>(String.java:566) ~[?:1.8.0_265]
flink-jobmanager_1 |     at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <r...@remind101.com> wrote:

> Ah, my bad, thanks for pointing that out Arvid!
>
> On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Rex,
>>
>> you still forgot
>>
>> 'debezium-json.schema-include' = 'true'
>>
>> Please reread my mail.
>>
>> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Thanks for the input, though I've certainly included a schema, as is reflected earlier in this thread. Including it here again:
>>>
>>> ...
>>> tableEnv.executeSql("""
>>>   CREATE TABLE topic_addresses (
>>>     -- schema is totally the same as the MySQL "addresses" table
>>>     id INT,
>>>     customer_id INT,
>>>     street STRING,
>>>     city STRING,
>>>     state STRING,
>>>     zip STRING,
>>>     type STRING,
>>>     PRIMARY KEY (id) NOT ENFORCED
>>>   ) WITH (
>>>     'connector' = 'kafka',
>>>     'topic' = 'dbserver1.inventory.addresses',
>>>     'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>     'properties.group.id' = 'testGroup',
>>>     'format' = 'debezium-json'  -- using debezium-json as the format
>>>   )
>>> """)
>>>
>>> val table = tableEnv.from("topic_addresses").select($"*")
>>> ...
>>>
>>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> the connector expects a value without a schema, but the message contains a schema. You can tell Flink that the schema is included, as described in the documentation [1]:
>>>>
>>>> CREATE TABLE topic_products (
>>>>   -- schema is totally the same as the MySQL "products" table
>>>>   id BIGINT,
>>>>   name STRING,
>>>>   description STRING,
>>>>   weight DECIMAL(10, 2)
>>>> ) WITH (
>>>>   'connector' = 'kafka',
>>>>   'topic' = 'products_binlog',
>>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>>   'properties.group.id' = 'testGroup',
>>>>   'format' = 'debezium-json',
>>>>   'debezium-json.schema-include' = 'true'
>>>> )
>>>>
>>>> @Jark Wu <imj...@gmail.com>, it would probably be good to make the connector more robust and catch these types of misconfigurations.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>>
>>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Awesome, so that took me a step further. When running, I'm receiving an error, however. FYI, my docker-compose file is based on the Debezium MySQL tutorial, which can be found here:
>>>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>>>
>>>>> Part of the stack trace:
>>>>>
>>>>> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt Debezium JSON message
>>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111 cool street","city":"Big City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>>>> flink-jobmanager_1 |     at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1 | Caused by: java.lang.NullPointerException
>>>>> flink-jobmanager_1 |     at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115) ~[flink-json-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1 |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>
>>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> Hi again!
>>>>>>>
>>>>>>> I'm testing this out locally in Docker on Flink 1.11 first, to get my bearings before downgrading to 1.10 and figuring out how to replace the Debezium connector. However, I'm getting the following error:
>>>>>>> ```
>>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>>>> ```
>>>>>>>
>>>>>>> Any suggestions for me to fix this?
>>>>>>>
>>>>>>> code:
>>>>>>>
>>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>> val blinkStreamSettings =
>>>>>>>   EnvironmentSettings
>>>>>>>     .newInstance()
>>>>>>>     .useBlinkPlanner()
>>>>>>>     .inStreamingMode()
>>>>>>>     .build()
>>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>>>>>>>
>>>>>>> // Table from the Debezium MySQL example docker:
>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>> // | Field       | Type                                | Null | Key | Default | Extra          |
>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>> // | id          | int(11)                             | NO   | PRI | NULL    | auto_increment |
>>>>>>> // | customer_id | int(11)                             | NO   | MUL | NULL    |                |
>>>>>>> // | street      | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>> // | city        | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>> // | state       | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>> // | zip         | varchar(255)                        | NO   |     | NULL    |                |
>>>>>>> // | type        | enum('SHIPPING','BILLING','LIVING') | NO   |     | NULL    |                |
>>>>>>> // +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>
>>>>>>> tableEnv.executeSql("""
>>>>>>>   CREATE TABLE topic_addresses (
>>>>>>>     -- schema is totally the same as the MySQL "addresses" table
>>>>>>>     id INT,
>>>>>>>     customer_id INT,
>>>>>>>     street STRING,
>>>>>>>     city STRING,
>>>>>>>     state STRING,
>>>>>>>     zip STRING,
>>>>>>>     type STRING,
>>>>>>>     PRIMARY KEY (id) NOT ENFORCED
>>>>>>>   ) WITH (
>>>>>>>     'connector' = 'kafka',
>>>>>>>     'topic' = 'dbserver1.inventory.addresses',
>>>>>>>     'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>>>     'properties.group.id' = 'testGroup',
>>>>>>>     'format' = 'debezium-json'  -- using debezium-json as the format
>>>>>>>   )
>>>>>>> """)
>>>>>>>
>>>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>>>
>>>>>>> // Defining a PK automatically puts it in upsert mode, which we want.
>>>>>>> // TODO: `type` should be a keyword; is that acceptable in the DDL?
>>>>>>> tableEnv.executeSql("""
>>>>>>>   CREATE TABLE ESAddresses (
>>>>>>>     id INT,
>>>>>>>     customer_id INT,
>>>>>>>     street STRING,
>>>>>>>     city STRING,
>>>>>>>     state STRING,
>>>>>>>     zip STRING,
>>>>>>>     type STRING,
>>>>>>>     PRIMARY KEY (id) NOT ENFORCED
>>>>>>>   ) WITH (
>>>>>>>     'connector' = 'elasticsearch-7',
>>>>>>>     'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>>>>     'index' = 'flinkaddresses',
>>>>>>>     'format' = 'json'
>>>>>>>   )
>>>>>>> """)
>>>>>>>
>>>>>>> table.executeInsert("ESAddresses").print()
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Regarding the performance difference, the proposed way will have one more stateful operator (deduplication) than the native 1.11 cdc support.
>>>>>>>>> The overhead of the deduplication operator is just similar to a simple group by aggregate (max on each non-key column).
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you so much for the help!
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes — you'll get the full row in the payload; and you can also access the change operation, which might be useful in your case.
>>>>>>>>>>>
>>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu <j...@apache.org> to the thread, who will be able to give you a more complete answer and likely also some optimization tips for your specific use case.
>>>>>>>>>>>
>>>>>>>>>>> Marta
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>>>
>>>>>>>>>>>> The 'after' payload comes with all data from the row right? So essentially inserts and updates I can insert/replace data by pk and null values I just delete by pk, and then I can build out the rest of my joins like normal.
>>>>>>>>>>>>
>>>>>>>>>>>> Are there any performance implications of doing it this way that is different from the out-of-the-box 1.11 solution?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring of the table source interfaces (FLIP-95 [1]), and the new ScanTableSource [2], which allows to emit bounded/unbounded streams with insert, update and delete rows.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In theory, you could consume data generated with Debezium as regular JSON-encoded events before Flink 1.11 — there just wasn't a convenient way to really treat it as "changelog". As a workaround, what you can do in Flink 1.10 is process these messages as JSON and extract the "after" field from the payload, and then apply de-duplication [3] to keep only the last row.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>>>
>>>>>>>>>>>>> CREATE TABLE tablename (
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>   after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>>>   'format' = 'json',
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>> );
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Marta
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm trying to set up Flink with the Debezium CDC Connector on AWS EMR, however, EMR only supports Flink 1.10.0, whereas the Debezium Connector arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm wondering what alternative solutions are available for connecting Debezium to Flink? Is there an open source Debezium connector that works with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0 Debezium connector and compile it in my project using the Flink 1.10.0 api?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long lived stateful joins / materialization using the Table API over data ingested from Postgres and possibly MySQL.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Appreciate any help, thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>