This worked, thanks! Looking forward to the future releases :)

On Mon, Aug 31, 2020 at 5:06 PM Marta Paes Moreira <ma...@ververica.com> wrote:
Hey, Rex!

This is likely due to the tombstone records that Debezium produces for DELETE operations (i.e. a record with the same key as the deleted row and a value of null). These are markers for Kafka to indicate that log compaction can remove all records for the given key, and the initial implementation of the debezium-json format can't handle them. This issue is already documented (and solved) in [1].

In the meantime, can you try adding "tombstones.on.delete": "false" to the configuration of your Debezium MySQL connector?

Marta

[1] https://issues.apache.org/jira/browse/FLINK-18705
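As a rough illustration of where that property goes: below is a minimal sketch of a connector registration payload for Kafka Connect, assuming the MySQL connector from the Debezium 1.2 tutorial referenced later in this thread (the connector name and server name are the tutorial's values, and the rest of your existing connector options would stay as they are). The only relevant addition is the "tombstones.on.delete" line, which stops Debezium from emitting the extra all-null record after each delete event:

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.server.name": "dbserver1",
    "tombstones.on.delete": "false"
  }
}
```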
On Tue, Sep 1, 2020 at 1:36 AM Rex Fenley <r...@remind101.com> wrote:

Hi, getting so close but ran into another issue:

Flink successfully reads changes from Debezium/Kafka and writes them to Elasticsearch, but there's a problem with deletions. When I DELETE a row from MySQL, the deletion makes it all the way to Elasticsearch successfully, which is great, but then the taskmanager suddenly dies with a null pointer exception. Inserts and updates do not have the same problem. This seems very odd. Any help would be much appreciated. Thanks!

flink-taskmanager_1 | 2020-08-31 23:30:33,684 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
flink-taskmanager_1 | java.lang.NullPointerException: null
flink-taskmanager_1 | at java.lang.String.<init>(String.java:566) ~[?:1.8.0_265]
flink-taskmanager_1 | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 | 2020-08-31 23:30:33,720 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8).
flink-taskmanager_1 | 2020-08-31 23:30:33,728 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) 2b79917cb528f37fad7f636740d2fdd8.
flink-jobmanager_1 | 2020-08-31 23:30:33,770 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
flink-jobmanager_1 | java.lang.NullPointerException: null
flink-jobmanager_1 | at java.lang.String.<init>(String.java:566) ~[?:1.8.0_265]
flink-jobmanager_1 | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <r...@remind101.com> wrote:

Ah, my bad, thanks for pointing that out Arvid!

On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote:

Hi Rex,

you still forgot

'debezium-json.schema-include' = true

Please reread my mail.
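For reference, here is the topic_addresses DDL from further down this thread with that missing option added. Note that WITH option values in Flink SQL are string literals, so the value is written as 'true'; everything else is unchanged from Rex's definition:

```sql
CREATE TABLE topic_addresses (
  id INT,
  customer_id INT,
  street STRING,
  city STRING,
  state STRING,
  zip STRING,
  type STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.addresses',
  'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true' -- tells the format that each message embeds the Debezium schema
)
```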
On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <r...@remind101.com> wrote:

Thanks for the input, though I've certainly included a schema, as is reflected earlier in this thread. Including it here again:

...
tableEnv.executeSql("""
CREATE TABLE topic_addresses (
  -- schema is totally the same as the MySQL "addresses" table
  id INT,
  customer_id INT,
  street STRING,
  city STRING,
  state STRING,
  zip STRING,
  type STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.addresses',
  'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json' -- using debezium-json as the format
)
""")

val table = tableEnv.from("topic_addresses").select($"*")
...

On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> wrote:

Hi Rex,

the connector expects a value without a schema, but the message contains a schema. You can tell Flink that the schema is included, as written in the documentation [1].

CREATE TABLE topic_products (
  -- schema is totally the same as the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true'
)

@Jark Wu <imj...@gmail.com>, it would probably be good to make the connector more robust and catch these types of misconfigurations.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format

On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <r...@remind101.com> wrote:

Awesome, so that took me a step further. When running, I'm receiving an error, however.
FYI, my docker-compose file is based on the Debezium >>>>>>> mysql tutorial which can be found here >>>>>>> https://debezium.io/documentation/reference/1.2/tutorial.html >>>>>>> >>>>>>> Part of the stack trace: >>>>>>> >>>>>>> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt >>>>>>> Debezium JSON message >>>>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111 >>>>>>> cool street","city":"Big >>>>>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'. 
flink-jobmanager_1 | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | Caused by: java.lang.NullPointerException
flink-jobmanager_1 | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <imj...@gmail.com> wrote:

Hi,

This is a known issue in 1.11.0, and has been fixed in 1.11.1.

Best,
Jark

On Fri, 28 Aug 2020 at 06:52, Rex Fenley <r...@remind101.com> wrote:

Hi again!

I'm testing things out locally in Docker on Flink 1.11 first to get my bearings before downgrading to 1.10 and figuring out how to replace the Debezium connector.
However, I'm getting the following error >>>>>>>>> ``` >>>>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait >>>>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. >>>>>>>>> ``` >>>>>>>>> >>>>>>>>> Any suggestions for me to fix this? >>>>>>>>> >>>>>>>>> code: >>>>>>>>> >>>>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>>> val blinkStreamSettings = >>>>>>>>> EnvironmentSettings >>>>>>>>> .newInstance() >>>>>>>>> .useBlinkPlanner() >>>>>>>>> .inStreamingMode() >>>>>>>>> .build() >>>>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv, >>>>>>>>> blinkStreamSettings) >>>>>>>>> >>>>>>>>> // Table from Debezium mysql example docker: >>>>>>>>> // >>>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+ >>>>>>>>> // | Field | Type | Null | Key | Default | Extra | >>>>>>>>> // >>>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+ >>>>>>>>> // | id | int(11) | NO | PRI | NULL | auto_increment | >>>>>>>>> // | customer_id | int(11) | NO | MUL | NULL | | >>>>>>>>> // | street | varchar(255) | NO | | NULL | | >>>>>>>>> // | city | varchar(255) | NO | | NULL | | >>>>>>>>> // | state | varchar(255) | NO | | NULL | | >>>>>>>>> // | zip | varchar(255) | NO | | NULL | | >>>>>>>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | | >>>>>>>>> // >>>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+ >>>>>>>>> >>>>>>>>> tableEnv.executeSql(""" >>>>>>>>> CREATE TABLE topic_addresses ( >>>>>>>>> -- schema is totally the same to the MySQL "addresses" table >>>>>>>>> id INT, >>>>>>>>> customer_id INT, >>>>>>>>> street STRING, >>>>>>>>> city STRING, >>>>>>>>> state STRING, >>>>>>>>> zip STRING, >>>>>>>>> type STRING, >>>>>>>>> PRIMARY KEY (id) NOT ENFORCED >>>>>>>>> ) WITH ( >>>>>>>>> 'connector' = 'kafka', >>>>>>>>> 'topic' = 'dbserver1.inventory.addresses', >>>>>>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092', >>>>>>>>> 'properties.group.id' = 'testGroup', >>>>>>>>> 'format' = 'debezium-json' -- using debezium-json as the format >>>>>>>>> ) >>>>>>>>> """) >>>>>>>>> >>>>>>>>> val table = tableEnv.from("topic_addresses").select($"*") >>>>>>>>> >>>>>>>>> // Defining a PK automatically puts it in Upsert mode, which we >>>>>>>>> want. >>>>>>>>> // TODO: type should be a keyword, is that acceptable by the DDL? >>>>>>>>> tableEnv.executeSql(""" >>>>>>>>> CREATE TABLE ESAddresses ( >>>>>>>>> id INT, >>>>>>>>> customer_id INT, >>>>>>>>> street STRING, >>>>>>>>> city STRING, >>>>>>>>> state STRING, >>>>>>>>> zip STRING, >>>>>>>>> type STRING, >>>>>>>>> PRIMARY KEY (id) NOT ENFORCED >>>>>>>>> ) WITH ( >>>>>>>>> 'connector' = 'elasticsearch-7', >>>>>>>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200', >>>>>>>>> 'index' = 'flinkaddresses', >>>>>>>>> 'format' = 'json' >>>>>>>>> ) >>>>>>>>> """) >>>>>>>>> >>>>>>>>> table.executeInsert("ESAddresses").print() >>>>>>>>> >>>>>>>>> Thanks! >>>>>>>>> >>>>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <r...@remind101.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Thanks! >>>>>>>>>> >>>>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <imj...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> Regarding the performance difference, the proposed way will have >>>>>>>>>>> one more stateful operator (deduplication) than the native 1.11 cdc >>>>>>>>>>> support. 
On Tue, 25 Aug 2020 at 02:21, Rex Fenley <r...@remind101.com> wrote:

Thank you so much for the help!

On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com> wrote:

Yes — you'll get the full row in the payload; and you can also access the change operation, which might be useful in your case.

About performance, I'm summoning Kurt and @Jark Wu <j...@apache.org> to the thread, who will be able to give you a more complete answer and likely also some optimization tips for your specific use case.

Marta

On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <r...@remind101.com> wrote:

Yup! This definitely helps and makes sense.

The 'after' payload comes with all data from the row, right? So essentially, for inserts and updates I can insert/replace data by pk, for null values I just delete by pk, and then I can build out the rest of my joins like normal.

Are there any performance implications of doing it this way that are different from the out-of-the-box 1.11 solution?

On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com> wrote:

Hi, Rex.

Part of what enabled CDC support in Flink 1.11 was the refactoring of the table source interfaces (FLIP-95 [1]) and the new ScanTableSource [2], which allows emitting bounded/unbounded streams with insert, update and delete rows.

In theory, you could consume data generated with Debezium as regular JSON-encoded events before Flink 1.11 — there just wasn't a convenient way to really treat it as a "changelog". As a workaround, what you can do in Flink 1.10 is process these messages as JSON, extract the "after" field from the payload, and then apply de-duplication [3] to keep only the last row.

The DDL for your source table would look something like:

CREATE TABLE tablename (
  ...
  after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  ...
);

Hope this helps!

Marta

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
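Putting Marta's two pieces together, here is a rough end-to-end sketch of the 1.10 workaround for the addresses table used elsewhere in this thread. The raw table name, the processing-time attribute used for ordering, and the elided connector options are assumptions for illustration (exact connector option keys differ between Flink 1.10 and 1.11), and the sketch only covers the de-duplication step; delete events, where 'after' is null, would still need separate handling:

```sql
-- Hypothetical source table reading the Debezium topic as plain JSON.
CREATE TABLE topic_addresses_raw (
  op STRING, -- Debezium change operation (c / u / d)
  `after` ROW<id INT, customer_id INT, street STRING, city STRING, state STRING, zip STRING>,
  proctime AS PROCTIME() -- processing-time attribute used for ordering below
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  ... -- remaining Kafka options as in Marta's sketch above
);

-- De-duplication as in [3]: keep only the latest row per primary key.
SELECT id, customer_id, street, city, state, zip
FROM (
  SELECT
    t.`after`.id          AS id,
    t.`after`.customer_id AS customer_id,
    t.`after`.street      AS street,
    t.`after`.city        AS city,
    t.`after`.state       AS state,
    t.`after`.zip         AS zip,
    ROW_NUMBER() OVER (PARTITION BY t.`after`.id ORDER BY t.proctime DESC) AS row_num
  FROM topic_addresses_raw t
)
WHERE row_num = 1
```

On 1.11 and later, the debezium-json format handles all of this (including deletes) natively, which is what the rest of the thread converges on.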
On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ches...@apache.org> wrote:

@Jark Would it be possible to use the 1.11 debezium support in 1.10?

On 20/08/2020 19:59, Rex Fenley wrote:

Hi,

I'm trying to set up Flink with the Debezium CDC connector on AWS EMR, however, EMR only supports Flink 1.10.0, whereas the Debezium connector arrived in Flink 1.11.0, from looking at the documentation.

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html

I'm wondering what alternative solutions are available for connecting Debezium to Flink. Is there an open source Debezium connector that works with Flink 1.10.0? Could I potentially pull out the code for the 1.11.0 Debezium connector and compile it in my project using the Flink 1.10.0 API?

For context, I plan on doing some fairly complicated, long-lived stateful joins / materialization using the Table API over data ingested from Postgres and possibly MySQL.

Appreciate any help, thanks!
--
Rex Fenley | Software Engineer - Mobile and Backend
Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>