Hi Dian,

Thank you very much, that worked very nicely.

Kind regards,

John
________________________________
From: Dian Fu <dian0511...@gmail.com>
Sent: 11 April 2022 06:32
To: John Tipper <john_tip...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: How to process events with different JSON schemas in single 
Kinesis stream using PyFlink?

Hi John,

1) Regarding to Table API, you could declare the column `detail` as STRING and 
then parse it into a json in the Python use-defined function as following:

```

@udf(result_type=DataTypes.STRING())
def get_id(detail):
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']
    else:
        return detail_json['instance-id']

```

2) Regarding the DataStream API, Kinesis is still not supported, however it 
should be very easy as it is simply a wrapping of the Java Kinesis connector. 
If you want to use DataStream API, you could wrap it yourself for now and could 
refer to how the other connectors are handled [1] for more details.

Regards,
Dian

[1] 
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py#L1235

On Mon, Apr 11, 2022 at 6:17 AM John Tipper 
<john_tip...@hotmail.com<mailto:john_tip...@hotmail.com>> wrote:
TLDR; I want to know how best to process a stream of events using PyFlink, 
where the events in the stream have a number of different schemas.

Details:

I want to process a stream of events coming from a Kinesis data stream which 
originate from an AWS EventBridge bus. The events in this stream are all JSON, 
but have different schema. Specifically, they all have a common set of 
properties (e.g. version, id, time, source, detail-type), but there is a 
"detail" section that is a different shape, depending on what the event type is.

For instance, an event on the bus from EC2 might look something like this:


{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
    "instance-id": " i-1234567890abcdef0",
    "state": "terminated"
  }
}

whereas the "detail" section for an event from Codebuild might look like this 
(and the top level "source" field would be "aws.codebuild"):


   "detail":{
    "build-status": "SUCCEEDED",
    "project-name": "my-sample-project",
    "build-id": 
"arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
    "additional-information": {
      "artifact": {
        "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
        "sha256sum": 
"6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
        "location": 
"arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
      }
     }
   }

I have the following constraints that I really want to abide by:


  1.  I cannot create a Kinesis data stream for each event type, i.e. the 
stream of events is multiplexed.
  2.  I'd like to create application code in Python using PyFlink (I can code 
happily in Java but my colleagues who need to contribute cannot).

I want to ingest the stream of events, key by the type of event (key by 
"detail-type", or "source") and then process the events according to their type.

I thought my options might be:


  1.  Use SQL/Table API (my preferred option), but it looks like JSON queries 
are not scheduled to be released until Flink 1.15, and until then I cannot 
define the schema of the input table where a field is a generic Map (i.e. 
Map<>). It appears I have to define the schema of the input table exactly, and 
I don't see how I can create a table which covers a property that varies in 
shape, e.g. for EC2 example above the schema for "detail-tye" is Map<VARCHAR, 
VARCHAR>, but for a CodeBuild event it's a more deeply nested JSON structure. 
If there were a "JSON" type then this would appear to be the way to go.
  2.
  3.  Use the Datastream API, but it looks like there is not a PyFlink Kinesis 
connector for the DataStream API. There is a Java connector - what's involved 
in creating a custom PyFlink Datastream connector?

Are there any other options I've missed? What's the best way to approach this?

Many thanks,

John

Reply via email to