Hi Dian,

Thank you very much, that worked very nicely.
Kind regards,
John

________________________________
From: Dian Fu <dian0511...@gmail.com>
Sent: 11 April 2022 06:32
To: John Tipper <john_tip...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: How to process events with different JSON schemas in single Kinesis stream using PyFlink?

Hi John,

1) Regarding the Table API, you could declare the column `detail` as STRING and then parse it as JSON in a Python user-defined function, as follows:

```
@udf(result_type=DataTypes.STRING())
def get_id(detail):
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']
    else:
        return detail_json['instance-id']
```

2) Regarding the DataStream API, Kinesis is still not supported; however, it should be easy to add, as it is simply a wrapper around the Java Kinesis connector. If you want to use the DataStream API, you could wrap it yourself for now, referring to how the other connectors are handled [1] for more details.

Regards,
Dian

[1] https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py#L1235

On Mon, Apr 11, 2022 at 6:17 AM John Tipper <john_tip...@hotmail.com> wrote:

TL;DR: I want to know how best to process a stream of events using PyFlink, where the events in the stream have a number of different schemas.

Details:

I want to process a stream of events coming from a Kinesis data stream which originate from an AWS EventBridge bus. The events in this stream are all JSON, but have different schemas. Specifically, they all have a common set of properties (e.g. version, id, time, source, detail-type), but there is a "detail" section that is a different shape, depending on what the event type is.
For instance, an event on the bus from EC2 might look something like this:

```
{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
    "instance-id": "i-1234567890abcdef0",
    "state": "terminated"
  }
}
```

whereas the "detail" section for an event from CodeBuild might look like this (and the top-level "source" field would be "aws.codebuild"):

```
"detail": {
  "build-status": "SUCCEEDED",
  "project-name": "my-sample-project",
  "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
  "additional-information": {
    "artifact": {
      "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
      "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
      "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
    }
  }
}
```

I have the following constraints that I really want to abide by:

1. I cannot create a Kinesis data stream for each event type, i.e. the stream of events is multiplexed.
2. I'd like to create application code in Python using PyFlink (I can code happily in Java but my colleagues who need to contribute cannot).

I want to ingest the stream of events, key by the type of event (key by "detail-type" or "source"), and then process the events according to their type. I thought my options might be:

1. Use SQL/Table API (my preferred option), but it looks like JSON queries are not scheduled to be released until Flink 1.15, and until then I cannot define the schema of the input table where a field is a generic Map (i.e. Map<>). It appears I have to define the schema of the input table exactly, and I don't see how I can create a table which covers a property that varies in shape, e.g. for the EC2 example above the schema for "detail" is Map<VARCHAR, VARCHAR>, but for a CodeBuild event it's a more deeply nested JSON structure.
If there were a "JSON" type then this would appear to be the way to go.

2. Use the DataStream API, but it looks like there is not a PyFlink Kinesis connector for the DataStream API. There is a Java connector - what's involved in creating a custom PyFlink DataStream connector?

Are there any other options I've missed? What's the best way to approach this?

Many thanks,

John
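The core of Dian's suggestion is that the `detail` column stays a plain STRING and the branching happens in ordinary Python. Stripped of the PyFlink `@udf` decorator, the dispatch logic can be exercised on its own; a minimal sketch, with sample payloads abbreviated from the events above:

```python
import json

def get_id(detail: str) -> str:
    # Parse the raw "detail" string and pick whichever id field this
    # event type carries: CodeBuild events have "build-id", EC2
    # state-change events have "instance-id".
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']
    else:
        return detail_json['instance-id']

ec2_detail = json.dumps({
    "instance-id": "i-1234567890abcdef0",
    "state": "terminated",
})
codebuild_detail = json.dumps({
    "build-status": "SUCCEEDED",
    "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
})

print(get_id(ec2_detail))        # i-1234567890abcdef0
print(get_id(codebuild_detail))  # arn:aws:codebuild:...
```

In PyFlink the same function is simply wrapped with `@udf(result_type=DataTypes.STRING())`, as in Dian's reply, and applied to the `detail` column.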
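John's plan of keying by "source" (or "detail-type") and processing each event type separately amounts to a per-type handler lookup on the common envelope fields. A minimal sketch of that dispatch in plain Python; the handler functions and their return values are illustrative, not from the thread:

```python
import json

# Hypothetical per-type handlers; real processing logic would go here.
def handle_ec2(event):
    return ('ec2', event['detail']['state'])

def handle_codebuild(event):
    return ('codebuild', event['detail']['build-status'])

# Route on the common "source" field shared by all EventBridge events.
HANDLERS = {
    'aws.ec2': handle_ec2,
    'aws.codebuild': handle_codebuild,
}

def process(raw: str):
    event = json.loads(raw)
    handler = HANDLERS.get(event['source'])
    if handler is None:
        # Unknown event type: fall back to the common envelope fields.
        return ('unknown', event.get('detail-type'))
    return handler(event)

raw = json.dumps({
    "source": "aws.ec2",
    "detail-type": "EC2 Instance State-change Notification",
    "detail": {"instance-id": "i-1234567890abcdef0", "state": "terminated"},
})
print(process(raw))  # ('ec2', 'terminated')
```

In a PyFlink job this logic would live inside a user-defined function (Table API) or, once a Kinesis source is available to Python, a keyed process function (DataStream API).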