I checked with the following JSON:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": { "id": 76704, "roles": [null] },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}
It works correctly. I reformatted it because the original message was not
valid JSON.
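
A quick way to verify that such a message parses as valid JSON and to see the offending element is with Python's standard library. The snippet below is a minimal sketch trimmed to the relevant part of the payload above:

```python
import json

# Trimmed excerpt of the Debezium message above; the full document parses the
# same way once the trailing commas from the original post are removed.
doc = json.loads("""
{
  "payload": {
    "before": null,
    "after": { "id": 76704, "roles": [null] },
    "op": "r"
  }
}
""")

roles = doc["payload"]["after"]["roles"]
print(roles)          # [None] -- the null element inside the array
print(None in roles)  # True
```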
Rex Fenley <[email protected]> 于2020年11月20日周五 上午3:02写道:
> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field contains "[null]", a null value within
> the array. We also see corresponding rows in our DB, such as the following:
> id | roles
> -----------+------------
> 16867433 | {NULL}
>
> We have confirmed that when we do not select "roles", all data passes
> through a single operator without failure, but selecting "roles" eventually
> always fails with a repeated java.lang.NullPointerException. What is odd is
> that there is no additional stack trace, just the exception, both in our
> logs and in the Flink UI. We only have INFO logging on; however, other
> exceptions we've encountered during development have always included a
> stack trace.
>
> {
> "schema": {
> "type": "struct",
> "fields": [
> {
> "type": "struct",
> "fields": [
> { "type": "int32", "optional": false, "field": "id" },
> {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
> },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "before"
> },
> {
> "type": "struct",
> "fields": [
> { "type": "int32", "optional": false, "field": "id" },
> {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
> },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "after"
> },
> {
> "type": "struct",
> "fields": [
> { "type": "string", "optional": false, "field": "version" },
> { "type": "string", "optional": false, "field": "connector" },
> { "type": "string", "optional": false, "field": "name" },
> { "type": "int64", "optional": false, "field": "ts_ms" },
> {
> "type": "string",
> "optional": true,
> "name": "io.debezium.data.Enum",
> "version": 1,
> "parameters": { "allowed": "true,last,false" },
> "default": "false",
> "field": "snapshot"
> },
> { "type": "string", "optional": false, "field": "db" },
> { "type": "string", "optional": false, "field": "schema" },
> { "type": "string", "optional": false, "field": "table" },
> { "type": "int64", "optional": true, "field": "txId" },
> { "type": "int64", "optional": true, "field": "lsn" },
> { "type": "int64", "optional": true, "field": "xmin" }
> ],
> "optional": false,
> "name": "io.debezium.connector.postgresql.Source",
> "field": "source"
> },
> { "type": "string", "optional": false, "field": "op" },
> { "type": "int64", "optional": true, "field": "ts_ms" },
> {
> "type": "struct",
> "fields": [
> { "type": "string", "optional": false, "field": "id" },
> { "type": "int64", "optional": false, "field": "total_order" },
> {
> "type": "int64",
> "optional": false,
> "field": "data_collection_order"
> }
> ],
> "optional": true,
> "field": "transaction"
> }
> ],
> "optional": false,
> "name": "db.public.data.Envelope"
> },
> "payload": {
> "before": null,
> "after": {
> "id": 76704,
> "roles": [null],
> },
> "source": {
> "version": "1.3.0.Final",
> "connector": "postgresql",
> "name": "db",
> "ts_ms": 1605739197360,
> "snapshot": "true",
> "db": "db",
> "schema": "public",
> "table": "data",
> "txId": 1784,
> "lsn": 1305806608,
> "xmin": null
> },
> "op": "r",
> "ts_ms": 1605739197373,
> "transaction": null
> }
> }
>
> cc Brad
>
> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <[email protected]> wrote:
>
>> Ah yes, I missed the Kafka part and just saw the array part. FLINK-19771
>> was definitely solely in the Postgres-specific code.
>>
>>
>>
>> Dylan
>>
>>
>>
>> *From: *Jark Wu <[email protected]>
>> *Date: *Thursday, November 19, 2020 at 9:12 AM
>> *To: *Dylan Forciea <[email protected]>
>> *Cc: *Danny Chan <[email protected]>, Rex Fenley <[email protected]>,
>> Flink ML <[email protected]>
>> *Subject: *Re: Filter Null in Array in SQL Connector
>>
>>
>>
>> Hi Dylan,
>>
>>
>>
>> I think Rex encountered another issue, because he is using Kafka with
>> Debezium format.
>>
>>
>>
>> Hi Rex,
>>
>>
>>
>> If you can share the json data and the exception stack, that would be
>> helpful!
>>
>>
>>
>> Besides, you can try to enable 'debezium-json.ignore-parse-errors' option
>> [1] to skip the dirty data.
>>
>>
>>
>> Best,
>>
>> Jark
>>
>>
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
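
For reference, a minimal sketch of where that option goes in a Kafka source DDL. The table name, topic, and server address below are placeholders, not from this thread; the DDL is shown as a Python string of the kind one would pass to `tableEnv.executeSql()`:

```python
# Sketch of a Flink SQL DDL enabling 'debezium-json.ignore-parse-errors'.
# Table name, topic, and bootstrap servers are hypothetical placeholders.
ddl = """
CREATE TABLE data_source (
  id INT,
  roles ARRAY<STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'db.public.data',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json',
  'debezium-json.ignore-parse-errors' = 'true'
)
"""
# In a real job: tableEnv.executeSql(ddl)
print("'debezium-json.ignore-parse-errors' = 'true'" in ddl)  # True
```

Note that this option skips records that fail to deserialize rather than repairing them, so affected rows are dropped entirely.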
>>
>>
>>
>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <[email protected]> wrote:
>>
>> Do you mean that the array contains values that are null, or that the
>> entire array itself is null? If it’s the latter, I have an issue written,
>> along with a PR to fix it that has been pending review [1].
>>
>>
>>
>> Regards,
>>
>> Dylan Forciea
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>
>>
>>
>> *From: *Danny Chan <[email protected]>
>> *Date: *Thursday, November 19, 2020 at 2:24 AM
>> *To: *Rex Fenley <[email protected]>
>> *Cc: *Flink ML <[email protected]>
>> *Subject: *Re: Filter Null in Array in SQL Connector
>>
>>
>>
>> Hi, Fenley ~
>>
>>
>>
>> You are right, parsing null elements of an ARRAY field is not supported
>> yet. I have logged an issue [1] and will fix it soon ~
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>
>>
>>
>> Rex Fenley <[email protected]> 于2020年11月19日周四 下午2:51写道:
>>
>> Hi,
>>
>>
>>
>> I recently discovered that some of our data has NULL values arriving in an
>> ARRAY<STRING> column. This column is consumed by Flink via the Kafka
>> connector with the Debezium format. We receive a NullPointerException
>> whenever these NULL values arrive in the arrays, which restarts the source
>> operator in a loop.
>>
>>
>>
>> Is there any way to not throw or to possibly filter out NULLs in an Array
>> of Strings in Flink?
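
In the meantime, one defensive workaround is to strip the null elements from the messages before Flink consumes them, e.g. in a small intermediary step between Debezium and the Flink job. A minimal sketch, where the function name is an assumption and the message shape is modeled on the Debezium payload earlier in the thread:

```python
import json

def strip_null_roles(raw: str) -> str:
    """Remove null elements from the 'roles' array in the before/after images
    of a Debezium change event, leaving everything else untouched."""
    msg = json.loads(raw)
    payload = msg.get("payload") or {}
    for image in ("before", "after"):
        row = payload.get(image)
        if row and isinstance(row.get("roles"), list):
            row["roles"] = [r for r in row["roles"] if r is not None]
    return json.dumps(msg)

cleaned = strip_null_roles('{"payload": {"after": {"id": 76704, "roles": [null]}}}')
print(cleaned)  # {"payload": {"after": {"id": 76704, "roles": []}}}
```

Whether an empty array is an acceptable substitute for `{NULL}` depends on the downstream semantics, so this is a stopgap rather than a fix.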
>>
>>
>>
>> We're somewhat stuck on how to solve this problem; we'd like to be
>> defensive about it on Flink's side.
>>
>>
>>
>> Thanks!
>>
>>
>>
>> (P.S. The exception was not very informative; there may be room for a
>> richer error message when this happens.)
>>
>>
>> --
>>
>> *Rex Fenley* | Software Engineer - Mobile and Backend
>>
>>
>>
>> *Remind.com* <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>> | FOLLOW US <https://twitter.com/remindhq> | *LIKE US
>> <https://www.facebook.com/remindhq>*
>>
>>
>