Btw, this is what our source and sink essentially look like, with some
columns redacted:

CREATE TABLE source_kafka_data (
    id BIGINT,
    roles ARRAY<STRING NOT NULL>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = 'kafka',
    'properties.group.id' = 'group_id',
    'properties.auto.offset.reset' = 'earliest',
    'debezium-json.schema-include' = 'true',
    'format' = 'debezium-json'
);

CREATE TABLE sink_es_data (
    id BIGINT NOT NULL,
    roles ARRAY<STRING>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'eshost',
    'index' = 'data',
    'format' = 'json',
    'sink.bulk-flush.max-actions' = '8192',
    'sink.bulk-flush.max-size' = '16mb',
    'sink.bulk-flush.interval' = '5000',
    'sink.bulk-flush.backoff.delay' = '1000',
    'sink.bulk-flush.backoff.max-retries' = '4',
    'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
);

On Thu, Nov 19, 2020 at 7:41 PM Rex Fenley <r...@remind101.com> wrote:

> Thanks!
>
> Update: we've now confirmed with a test copy of our data that if we
> remove all the NULL values from the arrays, everything works smoothly
> and as expected, so this definitely appears to be the culprit.
>
> On Thu, Nov 19, 2020 at 6:41 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Thanks Rex! This is very helpful. Will check it out later.
>>
>> On Fri, 20 Nov 2020 at 03:02, Rex Fenley <r...@remind101.com> wrote:
>>
>>> Below is a highly redacted set of data that should represent the
>>> problem. As you can see, the "roles" field contains "[null]", i.e. a
>>> null value inside the array. We also see corresponding rows in our DB
>>> like the following:
>>>
>>>     id    | roles
>>> ----------+--------
>>>  16867433 | {NULL}
>>>
>>> We have confirmed that when we do not select "roles", all data passes
>>> through a single operator without failure, but selecting "roles" will
>>> eventually always fail with java.lang.NullPointerException,
>>> repeatedly. What is odd is that there is no additional stack trace,
>>> just the exception, both in our logs and in the Flink UI. We only have
>>> INFO logging on; however, other exceptions we've encountered during
>>> development have always come with a stack trace.
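>>>
>>> For reference, rows like the {NULL} one above can be scrubbed on the
>>> Postgres side. A minimal sketch, assuming a plain one-dimensional
>>> array column and using the table and column names from this thread
>>> (array_remove() compares elements with IS NOT DISTINCT FROM semantics,
>>> so it removes NULL elements as well):
>>>
>>>   UPDATE data SET roles = array_remove(roles, NULL);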
>>>
>>> {
>>>   "schema": {
>>>     "type": "struct",
>>>     "fields": [
>>>       {
>>>         "type": "struct",
>>>         "fields": [
>>>           { "type": "int32", "optional": false, "field": "id" },
>>>           {
>>>             "type": "array",
>>>             "items": { "type": "string", "optional": true },
>>>             "optional": false,
>>>             "field": "roles"
>>>           }
>>>         ],
>>>         "optional": true,
>>>         "name": "db.public.data.Value",
>>>         "field": "before"
>>>       },
>>>       {
>>>         "type": "struct",
>>>         "fields": [
>>>           { "type": "int32", "optional": false, "field": "id" },
>>>           {
>>>             "type": "array",
>>>             "items": { "type": "string", "optional": true },
>>>             "optional": false,
>>>             "field": "roles"
>>>           }
>>>         ],
>>>         "optional": true,
>>>         "name": "db.public.data.Value",
>>>         "field": "after"
>>>       },
>>>       {
>>>         "type": "struct",
>>>         "fields": [
>>>           { "type": "string", "optional": false, "field": "version" },
>>>           { "type": "string", "optional": false, "field": "connector" },
>>>           { "type": "string", "optional": false, "field": "name" },
>>>           { "type": "int64", "optional": false, "field": "ts_ms" },
>>>           {
>>>             "type": "string",
>>>             "optional": true,
>>>             "name": "io.debezium.data.Enum",
>>>             "version": 1,
>>>             "parameters": { "allowed": "true,last,false" },
>>>             "default": "false",
>>>             "field": "snapshot"
>>>           },
>>>           { "type": "string", "optional": false, "field": "db" },
>>>           { "type": "string", "optional": false, "field": "schema" },
>>>           { "type": "string", "optional": false, "field": "table" },
>>>           { "type": "int64", "optional": true, "field": "txId" },
>>>           { "type": "int64", "optional": true, "field": "lsn" },
>>>           { "type": "int64", "optional": true, "field": "xmin" }
>>>         ],
>>>         "optional": false,
>>>         "name": "io.debezium.connector.postgresql.Source",
>>>         "field": "source"
>>>       },
>>>       { "type": "string", "optional": false, "field": "op" },
>>>       { "type": "int64", "optional": true, "field": "ts_ms" },
>>>       {
>>>         "type": "struct",
>>>         "fields": [
>>>           { "type": "string", "optional": false, "field": "id" },
>>>           { "type": "int64", "optional": false, "field": "total_order" },
>>>           { "type": "int64", "optional": false, "field": "data_collection_order" }
>>>         ],
>>>         "optional": true,
>>>         "field": "transaction"
>>>       }
>>>     ],
>>>     "optional": false,
>>>     "name": "db.public.data.Envelope"
>>>   },
>>>   "payload": {
>>>     "before": null,
>>>     "after": { "id": 76704, "roles": [null] },
>>>     "source": {
>>>       "version": "1.3.0.Final",
>>>       "connector": "postgresql",
>>>       "name": "db",
>>>       "ts_ms": 1605739197360,
>>>       "snapshot": "true",
>>>       "db": "db",
>>>       "schema": "public",
>>>       "table": "data",
>>>       "txId": 1784,
>>>       "lsn": 1305806608,
>>>       "xmin": null
>>>     },
>>>     "op": "r",
>>>     "ts_ms": 1605739197373,
>>>     "transaction": null
>>>   }
>>> }
>>>
>>> cc Brad
>>>
>>> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io> wrote:
>>>
>>>> Ah yes, I missed the Kafka part and just saw the array part.
>>>> FLINK-19771 was definitely solely in the Postgres-specific code.
>>>>
>>>> Dylan
>>>>
>>>> From: Jark Wu <imj...@gmail.com>
>>>> Date: Thursday, November 19, 2020 at 9:12 AM
>>>> To: Dylan Forciea <dy...@oseberg.io>
>>>> Cc: Danny Chan <danny0...@apache.org>, Rex Fenley <r...@remind101.com>,
>>>> Flink ML <user@flink.apache.org>
>>>> Subject: Re: Filter Null in Array in SQL Connector
>>>>
>>>> Hi Dylan,
>>>>
>>>> I think Rex encountered another issue, because he is using Kafka with
>>>> the Debezium format.
>>>>
>>>> Hi Rex,
>>>>
>>>> If you can share the JSON data and the exception stack, that would be
>>>> helpful!
>>>>
>>>> Besides, you can try enabling the 'debezium-json.ignore-parse-errors'
>>>> option [1] to skip the dirty data.
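>>>>
>>>> For example, in the source table's WITH clause (a sketch; the option
>>>> defaults to 'false' and, per the docs below, skips fields and rows
>>>> with parse errors instead of failing):
>>>>
>>>>   'format' = 'debezium-json',
>>>>   'debezium-json.schema-include' = 'true',
>>>>   'debezium-json.ignore-parse-errors' = 'true'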
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>>
>>>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>>>>
>>>> Do you mean that the array contains values that are null, or that the
>>>> entire array itself is null? If it's the latter, I have an issue
>>>> written up, along with a PR to fix it that has been pending review [1].
>>>>
>>>> Regards,
>>>> Dylan Forciea
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>>>
>>>> From: Danny Chan <danny0...@apache.org>
>>>> Date: Thursday, November 19, 2020 at 2:24 AM
>>>> To: Rex Fenley <r...@remind101.com>
>>>> Cc: Flink ML <user@flink.apache.org>
>>>> Subject: Re: Filter Null in Array in SQL Connector
>>>>
>>>> Hi, Fenley ~
>>>>
>>>> You are right, parsing nulls in an ARRAY field is not supported yet. I
>>>> have logged an issue [1] and will fix it soon ~
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>>>
>>>> On Thu, Nov 19, 2020 at 2:51 PM, Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I recently discovered that some of our data has NULL values arriving
>>>> in an ARRAY<STRING> column. This column is consumed by Flink via the
>>>> Kafka connector in Debezium format. We seem to be receiving
>>>> NullPointerExceptions when these NULL values arrive in the arrays,
>>>> which restarts the source operator in a loop.
>>>>
>>>> Is there any way to not throw, or to filter out NULLs in an array of
>>>> strings, in Flink? We're somewhat stuck on how to solve this problem
>>>> and would like to be defensive about it on Flink's side (one possible
>>>> approach is sketched at the end of this thread).
>>>>
>>>> Thanks!
>>>>
>>>> (P.S. The exception was not that informative; there may be room for
>>>> improvement in terms of a richer error message when this happens.)

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
FOLLOW US <https://twitter.com/remindhq> | LIKE US
<https://www.facebook.com/remindhq>
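A minimal sketch of the defensive option referenced above: a Java scalar
UDF that drops NULL elements from an array before rows reach the sink. The
class name, function name, and registration below are hypothetical and
untested (Flink 1.11 assumed), and this only applies once the
debezium-json source deserializes a row successfully; it does not address
the parsing issue tracked in FLINK-20234.

  import org.apache.flink.table.functions.ScalarFunction;

  import java.util.Arrays;
  import java.util.Objects;

  // Removes NULL elements from an ARRAY<STRING> value; a NULL array
  // itself is passed through unchanged.
  public class ArrayCompact extends ScalarFunction {
      public String[] eval(String[] values) {
          if (values == null) {
              return null; // leave a NULL array untouched
          }
          return Arrays.stream(values)
                  .filter(Objects::nonNull)
                  .toArray(String[]::new);
      }
  }

Registered and used with the tables from the top of this thread:

  tableEnv.createTemporarySystemFunction("ARRAY_COMPACT", ArrayCompact.class);

  INSERT INTO sink_es_data
  SELECT id, ARRAY_COMPACT(roles)
  FROM source_kafka_data;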