Hi again!

I'm tested out locally in docker on Flink 1.11 first to get my bearings
before downgrading to 1.10 and figuring out how to replace the Debezium
connector. However, I'm getting the following error
```
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
[ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
```

Any suggestions for me to fix this?

code:

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkStreamSettings =
EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)

// Table from Debezium mysql example docker:
//
+-------------+-------------------------------------+------+-----+---------+----------------+
// | Field | Type | Null | Key | Default | Extra |
//
+-------------+-------------------------------------+------+-----+---------+----------------+
// | id | int(11) | NO | PRI | NULL | auto_increment |
// | customer_id | int(11) | NO | MUL | NULL | |
// | street | varchar(255) | NO | | NULL | |
// | city | varchar(255) | NO | | NULL | |
// | state | varchar(255) | NO | | NULL | |
// | zip | varchar(255) | NO | | NULL | |
// | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
//
+-------------+-------------------------------------+------+-----+---------+----------------+

tableEnv.executeSql("""
CREATE TABLE topic_addresses (
-- schema is totally the same to the MySQL "addresses" table
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.addresses',
'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json' -- using debezium-json as the format
)
""")

val table = tableEnv.from("topic_addresses").select($"*")

// Defining a PK automatically puts it in Upsert mode, which we want.
// TODO: type should be a keyword, is that acceptable by the DDL?
tableEnv.executeSql("""
CREATE TABLE ESAddresses (
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
'index' = 'flinkaddresses',
'format' = 'json'
)
""")

table.executeInsert("ESAddresses").print()

Thanks!

On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <r...@remind101.com> wrote:

> Thanks!
>
> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi,
>>
>> Regarding the performance difference, the proposed way will have one more
>> stateful operator (deduplication) than the native 1.11 cdc support.
>> The overhead of the deduplication operator is just similar to a simple
>> group by aggregate (max on each non-key column).
>>
>> Best,
>> Jark
>>
>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley <r...@remind101.com> wrote:
>>
>>> Thank you so much for the help!
>>>
>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <ma...@ververica.com>
>>> wrote:
>>>
>>>> Yes — you'll get the full row in the payload; and you can also access
>>>> the change operation, which might be useful in your case.
>>>>
>>>> About performance, I'm summoning Kurt and @Jark Wu <j...@apache.org> to
>>>> the thread, who will be able to give you a more complete answer and likely
>>>> also some optimization tips for your specific use case.
>>>>
>>>> Marta
>>>>
>>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Yup! This definitely helps and makes sense.
>>>>>
>>>>> The 'after' payload comes with all data from the row right? So
>>>>> essentially inserts and updates I can insert/replace data by pk and null
>>>>> values I just delete by pk, and then I can build out the rest of my joins
>>>>> like normal.
>>>>>
>>>>> Are there any performance implications of doing it this way that is
>>>>> different from the out-of-the-box 1.11 solution?
>>>>>
>>>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>>>>> ma...@ververica.com> wrote:
>>>>>
>>>>>> Hi, Rex.
>>>>>>
>>>>>> Part of what enabled CDC support in Flink 1.11 was the refactoring of
>>>>>> the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>>>>> [2], which allows to emit bounded/unbounded streams with insert, update 
>>>>>> and
>>>>>> delete rows.
>>>>>>
>>>>>> In theory, you could consume data generated with Debezium as regular
>>>>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient 
>>>>>> way
>>>>>> to really treat it as "changelog". As a workaround, what you can do in
>>>>>> Flink 1.10 is process these messages as JSON and extract the "after" 
>>>>>> field
>>>>>> from the payload, and then apply de-duplication [3] to keep only the last
>>>>>> row.
>>>>>>
>>>>>> The DDL for your source table would look something like:
>>>>>>
>>>>>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE,
>>>>>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' =
>>>>>> 'json', ... );
>>>>>> Hope this helps!
>>>>>>
>>>>>> Marta
>>>>>>
>>>>>> [1]
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>>>>> [3]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <ches...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>>>>
>>>>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>>>>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector 
>>>>>>> arrived
>>>>>>> in Flink 1.11.0, from looking at the documentation.
>>>>>>>
>>>>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>>>>
>>>>>>> I'm wondering what alternative solutions are available for
>>>>>>> connecting Debezium to Flink? Is there an open source Debezium connector
>>>>>>> that works with Flink 1.10.0? Could I potentially pull the code out for 
>>>>>>> the
>>>>>>> 1.11.0 Debezium connector and compile it in my project using Flink 
>>>>>>> 1.10.0
>>>>>>> api?
>>>>>>>
>>>>>>> For context, I plan on doing some fairly complicated long lived
>>>>>>> stateful joins / materialization using the Table API over data ingested
>>>>>>> from Postgres and possibly MySQL.
>>>>>>>
>>>>>>> Appreciate any help, thanks!
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>
>>>>>>>
>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Reply via email to