My local test indicates that the debezium-json format works correctly with the given schema and example record. Could you share a more detailed exception stack trace, and a record that reproduces the problem?
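For reference, below is a minimal sketch of the kind of local harness that surfaces the full stack trace when a record fails to decode. This is illustrative only, not the exact test that was run; the class name and connection values are placeholders.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Illustrative local harness: running the query from an IDE prints the full
// exception (with stack trace) if the debezium-json format fails on a record.
public class DebeziumNullArrayRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Mirrors the source DDL quoted below; connection values are placeholders.
        tEnv.executeSql(
            "CREATE TABLE source_kafka_data (\n"
                + "  id BIGINT,\n"
                + "  roles ARRAY<STRING NOT NULL>,\n" // as in the thread's DDL; the posted record has a null element here
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'topic',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'properties.group.id' = 'repro',\n"
                + "  'properties.auto.offset.reset' = 'earliest',\n"
                + "  'debezium-json.schema-include' = 'true',\n"
                // + "  'debezium-json.ignore-parse-errors' = 'true',\n" // optionally skip dirty records, see Jark's note below
                + "  'format' = 'debezium-json'\n"
                + ")");

        tEnv.executeSql("SELECT id, roles FROM source_kafka_data").print();
    }
}
```

Running this from an IDE prints a failure with its complete stack trace, which is usually the quickest way to capture what the cluster logs are swallowing.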
Rex Fenley <r...@remind101.com> wrote on Tue, Dec 1, 2020 at 7:15 AM:

> Hello,
>
> Any updates on this bug?
>
> Thanks!
>
> On Fri, Nov 20, 2020 at 11:51 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Btw, this is what our source and sink essentially look like, with some
>> columns redacted.
>>
>> CREATE TABLE source_kafka_data (
>>   id BIGINT,
>>   roles ARRAY<STRING NOT NULL>,
>>   PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic',
>>   'properties.bootstrap.servers' = 'kafka',
>>   'properties.group.id' = 'group_id',
>>   'properties.auto.offset.reset' = 'earliest',
>>   'debezium-json.schema-include' = 'true',
>>   'format' = 'debezium-json'
>> )
>>
>> CREATE TABLE sink_es_data (
>>   id BIGINT NOT NULL,
>>   roles ARRAY<STRING>,
>>   PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'eshost',
>>   'index' = 'data',
>>   'format' = 'json',
>>   'sink.bulk-flush.max-actions' = '8192',
>>   'sink.bulk-flush.max-size' = '16mb',
>>   'sink.bulk-flush.interval' = '5000',
>>   'sink.bulk-flush.backoff.delay' = '1000',
>>   'sink.bulk-flush.backoff.max-retries' = '4',
>>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>> )
>>
>> On Thu, Nov 19, 2020 at 7:41 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Thanks!
>>>
>>> Update: We've confirmed with a test copy of our data that if we remove
>>> all the null values from the arrays, everything works smoothly and as
>>> expected. So this definitely appears to be the culprit.
>>>
>>> On Thu, Nov 19, 2020 at 6:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Thanks Rex! This is very helpful. Will check it out later.
>>>>
>>>> On Fri, 20 Nov 2020 at 03:02, Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Below is a highly redacted set of data that should represent the
>>>>> problem. As you can see, the "roles" field has "[null]" in it, a null
>>>>> value within the array. We also see corresponding rows in our DB, like
>>>>> the following:
>>>>>
>>>>>     id     | roles
>>>>>  ----------+--------
>>>>>   16867433 | {NULL}
>>>>>
>>>>> We have confirmed that if we do not select "roles", all data passes
>>>>> through a single operator without failure, but selecting "roles"
>>>>> eventually always fails with a repeated java.lang.NullPointerException.
>>>>> What is odd is that there is no additional stack trace, just the
>>>>> exception, both in our logs and in the Flink UI. We only have INFO
>>>>> logging on; however, other exceptions we've encountered in our
>>>>> development have always come with a stack trace.
>>>>> {
>>>>>   "schema": {
>>>>>     "type": "struct",
>>>>>     "fields": [
>>>>>       {
>>>>>         "type": "struct",
>>>>>         "fields": [
>>>>>           { "type": "int32", "optional": false, "field": "id" },
>>>>>           {
>>>>>             "type": "array",
>>>>>             "items": { "type": "string", "optional": true },
>>>>>             "optional": false,
>>>>>             "field": "roles"
>>>>>           }
>>>>>         ],
>>>>>         "optional": true,
>>>>>         "name": "db.public.data.Value",
>>>>>         "field": "before"
>>>>>       },
>>>>>       {
>>>>>         "type": "struct",
>>>>>         "fields": [
>>>>>           { "type": "int32", "optional": false, "field": "id" },
>>>>>           {
>>>>>             "type": "array",
>>>>>             "items": { "type": "string", "optional": true },
>>>>>             "optional": false,
>>>>>             "field": "roles"
>>>>>           }
>>>>>         ],
>>>>>         "optional": true,
>>>>>         "name": "db.public.data.Value",
>>>>>         "field": "after"
>>>>>       },
>>>>>       {
>>>>>         "type": "struct",
>>>>>         "fields": [
>>>>>           { "type": "string", "optional": false, "field": "version" },
>>>>>           { "type": "string", "optional": false, "field": "connector" },
>>>>>           { "type": "string", "optional": false, "field": "name" },
>>>>>           { "type": "int64", "optional": false, "field": "ts_ms" },
>>>>>           {
>>>>>             "type": "string",
>>>>>             "optional": true,
>>>>>             "name": "io.debezium.data.Enum",
>>>>>             "version": 1,
>>>>>             "parameters": { "allowed": "true,last,false" },
>>>>>             "default": "false",
>>>>>             "field": "snapshot"
>>>>>           },
>>>>>           { "type": "string", "optional": false, "field": "db" },
>>>>>           { "type": "string", "optional": false, "field": "schema" },
>>>>>           { "type": "string", "optional": false, "field": "table" },
>>>>>           { "type": "int64", "optional": true, "field": "txId" },
>>>>>           { "type": "int64", "optional": true, "field": "lsn" },
>>>>>           { "type": "int64", "optional": true, "field": "xmin" }
>>>>>         ],
>>>>>         "optional": false,
>>>>>         "name": "io.debezium.connector.postgresql.Source",
>>>>>         "field": "source"
>>>>>       },
>>>>>       { "type": "string", "optional": false, "field": "op" },
>>>>>       { "type": "int64", "optional": true, "field": "ts_ms" },
>>>>>       {
>>>>>         "type": "struct",
>>>>>         "fields": [
>>>>>           { "type": "string", "optional": false, "field": "id" },
>>>>>           { "type": "int64", "optional": false, "field": "total_order" },
>>>>>           { "type": "int64", "optional": false, "field": "data_collection_order" }
>>>>>         ],
>>>>>         "optional": true,
>>>>>         "field": "transaction"
>>>>>       }
>>>>>     ],
>>>>>     "optional": false,
>>>>>     "name": "db.public.data.Envelope"
>>>>>   },
>>>>>   "payload": {
>>>>>     "before": null,
>>>>>     "after": {
>>>>>       "id": 76704,
>>>>>       "roles": [null]
>>>>>     },
>>>>>     "source": {
>>>>>       "version": "1.3.0.Final",
>>>>>       "connector": "postgresql",
>>>>>       "name": "db",
>>>>>       "ts_ms": 1605739197360,
>>>>>       "snapshot": "true",
>>>>>       "db": "db",
>>>>>       "schema": "public",
>>>>>       "table": "data",
>>>>>       "txId": 1784,
>>>>>       "lsn": 1305806608,
>>>>>       "xmin": null
>>>>>     },
>>>>>     "op": "r",
>>>>>     "ts_ms": 1605739197373,
>>>>>     "transaction": null
>>>>>   }
>>>>> }
>>>>>
>>>>> cc Brad
>>>>>
>>>>> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io> wrote:
>>>>>
>>>>>> Ah yes, I missed the Kafka part and just saw the array part.
>>>>>> FLINK-19771 was definitely solely in the postgres-specific code.
>>>>>>
>>>>>> Dylan
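Since Rex confirmed that stripping the null values out of the arrays upstream makes everything pass, one defensive option is to clean the records before they ever reach Flink. Below is a minimal sketch of such a pre-filter; the class and method names are hypothetical, and the field paths follow the record quoted above.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical pre-filter that drops null elements from the "roles" arrays
// in a Debezium envelope before the record is handed to Flink.
public class RolesNullStripper {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static String stripNullRoles(String json) throws Exception {
        JsonNode root = MAPPER.readTree(json);
        // Clean both sides of the change event, when present.
        for (String side : new String[] {"before", "after"}) {
            JsonNode row = root.path("payload").path(side);
            if (row.isObject() && row.get("roles") != null && row.get("roles").isArray()) {
                ArrayNode cleaned = MAPPER.createArrayNode();
                for (JsonNode element : row.get("roles")) {
                    if (!element.isNull()) {
                        cleaned.add(element); // keep only non-null roles
                    }
                }
                ((ObjectNode) row).set("roles", cleaned);
            }
        }
        return MAPPER.writeValueAsString(root);
    }
}
```

Only payload.before.roles and payload.after.roles are touched; the rest of the envelope passes through unchanged.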
>>>>>> From: Jark Wu <imj...@gmail.com>
>>>>>> Date: Thursday, November 19, 2020 at 9:12 AM
>>>>>> To: Dylan Forciea <dy...@oseberg.io>
>>>>>> Cc: Danny Chan <danny0...@apache.org>, Rex Fenley <r...@remind101.com>, Flink ML <user@flink.apache.org>
>>>>>> Subject: Re: Filter Null in Array in SQL Connector
>>>>>>
>>>>>> Hi Dylan,
>>>>>>
>>>>>> I think Rex encountered another issue, because he is using Kafka with
>>>>>> the Debezium format.
>>>>>>
>>>>>> Hi Rex,
>>>>>>
>>>>>> If you can share the JSON data and the exception stack, that would be
>>>>>> helpful!
>>>>>>
>>>>>> Besides, you can try enabling the 'debezium-json.ignore-parse-errors'
>>>>>> option [1] to skip the dirty data.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> [1]:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>>>>
>>>>>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>>>>>>
>>>>>> Do you mean that the array contains values that are null, or that the
>>>>>> entire array itself is null? If it's the latter, I have an issue
>>>>>> written, along with a PR to fix it, that has been pending review [1].
>>>>>>
>>>>>> Regards,
>>>>>> Dylan Forciea
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>>>>>
>>>>>> From: Danny Chan <danny0...@apache.org>
>>>>>> Date: Thursday, November 19, 2020 at 2:24 AM
>>>>>> To: Rex Fenley <r...@remind101.com>
>>>>>> Cc: Flink ML <user@flink.apache.org>
>>>>>> Subject: Re: Filter Null in Array in SQL Connector
>>>>>>
>>>>>> Hi, Fenley ~
>>>>>>
>>>>>> You are right, parsing nulls in ARRAY fields is not supported yet; I
>>>>>> have logged an issue [1] and will fix it soon ~
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>>>>>
>>>>>> Rex Fenley <r...@remind101.com> wrote on Thu, Nov 19, 2020 at 2:51 PM:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I recently discovered that some of our data has NULL values arriving
>>>>>> in an ARRAY<STRING> column. This column is consumed by Flink via the
>>>>>> Kafka connector with the Debezium format. We seem to receive
>>>>>> NullPointerExceptions when these NULL values arrive in the arrays,
>>>>>> which restarts the source operator in a loop.
>>>>>>
>>>>>> Is there any way to not throw, or to filter out NULLs in an array of
>>>>>> strings in Flink? (A workaround sketch follows at the end of this
>>>>>> thread.)
>>>>>>
>>>>>> We're somewhat stuck on how to solve this problem; we'd like to be
>>>>>> defensive about it on Flink's side.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> (P.S. The exception was not that informative; there may be room for
>>>>>> improvement in terms of a richer error message when this happens.)
>>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> *Rex Fenley* | Software Engineer - Mobile and Backend >>>>>> >>>>>> >>>>>> >>>>>> *Remind.com* <https://www.remind.com/> | BLOG >>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>> <https://twitter.com/remindhq> | *LIKE US >>>>>> <https://www.facebook.com/remindhq>* >>>>>> >>>>>> >>>>> >>>>> -- >>>>> >>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>> >>>>> >>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> >>>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US >>>>> <https://www.facebook.com/remindhq> >>>>> >>>> >>> >>> -- >>> >>> Rex Fenley | Software Engineer - Mobile and Backend >>> >>> >>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> >>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US >>> <https://www.facebook.com/remindhq> >>> >> >> >> -- >> >> Rex Fenley | Software Engineer - Mobile and Backend >> >> >> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> >> | FOLLOW US <https://twitter.com/remindhq> | LIKE US >> <https://www.facebook.com/remindhq> >> > > > -- > > Rex Fenley | Software Engineer - Mobile and Backend > > > Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | > FOLLOW US <https://twitter.com/remindhq> | LIKE US > <https://www.facebook.com/remindhq> >