Hi Laszlo,

This happens because the JSON format supports the object array type, but not
the list type.

However, the PyFlink DataStream API does not provide an object array type yet.
I have created a ticket [1] to follow up on this.

For now, I guess you could implement it yourself; the basic array type [2]
could serve as an example.
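
Just to give a rough idea (an untested sketch only): the Python class name
ObjectArrayTypeInfo below is my own, and the wrapper pattern (subclassing
TypeInformation and returning the Java type info from get_java_type_info)
follows my reading of typeinfo.py [2], so the details may need adjusting for
your Flink version. On the Java side it delegates to
ObjectArrayTypeInfo.getInfoFor:

    from pyflink.common.typeinfo import TypeInformation
    from pyflink.java_gateway import get_gateway


    class ObjectArrayTypeInfo(TypeInformation):
        """TypeInformation for arrays whose elements are composite types such as Row."""

        def __init__(self, element_type: TypeInformation):
            super(ObjectArrayTypeInfo, self).__init__()
            self._element_type = element_type

        def get_java_type_info(self):
            # Delegate to the Java-side ObjectArrayTypeInfo so that the Java
            # operators use a serializer matching the element type.
            gateway = get_gateway()
            j_object_array_type_info = \
                gateway.jvm.org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
            return j_object_array_type_info.getInfoFor(
                self._element_type.get_java_type_info())

        def __repr__(self):
            return "ObjectArrayTypeInfo<%s>" % self._element_type

With something like that in place, the Statement field could be declared as
ObjectArrayTypeInfo(Types.ROW_NAMED([...], [...])) instead of Types.LIST(...),
so that the declared type lines up with the object array produced by the JSON
format.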

Regards,
Dian

[1] https://issues.apache.org/jira/browse/FLINK-23033
[2] 
https://github.com/apache/flink/blob/master/flink-python/pyflink/common/typeinfo.py#L243

> On 17 Jun 2021, at 17:01, László Ciople <ciople.las...@gmail.com> wrote:
> 
> Hello,
> While trying to use the PyFlink DataStream API in Flink 1.13, I have
> encountered an error regarding list types. I am trying to read data from a
> Kafka topic that contains events in JSON format. For example:
> {
>     "timestamp": 1614259940,
>     "harvesterID": "aws-harvester",
>     "clientID": "aws-client-id",
>     "deviceID": "aws-devid",
>     "payload": {
>         "Version": {
>             "PolicyVersion": {
>                 "Document": {
>                     "Version": "2012-10-17",
>                     "Statement": [
>                         {
>                             "Action": "ec2:*",
>                             "Effect": "Allow",
>                             "Resource": "*"
>                         },
>                         {
>                             "Effect": "Allow",
>                             "Action": "elasticloadbalancing:*",
>                             "Resource": "*"
>                         },
>                         {
>                             "Effect": "Allow",
>                             "Action": "cloudwatch:*",
>                             "Resource": "*"
>                         },
>                         {
>                             "Effect": "Allow",
>                             "Action": "autoscaling:*",
>                             "Resource": "*"
>                         },
>                         {
>                             "Effect": "Allow",
>                             "Action": "iam:CreateServiceLinkedRole",
>                             "Resource": "*",
>                             "Condition": {
>                                 "StringEquals": {
>                                     "iam:AWSServiceName": [
>                                         "autoscaling.amazonaws.com",
>                                         "ec2scheduled.amazonaws.com",
>                                         "elasticloadbalancing.amazonaws.com",
>                                         "spot.amazonaws.com",
>                                         "spotfleet.amazonaws.com",
>                                         "transitgateway.amazonaws.com"
>                                     ]
>                                 }
>                             }
>                         }
>                     ]
>                 },
>                 "VersionId": "v5",
>                 "IsDefaultVersion": true,
>                 "CreateDate": "2018-11-27 02:16:56+00:00"
>             },
>             "ResponseMetadata": {
>                 "RequestId": "6d32c946-1273-4bc5-b465-e5549dc4f515",
>                 "HTTPStatusCode": 200,
>                 "HTTPHeaders": {
>                     "x-amzn-requestid": "6d32c946-1273-4bc5-b465-e5549dc4f515",
>                     "content-type": "text/xml",
>                     "content-length": "2312",
>                     "vary": "accept-encoding",
>                     "date": "Thu, 25 Feb 2021 15:32:18 GMT"
>                 },
>                 "RetryAttempts": 0
>             }
>         },
>         "Policy": {
>             "Policy": {
>                 "PolicyName": "AmazonEC2FullAccess",
>                 "PolicyId": "ANPAI3VAJF5ZCRZ7MCQE6",
>                 "Arn": "arn:aws:iam::aws:policy/AmazonEC2FullAccess",
>                 "Path": "/",
>                 "DefaultVersionId": "v5",
>                 "AttachmentCount": 2,
>                 "PermissionsBoundaryUsageCount": 0,
>                 "IsAttachable": true,
>                 "Description": "Provides full access to Amazon EC2 via the AWS Management Console.",
>                 "CreateDate": "2015-02-06 18:40:15+00:00",
>                 "UpdateDate": "2018-11-27 02:16:56+00:00"
>             },
>             "ResponseMetadata": {
>                 "RequestId": "a7e9f175-a757-4215-851e-f3d001083631",
>                 "HTTPStatusCode": 200,
>                 "HTTPHeaders": {
>                     "x-amzn-requestid": "a7e9f175-a757-4215-851e-f3d001083631",
>                     "content-type": "text/xml",
>                     "content-length": "866",
>                     "date": "Thu, 25 Feb 2021 15:32:18 GMT"
>                 },
>                 "RetryAttempts": 0
>             }
>         }
>     }
> }
> 
> I have tried to map this JSON to Flink data types as follows:
> input_type = Types.ROW_NAMED(
>     ['timestamp', 'harvesterID', 'clientID', 'deviceID', 'payload'],
>     [
>         Types.LONG(),    # timestamp
>         Types.STRING(),  # harvesterID
>         Types.STRING(),  # clientID
>         Types.STRING(),  # deviceID
>         Types.ROW_NAMED(  # payload
>             ['Version', 'Policy'],
>             [
>                 Types.ROW_NAMED(  # Version
>                     ['PolicyVersion', 'ResponseMetadata'],
>                     [
>                         Types.ROW_NAMED(  # PolicyVersion
>                             ['Document', 'VersionId', 'IsDefaultVersion', 'CreateDate'],
>                             [
>                                 Types.ROW_NAMED(  # Document
>                                     ['Version', 'Statement'],
>                                     [
>                                         Types.STRING(),  # Version
>                                         Types.LIST(  # Statement
>                                             Types.ROW_NAMED(
>                                                 ['Action', 'Effect', 'Resource', 'Condition'],
>                                                 [
>                                                     Types.STRING(),  # Action
>                                                     Types.STRING(),  # Effect
>                                                     Types.STRING(),  # Resource
>                                                     Types.ROW_NAMED(  # Condition
>                                                         ['StringEquals'],
>                                                         [
>                                                             Types.ROW_NAMED(  # StringEquals
>                                                                 ['iam:AWSServiceName'],
>                                                                 [
>                                                                     Types.LIST(Types.STRING())  # iam:AWSServiceName
>                                                                 ])
>                                                         ])
>                                                 ])
>                                         )
>                                     ]),
>                                 Types.STRING(),   # VersionId
>                                 Types.BOOLEAN(),  # IsDefaultVersion
>                                 Types.STRING()    # CreateDate
>                             ]),
>                         Types.ROW_NAMED(  # ResponseMetadata
>                             ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
>                             [
>                                 Types.STRING(),  # RequestId
>                                 Types.INT(),     # HTTPStatusCode
>                                 Types.MAP(       # HTTPHeaders
>                                     Types.STRING(),
>                                     Types.STRING()
>                                 ),
>                                 Types.INT()      # RetryAttempts
>                             ])
>                     ]),
>                 Types.ROW_NAMED(  # Policy
>                     ['Policy', 'ResponseMetadata'],
>                     [
>                         Types.ROW_NAMED(  # Policy
>                             ['PolicyName', 'PolicyId', 'Arn', 'Path', 'DefaultVersionId',
>                              'AttachmentCount', 'PermissionsBoundaryUsageCount', 'IsAttachable',
>                              'Description', 'CreateDate', 'UpdateDate'],
>                             [
>                                 Types.STRING(),   # PolicyName
>                                 Types.STRING(),   # PolicyId
>                                 Types.STRING(),   # Arn
>                                 Types.STRING(),   # Path
>                                 Types.STRING(),   # DefaultVersionId
>                                 Types.INT(),      # AttachmentCount
>                                 Types.INT(),      # PermissionsBoundaryUsageCount
>                                 Types.BOOLEAN(),  # IsAttachable
>                                 Types.STRING(),   # Description
>                                 Types.STRING(),   # CreateDate
>                                 Types.STRING()    # UpdateDate
>                             ]),
>                         Types.ROW_NAMED(  # ResponseMetadata
>                             ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
>                             [
>                                 Types.STRING(),  # RequestId
>                                 Types.INT(),     # HTTPStatusCode
>                                 Types.MAP(       # HTTPHeaders
>                                     Types.STRING(),
>                                     Types.STRING()
>                                 ),
>                                 Types.INT()      # RetryAttempts
>                             ])
>                     ])
>             ])
>     ])
> 
> But when I submit the application to Flink, I receive the following error as
> soon as it tries to read data from the Kafka topic:
> java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to org.apache.flink.types.Row
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:102)
>     at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
> 
> I think the problem is caused by the Statement list in the event type
> definition: counting the RowSerializer calls on the exception stack suggests
> that the ListSerializer is invoked for the Statement field of the Document
> row. Is this a bug in Flink, or am I not using the LIST type correctly? I
> really need a list whose element type is composite rather than primitive. (I
> have also tried the BASIC_ARRAY and PRIMITIVE_ARRAY types, but with those the
> job fails even before it is submitted, which is what I expected.)
> 
> Best regards,
> Laszlo
