Hi John,

1) Regarding the Table API, you could declare the column `detail` as STRING and then parse it as JSON in a Python user-defined function as follows:
```
import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def get_id(detail):
    # The `detail` column arrives as a raw JSON string.
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        # CodeBuild events carry a 'build-id'.
        return detail_json['build-id']
    else:
        # EC2 events carry an 'instance-id' instead.
        return detail_json['instance-id']
```

2) Regarding the DataStream API, the Kinesis connector is not yet supported in PyFlink. However, it should be very easy to add, as it is simply a matter of wrapping the Java Kinesis connector. If you want to use the DataStream API, you could wrap it yourself for now and refer to how the other connectors are handled [1] for more details.
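To give you an idea of what's involved, below is a minimal, untested sketch of such a wrapper, following the same py4j pattern the existing connectors in [1] use. The jar path, stream name, and region property are placeholders, and you should verify the Java constructor signature against the version of the flink-connector-kinesis jar you actually use:

```
import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import SourceFunction
from pyflink.java_gateway import get_gateway


class FlinkKinesisConsumer(SourceFunction):
    """Hypothetical Python wrapper around the Java FlinkKinesisConsumer."""

    def __init__(self, stream_name, deserialization_schema, config_props):
        gateway = get_gateway()
        # Copy the Python config dict into a java.util.Properties instance.
        j_properties = gateway.jvm.java.util.Properties()
        for key, value in config_props.items():
            j_properties.setProperty(key, value)
        # Instantiate the Java consumer and hand it to the Python-side
        # SourceFunction wrapper, mirroring the existing connectors in [1].
        j_consumer = gateway.jvm.org.apache.flink.streaming.connectors.kinesis \
            .FlinkKinesisConsumer(stream_name,
                                  deserialization_schema._j_deserialization_schema,
                                  j_properties)
        super().__init__(j_consumer)


env = StreamExecutionEnvironment.get_execution_environment()
# Placeholder path: the Kinesis connector jar must be on the classpath.
env.add_jars("file:///path/to/flink-connector-kinesis.jar")
consumer = FlinkKinesisConsumer(
    "my-event-stream",  # placeholder stream name
    SimpleStringSchema(),
    {"aws.region": "us-west-1"})
# Key the multiplexed stream by event type so that each event type can
# be processed separately downstream.
stream = env.add_source(consumer) \
    .key_by(lambda raw: json.loads(raw)["detail-type"])
```

Once the stream is keyed per event type, you can dispatch to type-specific handling in a process function.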
Regards,
Dian

[1] https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py#L1235

On Mon, Apr 11, 2022 at 6:17 AM John Tipper <john_tip...@hotmail.com> wrote:

> TL;DR: I want to know how best to process a stream of events using PyFlink, where the events in the stream have a number of different schemas.
>
> Details:
>
> I want to process a stream of events coming from a Kinesis data stream which originate from an AWS EventBridge bus. The events in this stream are all JSON, but have different schemas. Specifically, they all have a common set of properties (e.g. version, id, time, source, detail-type), but there is a "detail" section that is a different shape, depending on what the event type is.
>
> For instance, an event on the bus from EC2 might look something like this:
>
> {
>     "version": "0",
>     "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
>     "detail-type": "EC2 Instance State-change Notification",
>     "source": "aws.ec2",
>     "account": "111122223333",
>     "time": "2017-12-22T18:43:48Z",
>     "region": "us-west-1",
>     "detail": {
>         "instance-id": "i-1234567890abcdef0",
>         "state": "terminated"
>     }
> }
>
> whereas the "detail" section for an event from CodeBuild might look like this (and the top-level "source" field would be "aws.codebuild"):
>
> "detail": {
>     "build-status": "SUCCEEDED",
>     "project-name": "my-sample-project",
>     "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
>     "additional-information": {
>         "artifact": {
>             "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
>             "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
>             "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
>         }
>     }
> }
>
> I have the following constraints that I really want to abide by:
>
> 1. I cannot create a Kinesis data stream for each event type, i.e. the stream of events is multiplexed.
> 2. I'd like to create application code in Python using PyFlink (I can code happily in Java but my colleagues who need to contribute cannot).
>
> I want to ingest the stream of events, key by the type of event (key by "detail-type" or "source") and then process the events according to their type.
>
> I thought my options might be:
>
> 1. Use the SQL/Table API (my preferred option), but it looks like JSON queries are not scheduled to be released until Flink 1.15, and until then I cannot define the schema of the input table where a field is a generic Map (i.e. Map<>). It appears I have to define the schema of the input table exactly, and I don't see how I can create a table which covers a property that varies in shape, e.g. for the EC2 example above the schema for "detail" is Map<VARCHAR, VARCHAR>, but for a CodeBuild event it's a more deeply nested JSON structure. If there were a "JSON" type then this would appear to be the way to go.
> 2. Use the DataStream API, but it looks like there is not a PyFlink Kinesis connector for the DataStream API. There is a Java connector - what's involved in creating a custom PyFlink DataStream connector?
>
> Are there any other options I've missed? What's the best way to approach this?
>
> Many thanks,
>
> John