Hello,

While trying to use the PyFlink DataStream API in Flink 1.13, I have encountered an error involving list types. I am trying to read data from a Kafka topic that contains events in JSON format. For example:

{
  "timestamp": 1614259940,
  "harvesterID": "aws-harvester",
  "clientID": "aws-client-id",
  "deviceID": "aws-devid",
  "payload": {
    "Version": {
      "PolicyVersion": {
        "Document": {
          "Version": "2012-10-17",
          "Statement": [
            { "Action": "ec2:*", "Effect": "Allow", "Resource": "*" },
            { "Effect": "Allow", "Action": "elasticloadbalancing:*", "Resource": "*" },
            { "Effect": "Allow", "Action": "cloudwatch:*", "Resource": "*" },
            { "Effect": "Allow", "Action": "autoscaling:*", "Resource": "*" },
            {
              "Effect": "Allow",
              "Action": "iam:CreateServiceLinkedRole",
              "Resource": "*",
              "Condition": {
                "StringEquals": {
                  "iam:AWSServiceName": [
                    "autoscaling.amazonaws.com",
                    "ec2scheduled.amazonaws.com",
                    "elasticloadbalancing.amazonaws.com",
                    "spot.amazonaws.com",
                    "spotfleet.amazonaws.com",
                    "transitgateway.amazonaws.com"
                  ]
                }
              }
            }
          ]
        },
        "VersionId": "v5",
        "IsDefaultVersion": true,
        "CreateDate": "2018-11-27 02:16:56+00:00"
      },
      "ResponseMetadata": {
        "RequestId": "6d32c946-1273-4bc5-b465-e5549dc4f515",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
          "x-amzn-requestid": "6d32c946-1273-4bc5-b465-e5549dc4f515",
          "content-type": "text/xml",
          "content-length": "2312",
          "vary": "accept-encoding",
          "date": "Thu, 25 Feb 2021 15:32:18 GMT"
        },
        "RetryAttempts": 0
      }
    },
    "Policy": {
      "Policy": {
        "PolicyName": "AmazonEC2FullAccess",
        "PolicyId": "ANPAI3VAJF5ZCRZ7MCQE6",
        "Arn": "arn:aws:iam::aws:policy/AmazonEC2FullAccess",
        "Path": "/",
        "DefaultVersionId": "v5",
        "AttachmentCount": 2,
        "PermissionsBoundaryUsageCount": 0,
        "IsAttachable": true,
        "Description": "Provides full access to Amazon EC2 via the AWS Management Console.",
        "CreateDate": "2015-02-06 18:40:15+00:00",
        "UpdateDate": "2018-11-27 02:16:56+00:00"
      },
      "ResponseMetadata": {
        "RequestId": "a7e9f175-a757-4215-851e-f3d001083631",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
          "x-amzn-requestid": "a7e9f175-a757-4215-851e-f3d001083631",
          "content-type": "text/xml",
          "content-length": "866",
          "date": "Thu, 25 Feb 2021 15:32:18 GMT"
        },
        "RetryAttempts": 0
      }
    }
  }
}
I have tried to map this JSON to Flink data types as follows:

input_type = Types.ROW_NAMED(
    ['timestamp', 'harvesterID', 'clientID', 'deviceID', 'payload'],
    [
        Types.LONG(),    # timestamp
        Types.STRING(),  # harvesterID
        Types.STRING(),  # clientID
        Types.STRING(),  # deviceID
        Types.ROW_NAMED(  # Payload
            ['Version', 'Policy'],
            [
                Types.ROW_NAMED(  # Version
                    ['PolicyVersion', 'ResponseMetadata'],
                    [
                        Types.ROW_NAMED(  # PolicyVersion
                            ['Document', 'VersionId', 'IsDefaultVersion', 'CreateDate'],
                            [
                                Types.ROW_NAMED(  # Document
                                    ['Version', 'Statement'],
                                    [
                                        Types.STRING(),  # Version
                                        Types.LIST(      # Statement
                                            Types.ROW_NAMED(
                                                ['Action', 'Effect', 'Resource', 'Condition'],
                                                [
                                                    Types.STRING(),  # Action
                                                    Types.STRING(),  # Effect
                                                    Types.STRING(),  # Resource
                                                    Types.ROW_NAMED(  # Condition
                                                        ['StringEquals'],
                                                        [
                                                            Types.ROW_NAMED(  # StringEquals
                                                                ['iam:AWSServiceName'],
                                                                [
                                                                    Types.LIST(Types.STRING())  # iam:AWSServiceName
                                                                ])
                                                        ])
                                                ])
                                        )
                                    ]),
                                Types.STRING(),   # VersionId
                                Types.BOOLEAN(),  # IsDefaultVersion
                                Types.STRING()    # CreateDate
                            ]),
                        Types.ROW_NAMED(  # ResponseMetadata
                            ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                            [
                                Types.STRING(),  # RequestId
                                Types.INT(),     # HTTPStatusCode
                                Types.MAP(       # HTTPHeaders
                                    Types.STRING(), Types.STRING()),
                                Types.INT()      # RetryAttempts
                            ])
                    ]),
                Types.ROW_NAMED(  # Policy
                    ['Policy', 'ResponseMetadata'],
                    [
                        Types.ROW_NAMED(  # Policy
                            ['PolicyName', 'PolicyId', 'Arn', 'Path', 'DefaultVersionId',
                             'AttachmentCount', 'PermissionsBoundaryUsageCount', 'IsAttachable',
                             'Description', 'CreateDate', 'UpdateDate'],
                            [
                                Types.STRING(),   # PolicyName
                                Types.STRING(),   # PolicyId
                                Types.STRING(),   # Arn
                                Types.STRING(),   # Path
                                Types.STRING(),   # DefaultVersionId
                                Types.INT(),      # AttachmentCount
                                Types.INT(),      # PermissionsBoundaryUsageCount
                                Types.BOOLEAN(),  # IsAttachable
                                Types.STRING(),   # Description
                                Types.STRING(),   # CreateDate
                                Types.STRING()    # UpdateDate
                            ]),
                        Types.ROW_NAMED(  # ResponseMetadata
                            ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                            [
                                Types.STRING(),  # RequestId
                                Types.INT(),     # HTTPStatusCode
                                Types.MAP(       # HTTPHeaders
                                    Types.STRING(), Types.STRING()),
                                Types.INT()      # RetryAttempts
                            ])
                    ])
            ])
    ])

But when I submit the application to Flink, I receive the following error when it tries to read data from the Kafka topic:

java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to org.apache.flink.types.Row
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:102)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)

I think the problem is caused by the Statement list in the event type format: I counted the RowSerializer frames on the exception stack, and the ListSerializer appears to be invoked exactly for the Statement field of the Document row. Is this a bug in Flink, or am I using the LIST type incorrectly? I really need a list whose element type is composite rather than primitive. (I have also tried the BASIC_ARRAY and PRIMITIVE_ARRAY types, but with those the job fails before it is even submitted, which I expected.)
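In case it helps to see the shape I am trying to describe: parsing a trimmed-down version of the event in plain Python (the sample below is abbreviated, not the full payload) shows that Statement deserializes as a list whose elements are objects, which is exactly what I meant to express with Types.LIST(Types.ROW_NAMED(...)):

```python
import json

# Abbreviated sample event (only the path down to Statement is kept)
sample = """
{
  "payload": {
    "Version": {
      "PolicyVersion": {
        "Document": {
          "Version": "2012-10-17",
          "Statement": [
            { "Action": "ec2:*", "Effect": "Allow", "Resource": "*" },
            { "Action": "cloudwatch:*", "Effect": "Allow", "Resource": "*" }
          ]
        }
      }
    }
  }
}
"""

event = json.loads(sample)
statements = event["payload"]["Version"]["PolicyVersion"]["Document"]["Statement"]

# Each element of Statement is a JSON object (a composite type), not a primitive
print(type(statements).__name__)                      # list
print(all(isinstance(s, dict) for s in statements))   # True
```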