My local test indicates that the debezium-json format works correctly with
the given schema and example record. Could you share a more detailed
exception stack trace and a record that reproduces the problem?
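
For reference, this is roughly the pipeline used in the local test. The
topic, broker address, and table names below are placeholders, not the
actual setup:

CREATE TABLE test_source (
    id BIGINT,
    roles ARRAY<STRING NOT NULL>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test_group',
    'scan.startup.mode' = 'earliest-offset',
    'debezium-json.schema-include' = 'true',
    'format' = 'debezium-json'
);

CREATE TABLE test_sink (
    id BIGINT,
    roles ARRAY<STRING>
) WITH ('connector' = 'print');

INSERT INTO test_sink SELECT id, roles FROM test_source;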

On Tue, Dec 1, 2020 at 7:15 AM Rex Fenley <r...@remind101.com> wrote:

> Hello,
>
> Any updates on this bug?
>
> Thanks!
>
> On Fri, Nov 20, 2020 at 11:51 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Btw, this is what our source and sink essentially look like, with some
>> columns redacted.
>>
>> CREATE TABLE source_kafka_data (
>>     id BIGINT,
>>     roles ARRAY<STRING NOT NULL>,
>>     PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = 'topic',
>>     'properties.bootstrap.servers' = 'kafka',
>>     'properties.group.id' = 'group_id',
>>     'properties.auto.offset.reset' = 'earliest',
>>     'debezium-json.schema-include' = 'true',
>>     'format' = 'debezium-json'
>> )
>>
>>
>> CREATE TABLE sink_es_data (
>>     id BIGINT NOT NULL,
>>     roles ARRAY<STRING>,
>>     PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>     'connector' = 'elasticsearch-7',
>>     'hosts' = 'eshost',
>>     'index' = 'data',
>>     'format' = 'json',
>>     'sink.bulk-flush.max-actions' = '8192',
>>     'sink.bulk-flush.max-size' = '16mb',
>>     'sink.bulk-flush.interval' = '5000',
>>     'sink.bulk-flush.backoff.delay' = '1000',
>>     'sink.bulk-flush.backoff.max-retries' = '4',
>>     'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>> )
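>>
>> The job between them is essentially a straight INSERT; a sketch with the
>> redacted columns omitted:
>>
>> INSERT INTO sink_es_data
>> SELECT id, roles
>> FROM source_kafka_data;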
>>
>>
>>
>> On Thu, Nov 19, 2020 at 7:41 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Thanks!
>>>
>>> Update: We've now confirmed with a test copy of our data that if we
>>> remove all the null values from the arrays, everything works smoothly
>>> and as expected. So the null values definitely appear to be the culprit.
>>>
>>> On Thu, Nov 19, 2020 at 6:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Thanks Rex! This is very helpful. Will check it out later.
>>>>
>>>>
>>>> On Fri, 20 Nov 2020 at 03:02, Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Below is a highly redacted set of data that should represent the
>>>>> problem. As you can see, the "roles" field has "[null]" in it, a null
>>>>> value within the array. We also see corresponding rows in our DB like
>>>>> the following.
>>>>>     id     | roles
>>>>> -----------+------------
>>>>>   16867433 | {NULL}
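>>>>>
>>>>> A sketch of a query to find such rows, assuming Postgres 9.5+ and the
>>>>> "public.data" table named in the Debezium schema below:
>>>>>
>>>>> SELECT id, roles
>>>>> FROM public.data
>>>>> WHERE array_position(roles, NULL) IS NOT NULL;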
>>>>>
>>>>> We have confirmed that by not selecting "roles", all data passes
>>>>> through a single operator without failure, but selecting "roles" will
>>>>> eventually always fail with java.lang.NullPointerException, repeatedly.
>>>>> What is odd about this is that there is no additional stack trace, just
>>>>> the exception, in our logs and in the Flink UI. We only have INFO
>>>>> logging on; however, other exceptions we've encountered in our
>>>>> development have always revealed a stack trace.
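>>>>>
>>>>> As a sketch, these are the two shapes of query we probed with (actual
>>>>> column lists redacted):
>>>>>
>>>>> SELECT id FROM source_kafka_data;        -- passes through cleanly
>>>>> SELECT id, roles FROM source_kafka_data; -- eventually fails with the NPE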
>>>>>
>>>>> {
>>>>>   "schema": {
>>>>>     "type": "struct",
>>>>>     "fields": [
>>>>>       {
>>>>>         "type": "struct",
>>>>>         "fields": [
>>>>>           { "type": "int32", "optional": false, "field": "id" },
>>>>>           {
>>>>>             "type": "array",
>>>>>             "items": { "type": "string", "optional": true },
>>>>>             "optional": false,
>>>>>             "field": "roles"
>>>>>           }
>>>>>         ],
>>>>>         "optional": true,
>>>>>         "name": "db.public.data.Value",
>>>>>         "field": "before"
>>>>>       },
>>>>>       {
>>>>>         "type": "struct",
>>>>>         "fields": [
>>>>>           { "type": "int32", "optional": false, "field": "id" },
>>>>>           {
>>>>>             "type": "array",
>>>>>             "items": { "type": "string", "optional": true },
>>>>>             "optional": false,
>>>>>             "field": "roles"
>>>>>           }
>>>>>         ],
>>>>>         "optional": true,
>>>>>         "name": "db.public.data.Value",
>>>>>         "field": "after"
>>>>>       },
>>>>>       {
>>>>>         "type": "struct",
>>>>>         "fields": [
>>>>>           { "type": "string", "optional": false, "field": "version" },
>>>>>           { "type": "string", "optional": false, "field": "connector" },
>>>>>           { "type": "string", "optional": false, "field": "name" },
>>>>>           { "type": "int64", "optional": false, "field": "ts_ms" },
>>>>>           {
>>>>>             "type": "string",
>>>>>             "optional": true,
>>>>>             "name": "io.debezium.data.Enum",
>>>>>             "version": 1,
>>>>>             "parameters": { "allowed": "true,last,false" },
>>>>>             "default": "false",
>>>>>             "field": "snapshot"
>>>>>           },
>>>>>           { "type": "string", "optional": false, "field": "db" },
>>>>>           { "type": "string", "optional": false, "field": "schema" },
>>>>>           { "type": "string", "optional": false, "field": "table" },
>>>>>           { "type": "int64", "optional": true, "field": "txId" },
>>>>>           { "type": "int64", "optional": true, "field": "lsn" },
>>>>>           { "type": "int64", "optional": true, "field": "xmin" }
>>>>>         ],
>>>>>         "optional": false,
>>>>>         "name": "io.debezium.connector.postgresql.Source",
>>>>>         "field": "source"
>>>>>       },
>>>>>       { "type": "string", "optional": false, "field": "op" },
>>>>>       { "type": "int64", "optional": true, "field": "ts_ms" },
>>>>>       {
>>>>>         "type": "struct",
>>>>>         "fields": [
>>>>>           { "type": "string", "optional": false, "field": "id" },
>>>>>           { "type": "int64", "optional": false, "field": "total_order" },
>>>>>           {
>>>>>             "type": "int64",
>>>>>             "optional": false,
>>>>>             "field": "data_collection_order"
>>>>>           }
>>>>>         ],
>>>>>         "optional": true,
>>>>>         "field": "transaction"
>>>>>       }
>>>>>     ],
>>>>>     "optional": false,
>>>>>     "name": "db.public.data.Envelope"
>>>>>   },
>>>>>   "payload": {
>>>>>     "before": null,
>>>>>     "after": {
>>>>>       "id": 76704,
>>>>>       "roles": [null]
>>>>>     },
>>>>>     "source": {
>>>>>       "version": "1.3.0.Final",
>>>>>       "connector": "postgresql",
>>>>>       "name": "db",
>>>>>       "ts_ms": 1605739197360,
>>>>>       "snapshot": "true",
>>>>>       "db": "db",
>>>>>       "schema": "public",
>>>>>       "table": "data",
>>>>>       "txId": 1784,
>>>>>       "lsn": 1305806608,
>>>>>       "xmin": null
>>>>>     },
>>>>>     "op": "r",
>>>>>     "ts_ms": 1605739197373,
>>>>>     "transaction": null
>>>>>   }
>>>>> }
>>>>>
>>>>> cc Brad
>>>>>
>>>>> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io>
>>>>> wrote:
>>>>>
>>>>>> Ah yes, I missed the Kafka part and just saw the array part.
>>>>>> FLINK-19771 was definitely solely in the Postgres-specific code.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Dylan
>>>>>>
>>>>>>
>>>>>>
>>>>>> From: Jark Wu <imj...@gmail.com>
>>>>>> Date: Thursday, November 19, 2020 at 9:12 AM
>>>>>> To: Dylan Forciea <dy...@oseberg.io>
>>>>>> Cc: Danny Chan <danny0...@apache.org>, Rex Fenley <r...@remind101.com>,
>>>>>> Flink ML <user@flink.apache.org>
>>>>>> Subject: Re: Filter Null in Array in SQL Connector
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi Dylan,
>>>>>>
>>>>>>
>>>>>>
>>>>>> I think Rex encountered another issue, because he is using Kafka with
>>>>>> Debezium format.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi Rex,
>>>>>>
>>>>>>
>>>>>>
>>>>>> If you can share the JSON data and the exception stack trace, that
>>>>>> would be helpful!
>>>>>>
>>>>>>
>>>>>>
>>>>>> Besides, you can try enabling the 'debezium-json.ignore-parse-errors'
>>>>>> option [1] to skip the dirty data.
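>>>>>>
>>>>>> For example, in the WITH clause of your source table (keeping the
>>>>>> other options as they are):
>>>>>>
>>>>>>     'format' = 'debezium-json',
>>>>>>     'debezium-json.ignore-parse-errors' = 'true'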
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Jark
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1]:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>>>>>>
>>>>>> Do you mean that the array contains values that are null, or that the
>>>>>> entire array itself is null? If it’s the latter, I have filed an issue,
>>>>>> along with a PR to fix it that has been pending review [1].
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Dylan Forciea
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>>>>>
>>>>>>
>>>>>>
>>>>>> From: Danny Chan <danny0...@apache.org>
>>>>>> Date: Thursday, November 19, 2020 at 2:24 AM
>>>>>> To: Rex Fenley <r...@remind101.com>
>>>>>> Cc: Flink ML <user@flink.apache.org>
>>>>>> Subject: Re: Filter Null in Array in SQL Connector
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi, Fenley ~
>>>>>>
>>>>>>
>>>>>>
>>>>>> You are right, parsing nulls in an ARRAY field is not supported yet. I
>>>>>> have logged an issue [1] and will fix it soon ~
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 19, 2020 at 2:51 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>>
>>>>>> I recently discovered that some of our data has NULL values arriving
>>>>>> in an ARRAY<STRING> column. This column is being consumed by Flink via
>>>>>> the Kafka connector with the Debezium format. We seem to be receiving
>>>>>> NullPointerExceptions when these NULL values in the arrays arrive,
>>>>>> which restarts the source operator in a loop.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Is there any way to avoid throwing, or to filter out NULLs in an array
>>>>>> of strings, in Flink?
>>>>>>
>>>>>>
>>>>>>
>>>>>> We're somewhat stuck on how to solve this problem; we'd like to be
>>>>>> defensive about it on Flink's side.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>> (P.S. The exception was not that informative; there may be room for
>>>>>> improvement in the form of a richer error message when this happens.)
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>
>
