Hi John,

1) Regarding to Table API, you could declare the column `detail` as STRING
and then parse it into a json in the Python use-defined function as
following:

```

@udf(result_type=DataTypes.STRING())
def get_id(detail):
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']
    else:
        return detail_json['instance-id']

```

2) Regarding the DataStream API, Kinesis is still not supported, however it
should be very easy as it is simply a wrapping of the Java Kinesis
connector. If you want to use DataStream API, you could wrap it yourself
for now and could refer to how the other connectors are handled [1] for
more details.

Regards,
Dian

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py#L1235

On Mon, Apr 11, 2022 at 6:17 AM John Tipper <john_tip...@hotmail.com> wrote:

> TLDR; I want to know how best to process a stream of events using PyFlink,
> where the events in the stream have a number of different schemas.
>
> Details:
>
> I want to process a stream of events coming from a Kinesis data stream
> which originate from an AWS EventBridge bus. The events in this stream are
> all JSON, but have different schema. Specifically, they all have a common
> set of properties (e.g. version, id, time, source, detail-type), but there
> is a "detail" section that is a different shape, depending on what the
> event type is.
>
> For instance, an event on the bus from EC2 might look something like this:
>
> {
>   "version": "0",
>   "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
>   "detail-type": "EC2 Instance State-change Notification",
>   "source": "aws.ec2",
>   "account": "111122223333",
>   "time": "2017-12-22T18:43:48Z",
>   "region": "us-west-1",
>   "detail": {
>     "instance-id": " i-1234567890abcdef0",
>     "state": "terminated"
>   }
> }
>
>
> whereas the "detail" section for an event from Codebuild might look like
> this (and the top level "source" field would be "aws.codebuild"):
>
>    "detail":{
>     "build-status": "SUCCEEDED",
>     "project-name": "my-sample-project",
>     "build-id": 
> "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
>     "additional-information": {
>       "artifact": {
>         "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
>         "sha256sum": 
> "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
>         "location": 
> "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
>       }
>      }
>    }
>
>
> I have the following constraints that I really want to abide by:
>
>
>    1. I cannot create a Kinesis data stream for each event type, i.e. the
>    stream of events is multiplexed.
>    2. I'd like to create application code in Python using PyFlink (I can
>    code happily in Java but my colleagues who need to contribute cannot).
>
>
> I want to ingest the stream of events, key by the type of event (key by
> "detail-type", or "source") and then process the events according to their
> type.
>
> I thought my options might be:
>
>
>    1. Use SQL/Table API (my preferred option), but it looks like JSON
>    queries are not scheduled to be released until Flink 1.15, and until then I
>    cannot define the schema of the input table where a field is a generic Map
>    (i.e. Map<>). It appears I have to define the schema of the input table
>    exactly, and I don't see how I can create a table which covers a property
>    that varies in shape, e.g. for EC2 example above the schema for
>    "detail-tye" is Map<VARCHAR, VARCHAR>, but for a CodeBuild event it's a
>    more deeply nested JSON structure. If there were a "JSON" type then this
>    would appear to be the way to go.
>    2.
>    3. Use the Datastream API, but it looks like there is not a PyFlink
>    Kinesis connector for the DataStream API. There is a Java connector -
>    what's involved in creating a custom PyFlink Datastream connector?
>
>
> Are there any other options I've missed? What's the best way to approach
> this?
>
> Many thanks,
>
> John
>
>

Reply via email to