Btw, this is what our source and sink essentially look like, with some
columns redacted.

CREATE TABLE source_kafka_data (
    id BIGINT,
    roles ARRAY<STRING NOT NULL>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = 'kafka',
    'properties.group.id' = 'group_id',
    'properties.auto.offset.reset' = 'earliest',
    'debezium-json.schema-include' = 'true',
    'format' = 'debezium-json'
)
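
Note that the source declares the element type as ARRAY<STRING NOT NULL>,
while (as discussed below) the arrays can contain NULL elements. A minimal
sketch of a nullable-element declaration, with the same connector options as
above elided, would be:

CREATE TABLE source_kafka_data (
    id BIGINT,
    -- elements declared nullable so rows like {NULL} have a legal type
    roles ARRAY<STRING>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH ( ... )

Whether that alone helps is unclear given FLINK-20234 below, but the declared
type at least matches the data.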


CREATE TABLE sink_es_data (
    id BIGINT NOT NULL,
    roles ARRAY<STRING>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'eshost',
    'index' = 'data',
    'format' = 'json',
    'sink.bulk-flush.max-actions' = '8192',
    'sink.bulk-flush.max-size' = '16mb',
    'sink.bulk-flush.interval' = '5000',
    'sink.bulk-flush.backoff.delay' = '1000',
    'sink.bulk-flush.backoff.max-retries' = '4',
    'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)
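
For context, the wiring between the two is roughly the following (a
simplified sketch, not our exact query):

INSERT INTO sink_es_data
SELECT id, roles
FROM source_kafka_data;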



On Thu, Nov 19, 2020 at 7:41 PM Rex Fenley <r...@remind101.com> wrote:

> Thanks!
>
> Update: We've now confirmed with a test copy of our data that if we remove
> all the null values from the arrays, everything works smoothly and as
> expected. So null values inside arrays definitely appear to be the culprit.
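>
> For reference, a sketch of that cleanup on a Postgres test copy (not our
> exact migration; array_remove() is a standard Postgres function that also
> strips NULL elements when passed NULL):
>
> UPDATE data SET roles = array_remove(roles, NULL);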
>
> On Thu, Nov 19, 2020 at 6:41 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Thanks Rex! This is very helpful. Will check it out later.
>>
>>
>> On Fri, 20 Nov 2020 at 03:02, Rex Fenley <r...@remind101.com> wrote:
>>
>>> Below is a highly redacted set of data that should represent the
>>> problem. As you can see, the "roles" field contains "[null]", i.e. a null
>>> value within the array. We also see corresponding rows in our DB, like
>>> the following.
>>>     id     | roles
>>> -----------+------------
>>>   16867433 | {NULL}
>>>
>>> We have confirmed that when we do not select "roles", all data passes
>>> through a single operator without failure, but selecting "roles" will
>>> eventually always fail with a repeated java.lang.NullPointerException.
>>> What is odd is that there is no additional stack trace, just the bare
>>> exception, in both our logs and the Flink UI. We only have INFO logging
>>> on; however, other exceptions we've encountered during development have
>>> always come with a stack trace.
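>>>
>>> In query form, roughly (table name simplified):
>>>
>>> -- eventually always fails with java.lang.NullPointerException
>>> SELECT id, roles FROM source_kafka_data;
>>>
>>> -- passes through without failure
>>> SELECT id FROM source_kafka_data;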
>>>
>>> {
>>>   "schema": {
>>>     "type": "struct",
>>>     "fields": [
>>>       {
>>>         "type": "struct",
>>>         "fields": [
>>>           { "type": "int32", "optional": false, "field": "id" },
>>>           {
>>>             "type": "array",
>>>             "items": { "type": "string", "optional": true },
>>>             "optional": false,
>>>             "field": "roles"
>>>           }
>>>         ],
>>>         "optional": true,
>>>         "name": "db.public.data.Value",
>>>         "field": "before"
>>>       },
>>>       {
>>>         "type": "struct",
>>>         "fields": [
>>>           { "type": "int32", "optional": false, "field": "id" },
>>>           {
>>>             "type": "array",
>>>             "items": { "type": "string", "optional": true },
>>>             "optional": false,
>>>             "field": "roles"
>>>           }
>>>         ],
>>>         "optional": true,
>>>         "name": "db.public.data.Value",
>>>         "field": "after"
>>>       },
>>>       {
>>>         "type": "struct",
>>>         "fields": [
>>>           { "type": "string", "optional": false, "field": "version" },
>>>           { "type": "string", "optional": false, "field": "connector" },
>>>           { "type": "string", "optional": false, "field": "name" },
>>>           { "type": "int64", "optional": false, "field": "ts_ms" },
>>>           {
>>>             "type": "string",
>>>             "optional": true,
>>>             "name": "io.debezium.data.Enum",
>>>             "version": 1,
>>>             "parameters": { "allowed": "true,last,false" },
>>>             "default": "false",
>>>             "field": "snapshot"
>>>           },
>>>           { "type": "string", "optional": false, "field": "db" },
>>>           { "type": "string", "optional": false, "field": "schema" },
>>>           { "type": "string", "optional": false, "field": "table" },
>>>           { "type": "int64", "optional": true, "field": "txId" },
>>>           { "type": "int64", "optional": true, "field": "lsn" },
>>>           { "type": "int64", "optional": true, "field": "xmin" }
>>>         ],
>>>         "optional": false,
>>>         "name": "io.debezium.connector.postgresql.Source",
>>>         "field": "source"
>>>       },
>>>       { "type": "string", "optional": false, "field": "op" },
>>>       { "type": "int64", "optional": true, "field": "ts_ms" },
>>>       {
>>>         "type": "struct",
>>>         "fields": [
>>>           { "type": "string", "optional": false, "field": "id" },
>>>           { "type": "int64", "optional": false, "field": "total_order" },
>>>           {
>>>             "type": "int64",
>>>             "optional": false,
>>>             "field": "data_collection_order"
>>>           }
>>>         ],
>>>         "optional": true,
>>>         "field": "transaction"
>>>       }
>>>     ],
>>>     "optional": false,
>>>     "name": "db.public.data.Envelope"
>>>   },
>>>   "payload": {
>>>     "before": null,
>>>     "after": {
>>>       "id": 76704,
>>>       "roles": [null]
>>>     },
>>>     "source": {
>>>       "version": "1.3.0.Final",
>>>       "connector": "postgresql",
>>>       "name": "db",
>>>       "ts_ms": 1605739197360,
>>>       "snapshot": "true",
>>>       "db": "db",
>>>       "schema": "public",
>>>       "table": "data",
>>>       "txId": 1784,
>>>       "lsn": 1305806608,
>>>       "xmin": null
>>>     },
>>>     "op": "r",
>>>     "ts_ms": 1605739197373,
>>>     "transaction": null
>>>   }
>>> }
>>>
>>> cc Brad
>>>
>>> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io> wrote:
>>>
>>>> Ah yes, missed the Kafka part and just saw the array part. FLINK-19771
>>>> was definitely confined to the Postgres-specific code.
>>>>
>>>>
>>>>
>>>> Dylan
>>>>
>>>>
>>>>
>>>> *From: *Jark Wu <imj...@gmail.com>
>>>> *Date: *Thursday, November 19, 2020 at 9:12 AM
>>>> *To: *Dylan Forciea <dy...@oseberg.io>
>>>> *Cc: *Danny Chan <danny0...@apache.org>, Rex Fenley <r...@remind101.com>,
>>>> Flink ML <user@flink.apache.org>
>>>> *Subject: *Re: Filter Null in Array in SQL Connector
>>>>
>>>>
>>>>
>>>> Hi Dylan,
>>>>
>>>>
>>>>
>>>> I think Rex encountered another issue, because he is using Kafka with
>>>> Debezium format.
>>>>
>>>>
>>>>
>>>> Hi Rex,
>>>>
>>>>
>>>>
>>>> If you can share the json data and the exception stack, that would be
>>>> helpful!
>>>>
>>>>
>>>>
>>>> Besides, you can try enabling the 'debezium-json.ignore-parse-errors'
>>>> option [1] to skip the dirty data.
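>>>>
>>>> A minimal sketch of where that option goes (other columns and options
>>>> elided):
>>>>
>>>> CREATE TABLE source_kafka_data (
>>>>     ...
>>>> ) WITH (
>>>>     'connector' = 'kafka',
>>>>     'format' = 'debezium-json',
>>>>     'debezium-json.ignore-parse-errors' = 'true'
>>>> )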
>>>>
>>>>
>>>>
>>>> Best,
>>>>
>>>> Jark
>>>>
>>>>
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>>
>>>>
>>>>
>>>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>>>>
>>>> Do you mean that the array contains values that are null, or that the
>>>> entire array itself is null? If it’s the latter, I have an issue written,
>>>> along with a PR to fix it that has been pending review [1].
>>>>
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Dylan Forciea
>>>>
>>>>
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>>>
>>>>
>>>>
>>>> *From: *Danny Chan <danny0...@apache.org>
>>>> *Date: *Thursday, November 19, 2020 at 2:24 AM
>>>> *To: *Rex Fenley <r...@remind101.com>
>>>> *Cc: *Flink ML <user@flink.apache.org>
>>>> *Subject: *Re: Filter Null in Array in SQL Connector
>>>>
>>>>
>>>>
>>>> Hi, Fenley ~
>>>>
>>>>
>>>>
>>>> You are right, parsing nulls in an ARRAY field is not supported yet. I
>>>> have logged an issue [1] and will fix it soon ~
>>>>
>>>>
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>>>
>>>>
>>>>
>>>> On Thu, Nov 19, 2020 at 2:51 PM, Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I recently discovered that some of our data has NULL values arriving in
>>>> an ARRAY<STRING> column. This column is consumed by Flink via the Kafka
>>>> connector with the Debezium format. We seem to receive a
>>>> NullPointerException whenever these NULL values in the arrays arrive,
>>>> which restarts the source operator in a loop.
>>>>
>>>>
>>>>
>>>> Is there any way to avoid the exception, or to filter out NULLs in an
>>>> ARRAY<STRING>, in Flink?
>>>>
>>>>
>>>>
>>>> We're somewhat stuck on how to solve this problem; we'd like to be
>>>> defensive about it on Flink's side.
>>>>
>>>>
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>> (P.S. The exception was not that informative; there may be room for
>>>> improvement in the form of a richer error message when this happens.)
>>>>
>>>>
>>>>
>>>
>>>
>>
>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/>  |  BLOG <http://blog.remind.com/>  |
FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
