Hi, getting so close but ran into another issue:

Flink successfully reads changes from Debezium/Kafka and writes them to
Elasticsearch, but there's a problem with deletions. When I DELETE a row
from MySQL the deletion makes it successfully all the way to Elasticsearch
which is great, but then the taskmanager suddenly dies with a null pointer
exception. Inserts and Updates do not have the same problem. This seems
very odd. Any help would be much appreciated. Thanks!

flink-taskmanager_1    | 2020-08-31 23:30:33,684 WARN
 org.apache.flink.runtime.taskmanager.Task                    [] - Source:
TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
flink-taskmanager_1    | java.lang.NullPointerException: null
flink-taskmanager_1    | at java.lang.String.<init>(String.java:566)
~[?:1.8.0_265]
flink-taskmanager_1    | at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | 2020-08-31 23:30:33,720 INFO
 org.apache.flink.runtime.taskmanager.Task                    [] - Freeing
task resources for Source: TableSourceScan(table=[[default_catalog,
default_database, topic_addresses]], fields=[id, customer_id, street, city,
state, zip, type]) -> Sink:
Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8).
flink-taskmanager_1    | 2020-08-31 23:30:33,728 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor           [] -
Un-registering task and sending final execution state FAILED to JobManager
for task Source: TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
2b79917cb528f37fad7f636740d2fdd8.
flink-jobmanager_1     | 2020-08-31 23:30:33,770 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
flink-jobmanager_1     | java.lang.NullPointerException: null
flink-jobmanager_1     | at java.lang.String.<init>(String.java:566)
~[?:1.8.0_265]
flink-jobmanager_1     | at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[?:?]
flink-jobmanager_1     | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <r...@remind101.com> wrote:

