TL;DR: I want to know how best to process a stream of events using PyFlink, where the events in the stream have a number of different schemas.

Details: I want to process a stream of events coming from a Kinesis data stream, which originate from an AWS EventBridge bus. The events in this stream are all JSON, but have different schemas. Specifically, they all share a common set of top-level properties (e.g. `version`, `id`, `time`, `source`, `detail-type`), but the `detail` section has a different shape depending on the event type. For instance, an event on the bus from EC2 might look something like this:

```json
{
    "version": "0",
    "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
    "detail-type": "EC2 Instance State-change Notification",
    "source": "aws.ec2",
    "account": "111122223333",
    "time": "2017-12-22T18:43:48Z",
    "region": "us-west-1",
    "detail": {
        "instance-id": "i-1234567890abcdef0",
        "state": "terminated"
    }
}
```

whereas the `detail` section for an event from CodeBuild might look like this (and the top-level `source` field would be `"aws.codebuild"`):

```json
"detail": {
    "build-status": "SUCCEEDED",
    "project-name": "my-sample-project",
    "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
    "additional-information": {
        "artifact": {
            "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
            "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
            "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
        }
    }
}
```

I have the following constraints that I really want to abide by:

1. I cannot create a Kinesis data stream for each event type, i.e. the stream of events is multiplexed.
2. I'd like to write the application code in Python using PyFlink (I can code happily in Java, but my colleagues who need to contribute cannot).

I want to ingest the stream of events, key by the type of event (key by `detail-type` or `source`), and then process the events according to their type. I thought my options might be:

1. Use the SQL/Table API (my preferred option), but it looks like JSON queries are not scheduled to be released until Flink 1.15, and until then I cannot define the schema of the input table with a field that is a generic map (i.e. `MAP<>`). It appears I have to define the schema of the input table exactly, and I don't see how I can create a table that covers a property whose shape varies: for the EC2 example above the schema for `detail` would be `MAP<VARCHAR, VARCHAR>`, but for a CodeBuild event it's a more deeply nested JSON structure. If there were a "JSON" type then this would appear to be the way to go (a rough workaround sketch is in the edit at the end of this post).
2. Use the DataStream API, but it looks like there is no PyFlink Kinesis connector for the DataStream API. There is a Java connector - what's involved in creating a custom PyFlink DataStream connector?

Are there any other options I've missed? What's the best way to approach this?

Many thanks,
John
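Edit: to make option 1 concrete, here's a rough sketch of the kind of workaround I'm considering: declare the whole Kinesis record as a single `STRING` column using the `raw` format, and do the JSON parsing inside a Python UDF, so the varying shape of `detail` never has to appear in the table schema. The stream name and region are placeholders, and this assumes the Flink Kinesis SQL connector jar has been made available to the job (e.g. via `pipeline.jars`):

```python
import json

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# The 'raw' format hands each Kinesis record over as-is, so the table schema
# never has to describe the event-specific "detail" structure.
t_env.execute_sql("""
    CREATE TABLE events (
        raw STRING
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'my-event-stream',      -- placeholder stream name
        'aws.region' = 'us-west-1',        -- placeholder region
        'scan.stream.initpos' = 'LATEST',
        'format' = 'raw'
    )
""")

@udf(result_type=DataTypes.STRING())
def event_source(raw: str) -> str:
    # Extract a common top-level field; "detail" stays an opaque string
    # until a type-specific function parses it downstream.
    return json.loads(raw).get('source', '')

t_env.create_temporary_function("event_source", event_source)

# Route/filter by event type without ever declaring the shape of "detail".
ec2_events = t_env.sql_query(
    "SELECT raw FROM events WHERE event_source(raw) = 'aws.ec2'"
)
```

Parsing the same record in every UDF call obviously isn't free, and it pushes all the real work into Python functions rather than SQL, so I'm not sure it's the right trade-off - hence the question.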
Details: I want to process a stream of events coming from a Kinesis data stream which originate from an AWS EventBridge bus. The events in this stream are all JSON, but have different schema. Specifically, they all have a common set of properties (e.g. version, id, time, source, detail-type), but there is a "detail" section that is a different shape, depending on what the event type is. For instance, an event on the bus from EC2 might look something like this: { "version": "0", "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718", "detail-type": "EC2 Instance State-change Notification", "source": "aws.ec2", "account": "111122223333", "time": "2017-12-22T18:43:48Z", "region": "us-west-1", "detail": { "instance-id": " i-1234567890abcdef0", "state": "terminated" } } whereas the "detail" section for an event from Codebuild might look like this (and the top level "source" field would be "aws.codebuild"): "detail":{ "build-status": "SUCCEEDED", "project-name": "my-sample-project", "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX", "additional-information": { "artifact": { "md5sum": "da9c44c8a9a3cd4b443126e823168fEX", "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX", "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip" } } } I have the following constraints that I really want to abide by: 1. I cannot create a Kinesis data stream for each event type, i.e. the stream of events is multiplexed. 2. I'd like to create application code in Python using PyFlink (I can code happily in Java but my colleagues who need to contribute cannot). I want to ingest the stream of events, key by the type of event (key by "detail-type", or "source") and then process the events according to their type. I thought my options might be: 1. Use SQL/Table API (my preferred option), but it looks like JSON queries are not scheduled to be released until Flink 1.15, and until then I cannot define the schema of the input table where a field is a generic Map (i.e. Map<>). It appears I have to define the schema of the input table exactly, and I don't see how I can create a table which covers a property that varies in shape, e.g. for EC2 example above the schema for "detail-tye" is Map<VARCHAR, VARCHAR>, but for a CodeBuild event it's a more deeply nested JSON structure. If there were a "JSON" type then this would appear to be the way to go. 2. 3. Use the Datastream API, but it looks like there is not a PyFlink Kinesis connector for the DataStream API. There is a Java connector - what's involved in creating a custom PyFlink Datastream connector? Are there any other options I've missed? What's the best way to approach this? Many thanks, John