Hi,

This is a known issue in 1.11.0, and it has been fixed in 1.11.1.
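Upgrading should just be a matter of bumping the dependency version. A sketch, assuming an sbt build (the artifact list is trimmed to what your example uses; a Maven build is analogous via a flink.version property):

```
// build.sbt (sketch): bump every Flink artifact to 1.11.1.
val flinkVersion = "1.11.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"          % flinkVersion % Provided,
  "org.apache.flink" %% "flink-table-api-scala-bridge"   % flinkVersion % Provided,
  "org.apache.flink" %% "flink-table-planner-blink"      % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka"          % flinkVersion,
  "org.apache.flink" %% "flink-connector-elasticsearch7" % flinkVersion,
  "org.apache.flink"  % "flink-json"                     % flinkVersion // provides debezium-json
)
```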
Best,
Jark

On Fri, 28 Aug 2020 at 06:52, Rex Fenley <r...@remind101.com> wrote:

> Hi again!
>
> I'm testing this out locally in Docker on Flink 1.11 first, to get my
> bearings before downgrading to 1.10 and figuring out how to replace the
> Debezium connector. However, I'm getting the following error:
>
> ```
> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
> ```
>
> Any suggestions for how to fix this?
>
> Code:
>
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
>
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val blinkStreamSettings =
>   EnvironmentSettings
>     .newInstance()
>     .useBlinkPlanner()
>     .inStreamingMode()
>     .build()
> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>
> // Table from the Debezium MySQL example Docker image:
> // +-------------+-------------------------------------+------+-----+---------+----------------+
> // | Field       | Type                                | Null | Key | Default | Extra          |
> // +-------------+-------------------------------------+------+-----+---------+----------------+
> // | id          | int(11)                             | NO   | PRI | NULL    | auto_increment |
> // | customer_id | int(11)                             | NO   | MUL | NULL    |                |
> // | street      | varchar(255)                        | NO   |     | NULL    |                |
> // | city        | varchar(255)                        | NO   |     | NULL    |                |
> // | state       | varchar(255)                        | NO   |     | NULL    |                |
> // | zip         | varchar(255)                        | NO   |     | NULL    |                |
> // | type        | enum('SHIPPING','BILLING','LIVING') | NO   |     | NULL    |                |
> // +-------------+-------------------------------------+------+-----+---------+----------------+
>
> tableEnv.executeSql("""
>   CREATE TABLE topic_addresses (
>     -- schema is the same as the MySQL "addresses" table
>     id INT,
>     customer_id INT,
>     street STRING,
>     city STRING,
>     state STRING,
>     zip STRING,
>     type STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'dbserver1.inventory.addresses',
>     'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>     'properties.group.id' = 'testGroup',
>     'format' = 'debezium-json' -- using debezium-json as the format
>   )
> """)
>
> val table = tableEnv.from("topic_addresses").select($"*")
>
> // Defining a PK automatically puts the sink in upsert mode, which we want.
> // TODO: "type" should be a keyword; is that acceptable in the DDL?
> tableEnv.executeSql("""
>   CREATE TABLE ESAddresses (
>     id INT,
>     customer_id INT,
>     street STRING,
>     city STRING,
>     state STRING,
>     zip STRING,
>     type STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'elasticsearch-7',
>     'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>     'index' = 'flinkaddresses',
>     'format' = 'json'
>   )
> """)
>
> table.executeInsert("ESAddresses").print()
>
> Thanks!
>
> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Thanks!
>>
>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Regarding the performance difference: the proposed way will have one
>>> more stateful operator (deduplication) than the native 1.11 CDC support.
>>> The overhead of the deduplication operator is similar to that of a simple
>>> group-by aggregate (a max on each non-key column).
>>>
>>> Best,
>>> Jark
>>>
>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Thank you so much for the help!
>>>>
>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com> wrote:
>>>>
>>>>> Yes — you'll get the full row in the payload, and you can also access
>>>>> the change operation, which might be useful in your case.
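>>>>>
>>>>> On 1.10, reading the raw envelope as plain JSON might look roughly like
>>>>> this (a sketch only: the field list under `after` is an illustrative
>>>>> subset, and Debezium's "op" values are 'c' / 'u' / 'd' / 'r'):
>>>>>
>>>>> ```
>>>>> // Sketch (Flink 1.10): expose the Debezium envelope fields as columns,
>>>>> // using the 1.10-style connector/format option keys.
>>>>> tableEnv.sqlUpdate("""
>>>>>   CREATE TABLE topic_addresses_raw (
>>>>>     op STRING, -- 'c' = create, 'u' = update, 'd' = delete, 'r' = snapshot read
>>>>>     `after` ROW<id INT, customer_id INT, street STRING> -- illustrative subset
>>>>>   ) WITH (
>>>>>     'connector.type' = 'kafka',
>>>>>     'connector.version' = 'universal',
>>>>>     'connector.topic' = 'dbserver1.inventory.addresses',
>>>>>     'connector.properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>>>>>     'format.type' = 'json',
>>>>>     'format.derive-schema' = 'true'
>>>>>   )
>>>>> """)
>>>>> ```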
>>>>>
>>>>> About performance, I'm summoning Kurt and @Jark Wu <j...@apache.org> to
>>>>> the thread; they will be able to give you a more complete answer and
>>>>> likely also some optimization tips for your specific use case.
>>>>>
>>>>> Marta
>>>>>
>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Yup! This definitely helps and makes sense.
>>>>>>
>>>>>> The 'after' payload comes with all the data from the row, right? So
>>>>>> essentially, for inserts and updates I can insert/replace data by PK,
>>>>>> for null values I can delete by PK, and then I can build out the rest
>>>>>> of my joins as normal.
>>>>>>
>>>>>> Are there any performance implications of doing it this way that
>>>>>> differ from the out-of-the-box 1.11 solution?
>>>>>>
>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <ma...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi, Rex.
>>>>>>>
>>>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring
>>>>>>> of the table source interfaces (FLIP-95 [1]) and the new ScanTableSource
>>>>>>> [2], which allows emitting bounded/unbounded streams with insert, update,
>>>>>>> and delete rows.
>>>>>>>
>>>>>>> In theory, you could consume data generated with Debezium as regular
>>>>>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient
>>>>>>> way to really treat it as a "changelog". As a workaround, what you can
>>>>>>> do in Flink 1.10 is process these messages as JSON, extract the "after"
>>>>>>> field from the payload, and then apply de-duplication [3] to keep only
>>>>>>> the last row.
>>>>>>>
>>>>>>> The DDL for your source table would look something like:
>>>>>>>
>>>>>>> CREATE TABLE tablename (
>>>>>>>   ...
>>>>>>>   `after` ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>>>>> ) WITH (
>>>>>>>   'connector' = 'kafka',
>>>>>>>   'format' = 'json',
>>>>>>>   ...
>>>>>>> );
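>>>>>>>
>>>>>>> The de-duplication step [3] might then look roughly like this (a
>>>>>>> sketch: the column names are illustrative, and it assumes a
>>>>>>> processing-time attribute declared on the source table, e.g. a
>>>>>>> computed column `proc AS PROCTIME()`):
>>>>>>>
>>>>>>> ```
>>>>>>> // Sketch (Flink 1.10, Blink planner): keep only the latest "after"
>>>>>>> // image per key. The planner recognizes this ROW_NUMBER pattern as
>>>>>>> // deduplication. Deletes (a null "after") still need separate handling.
>>>>>>> val deduped = tableEnv.sqlQuery("""
>>>>>>>   SELECT id, field1, field2
>>>>>>>   FROM (
>>>>>>>     SELECT t.id, t.field1, t.field2,
>>>>>>>       ROW_NUMBER() OVER (PARTITION BY t.id ORDER BY t.proc DESC) AS rownum
>>>>>>>     FROM (
>>>>>>>       SELECT `after`.id AS id, `after`.field1 AS field1,
>>>>>>>              `after`.field2 AS field2, proc
>>>>>>>       FROM tablename
>>>>>>>     ) t
>>>>>>>   )
>>>>>>>   WHERE rownum = 1
>>>>>>> """)
>>>>>>> ```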
>>>>>>>
>>>>>>> Hope this helps!
>>>>>>>
>>>>>>> Marta
>>>>>>>
>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>
>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>>
>>>>>>>> @Jark Would it be possible to use the 1.11 Debezium support in 1.10?
>>>>>>>>
>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to set up Flink with the Debezium CDC connector on AWS
>>>>>>>> EMR; however, EMR only supports Flink 1.10.0, whereas the Debezium
>>>>>>>> connector arrived in Flink 1.11.0, from looking at the documentation.
>>>>>>>>
>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>
>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>> connecting Debezium to Flink. Is there an open-source Debezium
>>>>>>>> connector that works with Flink 1.10.0? Could I potentially pull out
>>>>>>>> the code for the 1.11.0 Debezium connector and compile it in my
>>>>>>>> project against the Flink 1.10.0 API?
>>>>>>>>
>>>>>>>> For context, I plan on doing some fairly complicated, long-lived
>>>>>>>> stateful joins / materialization using the Table API over data
>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>
>>>>>>>> Appreciate any help, thanks!
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>>>>>>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>