Hi Laszlo,

It seems this is because the JSON format supports the object array type but does not support the list type. However, the object array type is not yet provided in the PyFlink DataStream API [1]. I have created a ticket as a follow-up. For now, I guess you could implement it yourself; you could take a look at the basic array type [2] as an example.
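To give a rough idea, below is an untested sketch of what such a type could look like on the Python side. The class name and the wiring are only illustrative, modeled on BasicArrayTypeInfo in pyflink/common/typeinfo.py; the Java-side class org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo and its getInfoFor method do exist:

    from pyflink.common.typeinfo import TypeInformation
    from pyflink.java_gateway import get_gateway

    class ObjectArrayTypeInfo(TypeInformation):
        """Array type whose elements may be composite types such as Row.

        Untested sketch: mirrors BasicArrayTypeInfo and delegates to the
        Java-side ObjectArrayTypeInfo, which supports arbitrary element
        types (BasicArrayTypeInfo only supports basic element types).
        """

        def __init__(self, element_type: TypeInformation):
            self._element_type = element_type
            super(ObjectArrayTypeInfo, self).__init__()

        def get_java_type_info(self):
            if self._j_typeinfo is None:
                # Ask Flink's Java ObjectArrayTypeInfo for the type info of
                # an Object[] with the given element type.
                self._j_typeinfo = get_gateway().jvm \
                    .org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo \
                    .getInfoFor(self._element_type.get_java_type_info())
            return self._j_typeinfo

        def __eq__(self, other):
            return (isinstance(other, ObjectArrayTypeInfo)
                    and self._element_type == other._element_type)

        def __repr__(self):
            return "ObjectArrayTypeInfo<%s>" % self._element_type

With something like this in place, the Statement field could be declared as ObjectArrayTypeInfo(Types.ROW_NAMED([...], [...])) instead of Types.LIST(...), so the element type can be composite. Whether the Python-to-Java data conversion handles it out of the box is something you would need to verify.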
Regards,
Dian

[1] https://issues.apache.org/jira/browse/FLINK-23033
[2] https://github.com/apache/flink/blob/master/flink-python/pyflink/common/typeinfo.py#L243

> On 17 Jun 2021, at 17:01, László Ciople <ciople.las...@gmail.com> wrote:
>
> Hello,
> While trying to use the PyFlink DataStream API in Flink 1.13, I have
> encountered an error regarding list types. I am trying to read data from a
> Kafka topic that contains events in a JSON format. For example:
>
> {
>     "timestamp": 1614259940,
>     "harvesterID": "aws-harvester",
>     "clientID": "aws-client-id",
>     "deviceID": "aws-devid",
>     "payload": {
>         "Version": {
>             "PolicyVersion": {
>                 "Document": {
>                     "Version": "2012-10-17",
>                     "Statement": [
>                         {
>                             "Action": "ec2:*",
>                             "Effect": "Allow",
>                             "Resource": "*"
>                         },
>                         {
>                             "Effect": "Allow",
>                             "Action": "elasticloadbalancing:*",
>                             "Resource": "*"
>                         },
>                         {
>                             "Effect": "Allow",
>                             "Action": "cloudwatch:*",
>                             "Resource": "*"
>                         },
>                         {
>                             "Effect": "Allow",
>                             "Action": "autoscaling:*",
>                             "Resource": "*"
>                         },
>                         {
>                             "Effect": "Allow",
>                             "Action": "iam:CreateServiceLinkedRole",
>                             "Resource": "*",
>                             "Condition": {
>                                 "StringEquals": {
>                                     "iam:AWSServiceName": [
>                                         "autoscaling.amazonaws.com",
>                                         "ec2scheduled.amazonaws.com",
>                                         "elasticloadbalancing.amazonaws.com",
>                                         "spot.amazonaws.com",
>                                         "spotfleet.amazonaws.com",
>                                         "transitgateway.amazonaws.com"
>                                     ]
>                                 }
>                             }
>                         }
>                     ]
>                 },
>                 "VersionId": "v5",
>                 "IsDefaultVersion": true,
>                 "CreateDate": "2018-11-27 02:16:56+00:00"
>             },
>             "ResponseMetadata": {
>                 "RequestId": "6d32c946-1273-4bc5-b465-e5549dc4f515",
>                 "HTTPStatusCode": 200,
>                 "HTTPHeaders": {
>                     "x-amzn-requestid": "6d32c946-1273-4bc5-b465-e5549dc4f515",
>                     "content-type": "text/xml",
>                     "content-length": "2312",
>                     "vary": "accept-encoding",
>                     "date": "Thu, 25 Feb 2021 15:32:18 GMT"
>                 },
>                 "RetryAttempts": 0
>             }
>         },
>         "Policy": {
>             "Policy": {
>                 "PolicyName": "AmazonEC2FullAccess",
>                 "PolicyId": "ANPAI3VAJF5ZCRZ7MCQE6",
>                 "Arn": "arn:aws:iam::aws:policy/AmazonEC2FullAccess",
>                 "Path": "/",
>                 "DefaultVersionId": "v5",
>                 "AttachmentCount": 2,
>                 "PermissionsBoundaryUsageCount": 0,
>                 "IsAttachable": true,
>                 "Description": "Provides full access to Amazon EC2 via the AWS Management Console.",
>                 "CreateDate": "2015-02-06 18:40:15+00:00",
>                 "UpdateDate": "2018-11-27 02:16:56+00:00"
>             },
>             "ResponseMetadata": {
>                 "RequestId": "a7e9f175-a757-4215-851e-f3d001083631",
>                 "HTTPStatusCode": 200,
>                 "HTTPHeaders": {
>                     "x-amzn-requestid": "a7e9f175-a757-4215-851e-f3d001083631",
>                     "content-type": "text/xml",
>                     "content-length": "866",
>                     "date": "Thu, 25 Feb 2021 15:32:18 GMT"
>                 },
>                 "RetryAttempts": 0
>             }
>         }
>     }
> }
>
> I have tried to map this JSON to Flink data types as follows:
>
> input_type = Types.ROW_NAMED(
>     ['timestamp', 'harvesterID', 'clientID', 'deviceID', 'payload'],
>     [
>         Types.LONG(),    # timestamp
>         Types.STRING(),  # harvesterID
>         Types.STRING(),  # clientID
>         Types.STRING(),  # deviceID
>         Types.ROW_NAMED(  # Payload
>             ['Version', 'Policy'],
>             [
>                 Types.ROW_NAMED(  # Version
>                     ['PolicyVersion', 'ResponseMetadata'],
>                     [
>                         Types.ROW_NAMED(  # PolicyVersion
>                             ['Document', 'VersionId', 'IsDefaultVersion', 'CreateDate'],
>                             [
>                                 Types.ROW_NAMED(  # Document
>                                     ['Version', 'Statement'],
>                                     [
>                                         Types.STRING(),  # Version
>                                         Types.LIST(      # Statement
>                                             Types.ROW_NAMED(
>                                                 ['Action', 'Effect', 'Resource', 'Condition'],
>                                                 [
>                                                     Types.STRING(),  # Action
>                                                     Types.STRING(),  # Effect
>                                                     Types.STRING(),  # Resource
>                                                     Types.ROW_NAMED(  # Condition
>                                                         ['StringEquals'],
>                                                         [
>                                                             Types.ROW_NAMED(  # StringEquals
>                                                                 ['iam:AWSServiceName'],
>                                                                 [
>                                                                     Types.LIST(Types.STRING())  # iam:AWSServiceName
>                                                                 ])
>                                                         ])
>                                                 ])
>                                         )
>                                     ]),
>                                 Types.STRING(),   # VersionId
>                                 Types.BOOLEAN(),  # IsDefaultVersion
>                                 Types.STRING()    # CreateDate
>                             ]),
>                         Types.ROW_NAMED(  # ResponseMetadata
>                             ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
>                             [
>                                 Types.STRING(),  # RequestId
>                                 Types.INT(),     # HTTPStatusCode
>                                 Types.MAP(       # HTTPHeaders
>                                     Types.STRING(),
>                                     Types.STRING()),
>                                 Types.INT()      # RetryAttempts
>                             ])
>                     ]),
>                 Types.ROW_NAMED(  # Policy
>                     ['Policy', 'ResponseMetadata'],
>                     [
>                         Types.ROW_NAMED(  # Policy
>                             ['PolicyName', 'PolicyId', 'Arn', 'Path',
>                              'DefaultVersionId', 'AttachmentCount',
>                              'PermissionBoundaryUsageCount', 'IsAttachable',
>                              'Description', 'CreateDate', 'UpdateDate'],
>                             [
>                                 Types.STRING(),   # PolicyName
>                                 Types.STRING(),   # PolicyId
>                                 Types.STRING(),   # Arn
>                                 Types.STRING(),   # Path
>                                 Types.STRING(),   # DefaultVersionId
>                                 Types.INT(),      # AttachmentCount
>                                 Types.INT(),      # PermissionBoundaryUsageCount
>                                 Types.BOOLEAN(),  # IsAttachable
>                                 Types.STRING(),   # Description
>                                 Types.STRING(),   # CreateDate
>                                 Types.STRING()    # UpdateDate
>                             ]),
>                         Types.ROW_NAMED(  # ResponseMetadata
>                             ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
>                             [
>                                 Types.STRING(),  # RequestId
>                                 Types.INT(),     # HTTPStatusCode
>                                 Types.MAP(       # HTTPHeaders
>                                     Types.STRING(),
>                                     Types.STRING()),
>                                 Types.INT()      # RetryAttempts
>                             ])
>                     ])
>             ])
>     ])
>
> But when I try to submit the application to Flink, I receive the following
> error when it tries to read data from the Kafka topic:
>
> java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to org.apache.flink.types.Row
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:102)
>     at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
>
> I think the problem is caused by the Statement list in the event type format
> (I have been counting the RowSerializer calls in the exception stack, and it
> seems that the ListSerializer is called for the Statement field of the
> Document row). Is there a bug in Flink, or am I not using the LIST type
> correctly? I really need a list whose element type is composite and not a
> primitive type. (I have also tried the BASIC_ARRAY and PRIMITIVE_ARRAY
> types, but with those the job fails even before it is submitted, which I
> expected would happen.)
>
> Best regards,
> Laszlo