Hey, Rex! This is likely due to the tombstone records that Debezium produces for DELETE operations (i.e. a record with the same key as the deleted row and a value of null). These are markers that tell Kafka log compaction it can remove all records for the given key, and the initial implementation of the debezium-json format can't handle them. This issue is already documented (and solved) in [1].
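For reference, tombstone emission can be disabled on the Debezium side via the connector option "tombstones.on.delete". Below is a minimal sketch of a connector registration with that option set, based on the register-mysql.json from the Debezium MySQL tutorial referenced later in this thread; the hostname, credentials, server id/name and history-topic settings are the tutorial defaults and need to be adapted to your setup:

  {
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory",
      "tombstones.on.delete": "false"
    }
  }

The tutorial registers this by POSTing it to the Kafka Connect REST API; for a connector that is already running, the same settings (the inner "config" map, without the "name"/"config" wrapper) can be applied with a PUT to /connectors/inventory-connector/config.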
In the meantime, can you try adding "tombstones.on.delete":false" to the configuration of your Debezium MySQL connector? Marta [1] https://issues.apache.org/jira/browse/FLINK-18705 On Tue, Sep 1, 2020 at 1:36 AM Rex Fenley <r...@remind101.com> wrote: > Hi, getting so close but ran into another issue: > > Flink successfully reads changes from Debezium/Kafka and writes them to > Elasticsearch, but there's a problem with deletions. When I DELETE a row > from MySQL the deletion makes it successfully all the way to Elasticsearch > which is great, but then the taskmanager suddenly dies with a null pointer > exception. Inserts and Updates do not have the same problem. This seems > very odd. Any help would be much appreciated. Thanks! > > flink-taskmanager_1 | 2020-08-31 23:30:33,684 WARN > org.apache.flink.runtime.taskmanager.Task [] - Source: > TableSourceScan(table=[[default_catalog, default_database, > topic_addresses]], fields=[id, customer_id, street, city, state, zip, > type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], > fields=[id, customer_id, street, city, state, zip, type]) (1/2) > (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED. > flink-taskmanager_1 | java.lang.NullPointerException: null > flink-taskmanager_1 | at java.lang.String.<init>(String.java:566) > ~[?:1.8.0_265] > flink-taskmanager_1 | at > org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) > ~[flink-json-1.11.1.jar:1.11.1] > flink-taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT] > flink-taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) > ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT] > flink-taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT] > flink-taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT] > flink-taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > flink-taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > flink-taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > flink-taskmanager_1 | 2020-08-31 23:30:33,720 INFO > org.apache.flink.runtime.taskmanager.Task [] - Freeing > task resources for Source: TableSourceScan(table=[[default_catalog, > default_database, topic_addresses]], fields=[id, customer_id, street, city, > state, zip, type]) -> Sink: > Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, > customer_id, street, city, state, zip, type]) (1/2) > (2b79917cb528f37fad7f636740d2fdd8). 
> flink-taskmanager_1 | 2020-08-31 23:30:33,728 INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor [] - > Un-registering task and sending final execution state FAILED to JobManager > for task Source: TableSourceScan(table=[[default_catalog, default_database, > topic_addresses]], fields=[id, customer_id, street, city, state, zip, > type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], > fields=[id, customer_id, street, city, state, zip, type]) (1/2) > 2b79917cb528f37fad7f636740d2fdd8. > flink-jobmanager_1 | 2020-08-31 23:30:33,770 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > TableSourceScan(table=[[default_catalog, default_database, > topic_addresses]], fields=[id, customer_id, street, city, state, zip, > type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], > fields=[id, customer_id, street, city, state, zip, type]) (1/2) > (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35. > flink-jobmanager_1 | java.lang.NullPointerException: null > flink-jobmanager_1 | at java.lang.String.<init>(String.java:566) > ~[?:1.8.0_265] > flink-jobmanager_1 | at > org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) > ~[flink-json-1.11.1.jar:1.11.1] > flink-jobmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > ~[?:?] > flink-jobmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) > ~[?:?] > flink-jobmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > ~[?:?] > flink-jobmanager_1 | at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > ~[?:?] > flink-jobmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > flink-jobmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > flink-jobmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > ~[flink-dist_2.12-1.11.1.jar:1.11.1] > > On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <r...@remind101.com> wrote: > >> Ah, my bad, thanks for pointing that out Arvid! >> >> On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote: >> >>> Hi Rex, >>> >>> you still forgot >>> >>> 'debezium-json.schema-include' = true >>> >>> Please reread my mail. >>> >>> >>> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <r...@remind101.com> wrote: >>> >>>> Thanks for the input, though I've certainly included a schema as is >>>> reflected earlier in this thread. Including here again >>>> ... 
>>>> tableEnv.executeSql(""" >>>> CREATE TABLE topic_addresses ( >>>> -- schema is totally the same to the MySQL "addresses" table >>>> id INT, >>>> customer_id INT, >>>> street STRING, >>>> city STRING, >>>> state STRING, >>>> zip STRING, >>>> type STRING, >>>> PRIMARY KEY (id) NOT ENFORCED >>>> ) WITH ( >>>> 'connector' = 'kafka', >>>> 'topic' = 'dbserver1.inventory.addresses', >>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092', >>>> 'properties.group.id' = 'testGroup', >>>> 'format' = 'debezium-json' -- using debezium-json as the format >>>> ) >>>> """) >>>> >>>> val table = tableEnv.from("topic_addresses").select($"*") >>>> ... >>>> >>>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> >>>> wrote: >>>> >>>>> Hi Rex, >>>>> >>>>> the connector expects a value without a schema, but the message >>>>> contains a schema. You can tell Flink that the schema is included as >>>>> written in the documentation [1]. >>>>> >>>>> CREATE TABLE topic_products ( >>>>> -- schema is totally the same to the MySQL "products" table >>>>> id BIGINT, >>>>> name STRING, >>>>> description STRING, >>>>> weight DECIMAL(10, 2)) WITH ( >>>>> 'connector' = 'kafka', >>>>> 'topic' = 'products_binlog', >>>>> 'properties.bootstrap.servers' = 'localhost:9092', >>>>> 'properties.group.id' = 'testGroup', >>>>> 'format' = 'debezium-json', >>>>> 'debezium-json.schema-include' = true) >>>>> >>>>> @Jark Wu <imj...@gmail.com> , it would be probably good to make the >>>>> connector more robust and catch these types of misconfigurations. >>>>> >>>>> [1] >>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format >>>>> >>>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <r...@remind101.com> wrote: >>>>> >>>>>> Awesome, so that took me a step further. When running i'm receiving >>>>>> an error however. 
FYI, my docker-compose file is based on the Debezium >>>>>> mysql tutorial which can be found here >>>>>> https://debezium.io/documentation/reference/1.2/tutorial.html >>>>>> >>>>>> Part of the stack trace: >>>>>> >>>>>> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt >>>>>> Debezium JSON message >>>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111 >>>>>> cool street","city":"Big >>>>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'. 
>>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) >>>>>> ~[flink-json-1.11.1.jar:1.11.1] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) >>>>>> ~[?:?] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) >>>>>> ~[?:?] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) >>>>>> ~[?:?] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) >>>>>> ~[?:?] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) >>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) >>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>>>>> flink-jobmanager_1 | Caused by: java.lang.NullPointerException >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115) >>>>>> ~[flink-json-1.11.1.jar:1.11.1] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) >>>>>> ~[?:?] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) >>>>>> ~[?:?] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) >>>>>> ~[?:?] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) >>>>>> ~[?:?] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) >>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>>>>> flink-jobmanager_1 | at >>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) >>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1] >>>>>> >>>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <imj...@gmail.com> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1. >>>>>>> >>>>>>> >>>>>>> Best, >>>>>>> Jark >>>>>>> >>>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <r...@remind101.com> wrote: >>>>>>> >>>>>>>> Hi again! >>>>>>>> >>>>>>>> I'm tested out locally in docker on Flink 1.11 first to get my >>>>>>>> bearings before downgrading to 1.10 and figuring out how to replace the >>>>>>>> Debezium connector. 
However, I'm getting the following error >>>>>>>> ``` >>>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait >>>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. >>>>>>>> ``` >>>>>>>> >>>>>>>> Any suggestions for me to fix this? >>>>>>>> >>>>>>>> code: >>>>>>>> >>>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>> val blinkStreamSettings = >>>>>>>> EnvironmentSettings >>>>>>>> .newInstance() >>>>>>>> .useBlinkPlanner() >>>>>>>> .inStreamingMode() >>>>>>>> .build() >>>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv, >>>>>>>> blinkStreamSettings) >>>>>>>> >>>>>>>> // Table from Debezium mysql example docker: >>>>>>>> // >>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+ >>>>>>>> // | Field | Type | Null | Key | Default | Extra | >>>>>>>> // >>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+ >>>>>>>> // | id | int(11) | NO | PRI | NULL | auto_increment | >>>>>>>> // | customer_id | int(11) | NO | MUL | NULL | | >>>>>>>> // | street | varchar(255) | NO | | NULL | | >>>>>>>> // | city | varchar(255) | NO | | NULL | | >>>>>>>> // | state | varchar(255) | NO | | NULL | | >>>>>>>> // | zip | varchar(255) | NO | | NULL | | >>>>>>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | | >>>>>>>> // >>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+ >>>>>>>> >>>>>>>> tableEnv.executeSql(""" >>>>>>>> CREATE TABLE topic_addresses ( >>>>>>>> -- schema is totally the same to the MySQL "addresses" table >>>>>>>> id INT, >>>>>>>> customer_id INT, >>>>>>>> street STRING, >>>>>>>> city STRING, >>>>>>>> state STRING, >>>>>>>> zip STRING, >>>>>>>> type STRING, >>>>>>>> PRIMARY KEY (id) NOT ENFORCED >>>>>>>> ) WITH ( >>>>>>>> 'connector' = 'kafka', >>>>>>>> 'topic' = 'dbserver1.inventory.addresses', >>>>>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092', >>>>>>>> 'properties.group.id' = 'testGroup', >>>>>>>> 'format' = 'debezium-json' -- using debezium-json as the format >>>>>>>> ) >>>>>>>> """) >>>>>>>> >>>>>>>> val table = tableEnv.from("topic_addresses").select($"*") >>>>>>>> >>>>>>>> // Defining a PK automatically puts it in Upsert mode, which we >>>>>>>> want. >>>>>>>> // TODO: type should be a keyword, is that acceptable by the DDL? >>>>>>>> tableEnv.executeSql(""" >>>>>>>> CREATE TABLE ESAddresses ( >>>>>>>> id INT, >>>>>>>> customer_id INT, >>>>>>>> street STRING, >>>>>>>> city STRING, >>>>>>>> state STRING, >>>>>>>> zip STRING, >>>>>>>> type STRING, >>>>>>>> PRIMARY KEY (id) NOT ENFORCED >>>>>>>> ) WITH ( >>>>>>>> 'connector' = 'elasticsearch-7', >>>>>>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200', >>>>>>>> 'index' = 'flinkaddresses', >>>>>>>> 'format' = 'json' >>>>>>>> ) >>>>>>>> """) >>>>>>>> >>>>>>>> table.executeInsert("ESAddresses").print() >>>>>>>> >>>>>>>> Thanks! >>>>>>>> >>>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <r...@remind101.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Thanks! >>>>>>>>> >>>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <imj...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> Regarding the performance difference, the proposed way will have >>>>>>>>>> one more stateful operator (deduplication) than the native 1.11 cdc >>>>>>>>>> support. >>>>>>>>>> The overhead of the deduplication operator is just similar to a >>>>>>>>>> simple group by aggregate (max on each non-key column). 
>>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Jark >>>>>>>>>> >>>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <r...@remind101.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Thank you so much for the help! >>>>>>>>>>> >>>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira < >>>>>>>>>>> ma...@ververica.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Yes — you'll get the full row in the payload; and you can also >>>>>>>>>>>> access the change operation, which might be useful in your case. >>>>>>>>>>>> >>>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu >>>>>>>>>>>> <j...@apache.org> to the thread, who will be able to give you >>>>>>>>>>>> a more complete answer and likely also some optimization tips for >>>>>>>>>>>> your >>>>>>>>>>>> specific use case. >>>>>>>>>>>> >>>>>>>>>>>> Marta >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <r...@remind101.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Yup! This definitely helps and makes sense. >>>>>>>>>>>>> >>>>>>>>>>>>> The 'after' payload comes with all data from the row right? So >>>>>>>>>>>>> essentially inserts and updates I can insert/replace data by pk >>>>>>>>>>>>> and null >>>>>>>>>>>>> values I just delete by pk, and then I can build out the rest of >>>>>>>>>>>>> my joins >>>>>>>>>>>>> like normal. >>>>>>>>>>>>> >>>>>>>>>>>>> Are there any performance implications of doing it this way >>>>>>>>>>>>> that is different from the out-of-the-box 1.11 solution? >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira < >>>>>>>>>>>>> ma...@ververica.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi, Rex. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the >>>>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and >>>>>>>>>>>>>> the new >>>>>>>>>>>>>> ScanTableSource [2], which allows to emit bounded/unbounded >>>>>>>>>>>>>> streams with >>>>>>>>>>>>>> insert, update and delete rows. >>>>>>>>>>>>>> >>>>>>>>>>>>>> In theory, you could consume data generated with Debezium as >>>>>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just >>>>>>>>>>>>>> wasn't a >>>>>>>>>>>>>> convenient way to really treat it as "changelog". As a >>>>>>>>>>>>>> workaround, what you >>>>>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and >>>>>>>>>>>>>> extract the >>>>>>>>>>>>>> "after" field from the payload, and then apply de-duplication >>>>>>>>>>>>>> [3] to keep >>>>>>>>>>>>>> only the last row. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The DDL for your source table would look something like: >>>>>>>>>>>>>> >>>>>>>>>>>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE, >>>>>>>>>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', >>>>>>>>>>>>>> 'format' = 'json', ... ); >>>>>>>>>>>>>> Hope this helps! >>>>>>>>>>>>>> >>>>>>>>>>>>>> Marta >>>>>>>>>>>>>> >>>>>>>>>>>>>> [1] >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces >>>>>>>>>>>>>> [2] >>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html >>>>>>>>>>>>>> [3] >>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler < >>>>>>>>>>>>>> ches...@apache.org> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support >>>>>>>>>>>>>>> in 1.10? 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on >>>>>>>>>>>>>>> AWS EMR, however, EMR only supports Flink 1.10.0, whereas >>>>>>>>>>>>>>> Debezium >>>>>>>>>>>>>>> Connector arrived in Flink 1.11.0, from looking at the >>>>>>>>>>>>>>> documentation. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'm wondering what alternative solutions are available for >>>>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium >>>>>>>>>>>>>>> connector >>>>>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code >>>>>>>>>>>>>>> out for the >>>>>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using >>>>>>>>>>>>>>> Flink 1.10.0 >>>>>>>>>>>>>>> api? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long >>>>>>>>>>>>>>> lived stateful joins / materialization using the Table API over >>>>>>>>>>>>>>> data >>>>>>>>>>>>>>> ingested from Postgres and possibly MySQL. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Appreciate any help, thanks! >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>>>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>>>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>>>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -- >>>>>>>>>>>>> >>>>>>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> >>>>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> >>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>>> >>>>>>>>> >>>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> >>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>> >>>>>>>> >>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>> >>>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>> >>>>>> >>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>> <https://www.facebook.com/remindhq> >>>>>> >>>>> >>>>> >>>>> -- >>>>> >>>>> Arvid Heise | Senior Java 
Developer >>>>> >>>>> <https://www.ververica.com/> >>>>> >>>>> Follow us @VervericaData >>>>> >>>>> -- >>>>> >>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>>>> Conference >>>>> >>>>> Stream Processing | Event Driven | Real Time >>>>> >>>>> -- >>>>> >>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>>>> >>>>> -- >>>>> Ververica GmbH >>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, >>>>> Ji (Toni) Cheng >>>>> >>>> >>>> >>>> -- >>>> >>>> Rex Fenley | Software Engineer - Mobile and Backend >>>> >>>> >>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> >>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US >>>> <https://www.facebook.com/remindhq> >>>> >>> >>> >>> -- >>> >>> Arvid Heise | Senior Java Developer >>> >>> <https://www.ververica.com/> >>> >>> Follow us @VervericaData >>> >>> -- >>> >>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>> Conference >>> >>> Stream Processing | Event Driven | Real Time >>> >>> -- >>> >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>> >>> -- >>> Ververica GmbH >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >>> (Toni) Cheng >>> >> >> >> -- >> >> Rex Fenley | Software Engineer - Mobile and Backend >> >> >> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> >> | FOLLOW US <https://twitter.com/remindhq> | LIKE US >> <https://www.facebook.com/remindhq> >> > > > -- > > Rex Fenley | Software Engineer - Mobile and Backend > > > Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | > FOLLOW US <https://twitter.com/remindhq> | LIKE US > <https://www.facebook.com/remindhq> >