> Ah, my bad, thanks for pointing that out Arvid!
>
> On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Rex,
>>
>> you still forgot
>>
>> 'debezium-json.schema-include' = true
>>
>> Please reread my mail.
>>
>>
>> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Thanks for the input, though I've certainly included a schema as is
>>> reflected earlier in this thread. Including here again
>>> ...
>>> tableEnv.executeSql("""
>>> CREATE TABLE topic_addresses (
>>> -- schema is totally the same to the MySQL "addresses" table
>>> id INT,
>>> customer_id INT,
>>> street STRING,
>>> city STRING,
>>> state STRING,
>>> zip STRING,
>>> type STRING,
>>> PRIMARY KEY (id) NOT ENFORCED
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'dbserver1.inventory.addresses',
>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>> 'properties.group.id' = 'testGroup',
>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>> )
>>> """)
>>>
>>> val table = tableEnv.from("topic_addresses").select($"*")
>>> ...
>>>
>>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> the connector expects a value without a schema, but the message
>>>> contains a schema. You can tell Flink that the schema is included as
>>>> written in the documentation [1].
>>>>
>>>> CREATE TABLE topic_products (
>>>>   -- schema is totally the same to the MySQL "products" table
>>>>   id BIGINT,
>>>>   name STRING,
>>>>   description STRING,
>>>>   weight DECIMAL(10, 2)) WITH (
>>>>  'connector' = 'kafka',
>>>>  'topic' = 'products_binlog',
>>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>>  'properties.group.id' = 'testGroup',
>>>>  'format' = 'debezium-json',
>>>>  'debezium-json.schema-include' = true)
>>>>
>>>> @Jark Wu <imj...@gmail.com> , it would be probably good to make the
>>>> connector more robust and catch these types of misconfigurations.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>>
>>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Awesome, so that took me a step further. When running i'm receiving an
>>>>> error however. FYI, my docker-compose file is based on the Debezium mysql
>>>>> tutorial which can be found here
>>>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>>>
>>>>> Part of the stack trace:
>>>>>
>>>>> flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt
>>>>> Debezium JSON message
>>>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>>>> cool street","city":"Big
>>>>> City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | Caused by: java.lang.NullPointerException
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115)
>>>>> ~[flink-json-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>> ~[?:?]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>> flink-jobmanager_1     | at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>>
>>>>> On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This is a known issue in 1.11.0, and has been fixed in 1.11.1.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Fri, 28 Aug 2020 at 06:52, Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> Hi again!
>>>>>>>
>>>>>>> I'm tested out locally in docker on Flink 1.11 first to get my
>>>>>>> bearings before downgrading to 1.10 and figuring out how to replace the
>>>>>>> Debezium connector. However, I'm getting the following error
>>>>>>> ```
>>>>>>> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
>>>>>>> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
>>>>>>> ```
>>>>>>>
>>>>>>> Any suggestions for me to fix this?
>>>>>>>
>>>>>>> code:
>>>>>>>
>>>>>>> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>> val blinkStreamSettings =
>>>>>>> EnvironmentSettings
>>>>>>> .newInstance()
>>>>>>> .useBlinkPlanner()
>>>>>>> .inStreamingMode()
>>>>>>> .build()
>>>>>>> val tableEnv = StreamTableEnvironment.create(bsEnv,
>>>>>>> blinkStreamSettings)
>>>>>>>
>>>>>>> // Table from Debezium mysql example docker:
>>>>>>> //
>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>> // | Field | Type | Null | Key | Default | Extra |
>>>>>>> //
>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>> // | id | int(11) | NO | PRI | NULL | auto_increment |
>>>>>>> // | customer_id | int(11) | NO | MUL | NULL | |
>>>>>>> // | street | varchar(255) | NO | | NULL | |
>>>>>>> // | city | varchar(255) | NO | | NULL | |
>>>>>>> // | state | varchar(255) | NO | | NULL | |
>>>>>>> // | zip | varchar(255) | NO | | NULL | |
>>>>>>> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
>>>>>>> //
>>>>>>> +-------------+-------------------------------------+------+-----+---------+----------------+
>>>>>>>
>>>>>>> tableEnv.executeSql("""
>>>>>>> CREATE TABLE topic_addresses (
>>>>>>> -- schema is totally the same to the MySQL "addresses" table
>>>>>>> id INT,
>>>>>>> customer_id INT,
>>>>>>> street STRING,
>>>>>>> city STRING,
>>>>>>> state STRING,
>>>>>>> zip STRING,
>>>>>>> type STRING,
>>>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>>>> ) WITH (
>>>>>>> 'connector' = 'kafka',
>>>>>>> 'topic' = 'dbserver1.inventory.addresses',
>>>>>>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>>> 'properties.group.id' = 'testGroup',
>>>>>>> 'format' = 'debezium-json' -- using debezium-json as the format
>>>>>>> )
>>>>>>> """)
>>>>>>>
>>>>>>> val table = tableEnv.from("topic_addresses").select($"*")
>>>>>>>
>>>>>>> // Defining a PK automatically puts it in Upsert mode, which we want.
>>>>>>> // TODO: type should be a keyword, is that acceptable by the DDL?
>>>>>>> tableEnv.executeSql("""
>>>>>>> CREATE TABLE ESAddresses (
>>>>>>> id INT,
>>>>>>> customer_id INT,
>>>>>>> street STRING,
>>>>>>> city STRING,
>>>>>>> state STRING,
>>>>>>> zip STRING,
>>>>>>> type STRING,
>>>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>>>> ) WITH (
>>>>>>> 'connector' = 'elasticsearch-7',
>>>>>>> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>>>>>>> 'index' = 'flinkaddresses',
>>>>>>> 'format' = 'json'
>>>>>>> )
>>>>>>> """)
>>>>>>>
>>>>>>> table.executeInsert("ESAddresses").print()
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <r...@remind101.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Regarding the performance difference, the proposed way will have
>>>>>>>>> one more stateful operator (deduplication) than the native 1.11 cdc
>>>>>>>>> support.
>>>>>>>>> The overhead of the deduplication operator is just similar to a
>>>>>>>>> simple group by aggregate (max on each non-key column).
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <r...@remind101.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you so much for the help!
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <
>>>>>>>>>> ma...@ververica.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes — you'll get the full row in the payload; and you can also
>>>>>>>>>>> access the change operation, which might be useful in your case.
>>>>>>>>>>>
>>>>>>>>>>> About performance, I'm summoning Kurt and @Jark Wu
>>>>>>>>>>> <j...@apache.org> to the thread, who will be able to give you a
>>>>>>>>>>> more complete answer and likely also some optimization tips for your
>>>>>>>>>>> specific use case.
>>>>>>>>>>>
>>>>>>>>>>> Marta
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <r...@remind101.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yup! This definitely helps and makes sense.
>>>>>>>>>>>>
>>>>>>>>>>>> The 'after' payload comes with all data from the row right? So
>>>>>>>>>>>> essentially inserts and updates I can insert/replace data by pk 
>>>>>>>>>>>> and null
>>>>>>>>>>>> values I just delete by pk, and then I can build out the rest of 
>>>>>>>>>>>> my joins
>>>>>>>>>>>> like normal.
>>>>>>>>>>>>
>>>>>>>>>>>> Are there any performance implications of doing it this way
>>>>>>>>>>>> that is different from the out-of-the-box 1.11 solution?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>>>>>>>> ma...@ververica.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, Rex.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Part of what enabled CDC support in Flink 1.11 was the
>>>>>>>>>>>>> refactoring of the table source interfaces (FLIP-95 [1]), and the 
>>>>>>>>>>>>> new
>>>>>>>>>>>>> ScanTableSource [2], which allows to emit bounded/unbounded 
>>>>>>>>>>>>> streams with
>>>>>>>>>>>>> insert, update and delete rows.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In theory, you could consume data generated with Debezium as
>>>>>>>>>>>>> regular JSON-encoded events before Flink 1.11 — there just wasn't 
>>>>>>>>>>>>> a
>>>>>>>>>>>>> convenient way to really treat it as "changelog". As a 
>>>>>>>>>>>>> workaround, what you
>>>>>>>>>>>>> can do in Flink 1.10 is process these messages as JSON and 
>>>>>>>>>>>>> extract the
>>>>>>>>>>>>> "after" field from the payload, and then apply de-duplication [3] 
>>>>>>>>>>>>> to keep
>>>>>>>>>>>>> only the last row.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The DDL for your source table would look something like:
>>>>>>>>>>>>>
>>>>>>>>>>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE,
>>>>>>>>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka',
>>>>>>>>>>>>> 'format' = 'json', ... );
>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Marta
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>>>>>>>> [3]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>>>>>>>> ches...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support
>>>>>>>>>>>>>> in 1.10?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS
>>>>>>>>>>>>>> EMR, however, EMR only supports Flink 1.10.0, whereas Debezium 
>>>>>>>>>>>>>> Connector
>>>>>>>>>>>>>> arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>>>>>>>> connecting Debezium to Flink? Is there an open source Debezium 
>>>>>>>>>>>>>> connector
>>>>>>>>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code 
>>>>>>>>>>>>>> out for the
>>>>>>>>>>>>>> 1.11.0 Debezium connector and compile it in my project using 
>>>>>>>>>>>>>> Flink 1.10.0
>>>>>>>>>>>>>> api?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For context, I plan on doing some fairly complicated long
>>>>>>>>>>>>>> lived stateful joins / materialization using the Table API over 
>>>>>>>>>>>>>> data
>>>>>>>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Appreciate any help, thanks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>
>>>>>>>
>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
>>>>
>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Reply via email to