Below is a highly redacted set of data that should represent the problem. As you can see, the "roles" field has "[null]" in it, a null value within the array. We also see corresponding rows in our DB like the following:

     id       | roles
    ----------+--------
     16867433 | {NULL}
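For context, a row like that can be reproduced in Postgres with something along these lines (a sketch, assuming the table layout implied above; the cast types the bare NULL as a text element):

    -- Insert an array that is itself non-null but contains a NULL element.
    -- Postgres renders it back as {NULL}, matching the row above.
    INSERT INTO public.data (id, roles)
    VALUES (16867433, ARRAY[NULL]::text[]);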
We have confirmed that by not selecting "roles", all data passes through without failure on a single operator, but selecting "roles" will eventually always fail with java.lang.NullPointerException, repeatedly. What is odd about this is that there is no additional stack trace, just the exception, both in our logs and in the Flink UI. We only have INFO logging on; however, other exceptions we've encountered in our development have always revealed a stack trace.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 76704,
      "roles": [null]
    },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}

cc Brad

On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io> wrote:

> Ah yes, I missed the Kafka part and just saw the array part. FLINK-19771
> definitely was solely in the postgres-specific code.
>
> Dylan
>
> From: Jark Wu <imj...@gmail.com>
> Date: Thursday, November 19, 2020 at 9:12 AM
> To: Dylan Forciea <dy...@oseberg.io>
> Cc: Danny Chan <danny0...@apache.org>, Rex Fenley <r...@remind101.com>,
> Flink ML <user@flink.apache.org>
> Subject: Re: Filter Null in Array in SQL Connector
>
> Hi Dylan,
>
> I think Rex encountered another issue, because he is using Kafka with the
> Debezium format.
>
> Hi Rex,
>
> If you can share the json data and the exception stack, that would be
> helpful!
> Besides, you can try enabling the 'debezium-json.ignore-parse-errors'
> option [1] to skip the dirty data.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>
> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>
> Do you mean that the array contains values that are null, or that the
> entire array itself is null? If it's the latter, I have an issue written,
> along with a PR to fix it, that is pending review [1].
>
> Regards,
> Dylan Forciea
>
> [1] https://issues.apache.org/jira/browse/FLINK-19771
>
> From: Danny Chan <danny0...@apache.org>
> Date: Thursday, November 19, 2020 at 2:24 AM
> To: Rex Fenley <r...@remind101.com>
> Cc: Flink ML <user@flink.apache.org>
> Subject: Re: Filter Null in Array in SQL Connector
>
> Hi, Fenley ~
>
> You are right, parsing nulls in an ARRAY field is not supported yet. I have
> logged an issue [1] and will fix it soon ~
>
> [1] https://issues.apache.org/jira/browse/FLINK-20234
>
> On Thu, Nov 19, 2020 at 2:51 PM, Rex Fenley <r...@remind101.com> wrote:
>
> Hi,
>
> I recently discovered that some of our data has NULL values arriving in an
> ARRAY<STRING> column. This column is being consumed by Flink via the Kafka
> connector's Debezium format. We seem to be receiving NullPointerExceptions
> when these NULL values in the arrays arrive, which restarts the source
> operator in a loop.
>
> Is there any way to not throw, or to filter out the NULLs in an Array of
> Strings, in Flink?
>
> We're somewhat stuck on how to solve this problem; we'd like to be
> defensive about this on Flink's side.
>
> Thanks!
>
> (P.S. The exception was not that informative; there may be room for
> improvement in terms of a richer error message when this happens.)
>
> --
> Rex Fenley | Software Engineer - Mobile and Backend
>
> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>

--
Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
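For reference, the 'debezium-json.ignore-parse-errors' option suggested above is set in the WITH clause of the source table's DDL. A minimal sketch, in which the topic name and broker address are placeholders rather than values from this thread:

    -- Kafka source table using the Debezium JSON format.
    -- Topic and bootstrap server values below are placeholders.
    CREATE TABLE data (
      id INT,
      roles ARRAY<STRING>
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'db.public.data',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'debezium-json',
      -- Skip data the format cannot parse instead of failing the job:
      'debezium-json.ignore-parse-errors' = 'true'
    );

Note that, per the linked documentation, this option skips or nulls out unparseable data rather than repairing it, so it trades completeness for keeping the job running.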