Hi,

This is a known issue in 1.11.0, and it has been fixed in 1.11.1.
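
For example, if you're building with sbt, bumping the Flink version should be
enough (a sketch; adjust to your build tool and actual module list):

```
val flinkVersion = "1.11.1"
libraryDependencies ++= Seq(
  // the blink planner module that contains the fix
  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided"
)
```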


Best,
Jark

On Fri, 28 Aug 2020 at 06:52, Rex Fenley <r...@remind101.com> wrote:

> Hi again!
>
> I'm testing things out locally in Docker on Flink 1.11 first to get my
> bearings, before downgrading to 1.10 and figuring out how to replace the
> Debezium connector. However, I'm getting the following error:
> ```
> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
> ```
>
> Any suggestions for me to fix this?
>
> code:
>
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val blinkStreamSettings =
>   EnvironmentSettings
>     .newInstance()
>     .useBlinkPlanner()
>     .inStreamingMode()
>     .build()
> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>
> // Table from Debezium mysql example docker:
> // +-------------+-------------------------------------+------+-----+---------+----------------+
> // | Field       | Type                                | Null | Key | Default | Extra          |
> // +-------------+-------------------------------------+------+-----+---------+----------------+
> // | id          | int(11)                             | NO   | PRI | NULL    | auto_increment |
> // | customer_id | int(11)                             | NO   | MUL | NULL    |                |
> // | street      | varchar(255)                        | NO   |     | NULL    |                |
> // | city        | varchar(255)                        | NO   |     | NULL    |                |
> // | state       | varchar(255)                        | NO   |     | NULL    |                |
> // | zip         | varchar(255)                        | NO   |     | NULL    |                |
> // | type        | enum('SHIPPING','BILLING','LIVING') | NO   |     | NULL    |                |
> // +-------------+-------------------------------------+------+-----+---------+----------------+
>
> tableEnv.executeSql("""
>   CREATE TABLE topic_addresses (
>     -- schema is the same as the MySQL "addresses" table
>     id INT,
>     customer_id INT,
>     street STRING,
>     city STRING,
>     state STRING,
>     zip STRING,
>     type STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'dbserver1.inventory.addresses',
>     'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>     'properties.group.id' = 'testGroup',
>     'format' = 'debezium-json' -- using debezium-json as the format
>   )
> """)
>
> val table = tableEnv.from("topic_addresses").select($"*")
>
> // Defining a PK automatically puts the sink in upsert mode, which we want.
> // TODO: `type` should be a keyword; is that acceptable in the DDL?
> tableEnv.executeSql("""
>   CREATE TABLE ESAddresses (
>     id INT,
>     customer_id INT,
>     street STRING,
>     city STRING,
>     state STRING,
>     zip STRING,
>     type STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'elasticsearch-7',
>     'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
>     'index' = 'flinkaddresses',
>     'format' = 'json'
>   )
> """)
>
> table.executeInsert("ESAddresses").print()
>
> Thanks!
>
> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Thanks!
>>
>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Regarding the performance difference, the proposed way will have one
>>> more stateful operator (deduplication) than the native 1.11 CDC support.
>>> The overhead of the deduplication operator is similar to that of a
>>> simple group-by aggregate (a max on each non-key column).
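>>>
>>> To make that concrete, the state it keeps is roughly what a query like
>>> this would keep (a sketch only; table and column names are illustrative):
>>>
>>> ```
>>> -- comparable state: one row per key, with one value per non-key column
>>> SELECT id, MAX(street) AS street, MAX(city) AS city
>>> FROM topic_addresses
>>> GROUP BY id
>>> ```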
>>>
>>> Best,
>>> Jark
>>>
>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Thank you so much for the help!
>>>>
>>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com>
>>>> wrote:
>>>>
>>>>> Yes — you'll get the full row in the payload; and you can also access
>>>>> the change operation, which might be useful in your case.
>>>>>
>>>>> About performance, I'm summoning Kurt and @Jark Wu <j...@apache.org> to
>>>>> the thread, who will be able to give you a more complete answer and likely
>>>>> also some optimization tips for your specific use case.
>>>>>
>>>>> Marta
>>>>>
>>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Yup! This definitely helps and makes sense.
>>>>>>
>>>>>> The 'after' payload comes with all the data from the row, right? So
>>>>>> essentially, for inserts and updates I can insert/replace data by pk,
>>>>>> for null values I can just delete by pk, and then I can build out the
>>>>>> rest of my joins like normal.
>>>>>>
>>>>>> Are there any performance implications of doing it this way that is
>>>>>> different from the out-of-the-box 1.11 solution?
>>>>>>
>>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>>> ma...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi, Rex.
>>>>>>>
>>>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring
>>>>>>> of the table source interfaces (FLIP-95 [1]), and the new
>>>>>>> ScanTableSource [2], which allows emitting bounded/unbounded streams
>>>>>>> with insert, update, and delete rows.
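>>>>>>>
>>>>>>> For illustration, a ScanTableSource declares which kinds of rows it
>>>>>>> produces via a ChangelogMode. A CDC-capable source might build it
>>>>>>> roughly like this (a sketch, not the actual Debezium source code):
>>>>>>>
>>>>>>> ```
>>>>>>> import org.apache.flink.table.connector.ChangelogMode
>>>>>>> import org.apache.flink.types.RowKind
>>>>>>>
>>>>>>> // declare that the source emits inserts, updates (before/after
>>>>>>> // images), and deletes
>>>>>>> val mode = ChangelogMode.newBuilder()
>>>>>>>   .addContainedKind(RowKind.INSERT)
>>>>>>>   .addContainedKind(RowKind.UPDATE_BEFORE)
>>>>>>>   .addContainedKind(RowKind.UPDATE_AFTER)
>>>>>>>   .addContainedKind(RowKind.DELETE)
>>>>>>>   .build()
>>>>>>> ```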
>>>>>>>
>>>>>>> In theory, you could consume data generated with Debezium as regular
>>>>>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient
>>>>>>> way to really treat it as a "changelog". As a workaround, what you can
>>>>>>> do in Flink 1.10 is process these messages as JSON, extract the
>>>>>>> "after" field from the payload, and then apply de-duplication [3] to
>>>>>>> keep only the last row.
>>>>>>>
>>>>>>> The DDL for your source table would look something like:
>>>>>>>
>>>>>>> CREATE TABLE tablename (
>>>>>>>   ...
>>>>>>>   after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>>>>> ) WITH (
>>>>>>>   'connector' = 'kafka',
>>>>>>>   'format' = 'json',
>>>>>>>   ...
>>>>>>> );
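>>>>>>>
>>>>>>> And the de-duplication on top could look roughly like this (a sketch
>>>>>>> only: it assumes the DDL also declares a processing-time attribute via
>>>>>>> `proctime AS PROCTIME()`, and `id`/`field1`/`field2` stand in for your
>>>>>>> actual key and columns):
>>>>>>>
>>>>>>> ```
>>>>>>> SELECT id, field1, field2
>>>>>>> FROM (
>>>>>>>   SELECT *,
>>>>>>>     ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC)
>>>>>>>       AS row_num
>>>>>>>   FROM (
>>>>>>>     -- flatten the Debezium "after" payload first
>>>>>>>     SELECT t.after.id AS id, t.after.field1 AS field1,
>>>>>>>            t.after.field2 AS field2, t.proctime
>>>>>>>     FROM tablename t
>>>>>>>   )
>>>>>>> )
>>>>>>> WHERE row_num = 1;
>>>>>>> ```
>>>>>>>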
>>>>>>> Hope this helps!
>>>>>>>
>>>>>>> Marta
>>>>>>>
>>>>>>> [1]
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>>> [3]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <
>>>>>>> ches...@apache.org> wrote:
>>>>>>>
>>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>>>>>
>>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to set up Flink with the Debezium CDC connector on AWS
>>>>>>>> EMR; however, EMR only supports Flink 1.10.0, whereas the Debezium
>>>>>>>> connector arrived in Flink 1.11.0, judging from the documentation.
>>>>>>>>
>>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>>
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>>
>>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>>> connecting Debezium to Flink. Is there an open-source Debezium
>>>>>>>> connector that works with Flink 1.10.0? Could I potentially pull out
>>>>>>>> the code for the 1.11.0 Debezium connector and compile it in my
>>>>>>>> project against the Flink 1.10.0 API?
>>>>>>>>
>>>>>>>> For context, I plan on doing some fairly complicated, long-lived
>>>>>>>> stateful joins / materialization using the Table API over data
>>>>>>>> ingested from Postgres and possibly MySQL.
>>>>>>>>
>>>>>>>> Appreciate any help, thanks!
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
