Below is a heavily redacted sample of the data that should represent the
problem. As you can see, the "roles" field contains "[null]", i.e. a null
value inside the array. We also see corresponding rows in our DB like the
following.
    id     | roles
-----------+------------
  16867433 | {NULL}

We have confirmed that if we do not select "roles", all data passes through
a single operator without failure, but selecting "roles" eventually and
repeatedly fails with java.lang.NullPointerException. What is odd is that
there is no additional stack trace, just the exception, both in our logs
and in the Flink UI. We only have INFO logging enabled; however, other
exceptions we've encountered during development have always included a
stack trace.
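For anyone hitting the same thing, this is a minimal sketch of the kind of
defensive null-filtering we have in mind (plain Java; the class and method
names are hypothetical, and in a real job this logic would presumably be
wrapped in a Flink ScalarFunction and applied in SQL, e.g.
SELECT id, FILTER_NULLS(roles) FROM data):

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical helper: drop null entries from an ARRAY<STRING> value
// before it reaches downstream operators.
public final class NullFilter {
    public static String[] filterNulls(String[] roles) {
        if (roles == null) {
            // Treat a NULL array as an empty array rather than failing.
            return new String[0];
        }
        // Keep only the non-null elements, preserving order.
        return Arrays.stream(roles)
                .filter(Objects::nonNull)
                .toArray(String[]::new);
    }
}
```

This only helps, of course, if the record survives deserialization long
enough for a UDF to see it.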

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 76704,
      "roles": [null]
    },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}
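For reference, the Flink-side declaration of this table looks roughly like
the following (a simplified sketch only; the topic, server, and group names
here are placeholders, not our actual configuration):

```sql
-- Simplified sketch; connector option values are placeholders.
CREATE TABLE data (
  id INT,
  roles ARRAY<STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'db.public.data',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink-consumer',
  'format' = 'debezium-json'
);
```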

cc Brad

On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io> wrote:

> Ah yes, I missed the Kafka part and just saw the array part. FLINK-19771
> was definitely solely in the Postgres-specific code.
>
>
>
> Dylan
>
>
>
> *From: *Jark Wu <imj...@gmail.com>
> *Date: *Thursday, November 19, 2020 at 9:12 AM
> *To: *Dylan Forciea <dy...@oseberg.io>
> *Cc: *Danny Chan <danny0...@apache.org>, Rex Fenley <r...@remind101.com>,
> Flink ML <user@flink.apache.org>
> *Subject: *Re: Filter Null in Array in SQL Connector
>
>
>
> Hi Dylan,
>
>
>
> I think Rex encountered another issue, because he is using Kafka with
> Debezium format.
>
>
>
> Hi Rex,
>
>
>
> If you can share the json data and the exception stack, that would be
> helpful!
>
>
>
> Besides, you can try to enable 'debezium-json.ignore-parse-errors' option
> [1] to skip the dirty data.
>
>
>
> Best,
>
> Jark
>
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>
>
>
> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>
> Do you mean that the array contains values that are null, or that the
> entire array itself is null? If it’s the latter, I have an issue written,
> along with a PR to fix it that has been pending review [1].
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-19771
>
>
>
> *From: *Danny Chan <danny0...@apache.org>
> *Date: *Thursday, November 19, 2020 at 2:24 AM
> *To: *Rex Fenley <r...@remind101.com>
> *Cc: *Flink ML <user@flink.apache.org>
> *Subject: *Re: Filter Null in Array in SQL Connector
>
>
>
> Hi, Fenley ~
>
>
>
> You are right, parsing nulls in an ARRAY field is not supported now. I have
> logged an issue [1] and will fix it soon ~
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20234
>
>
>
> Rex Fenley <r...@remind101.com> 于2020年11月19日周四 下午2:51写道:
>
> Hi,
>
>
>
> I recently discovered that some of our data has NULL values arriving in an
> ARRAY<STRING> column. This column is consumed by Flink via the Kafka
> connector with the Debezium format. We seem to receive a
> NullPointerException whenever these NULL values in the arrays arrive, which
> restarts the source operator in a loop.
>
>
>
> Is there any way to avoid throwing, or to filter out NULLs in an
> ARRAY<STRING>, in Flink?
>
>
>
> We're somewhat stuck on how to solve this problem; we'd like to be
> defensive about it on Flink's side.
>
>
>
> Thanks!
>
>
>
> (P.S. The exception was not very informative; there may be room for
> improvement in terms of a richer error message when this happens.)
>
>
> --
>
> *Rex Fenley*  |  Software Engineer - Mobile and Backend
>
>
>
> *Remind.com* <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>  |  FOLLOW US <https://twitter.com/remindhq>  |  *LIKE US
> <https://www.facebook.com/remindhq>*
>
>
