Hi devs,

Thanks Hong for the help with moving the Draft into a FLIP.

I have opened a Vote thread for the FLIP here 
https://lists.apache.org/thread/48rdzjbh8wvo8dr9q15vlsh1f5cojx4q.

I will keep this discussion thread open, so please feel free to continue 
leaving feedback and questions here. Thank you very much.

Regards,
Daren


> On 14 Apr 2025, at 21:22, Wong, Daren <daren...@amazon.co.uk.INVALID> wrote:
> 
> Hi Keith,
> 
> Thanks for the feedback and questions
> 
>> 1. Can you elaborate on the possible configuration keys, with 
>> examples, for CloudWatchSinkProperties?
> 
> In addition to the standard AsyncSink configuration keys, e.g. 
> “sink.batch.max-size”, the other possible configuration keys for the Table API are:
> 
> - metric.namespace // Required
> - metric.name.key // Optional
> - metric.dimension.keys // Optional
> - metric.value.key // Optional
> - metric.count.key // Optional
> - metric.unit.key // Optional
> - metric.storage-resolution.key // Optional
> - metric.timestamp.key // Optional
> - metric.statistic.max.key // Optional
> - metric.statistic.min.key // Optional
> - metric.statistic.sum.key // Optional
> - metric.statistic.sample-count.key // Optional
> - sink.invalid-metric.retry-mode // Optional
> 
> At a high level, there are 3 types of key for the Table API:
> 
> * metric.namespace - Required in every CW PutMetricDataRequest.
> * metric.X.key - Column key identifier that maps a Table column to the 
> respective field in the CW PutMetricDataRequest. For example, 
> “metric.timestamp.key = my_timestamp” means the TableSink will look for 
> the column “my_timestamp” and use its value as the timestamp in the CW 
> PutMetricDataRequest.
> * sink.invalid-metric.retry-mode - Error handling behavior when an invalid 
> record is present, e.g. one with an invalid timestamp.
> 
> Note that in the DataStream API, users do not need to specify “metric.X.key”, as 
> the elementConverter is exposed to them to achieve the same purpose.
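
A minimal Java sketch of the "metric.X.key" column mapping described above, using "metric.timestamp.key" as the example. The class name, the use of plain maps for table options and rows, and the fallback when the option is unset are all assumptions made for illustration; the actual Table API sink would work on Flink row types rather than maps.

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the proposed connector API.
final class MetricKeyMappingSketch {

    // Option name taken from the list of configuration keys above.
    static final String TIMESTAMP_KEY_OPTION = "metric.timestamp.key";

    /**
     * Resolves the timestamp for one row. If "metric.timestamp.key" is not set
     * (the option is optional), the supplied fallback is returned. The fallback
     * behaviour is an assumption of this sketch.
     */
    static Instant resolveTimestamp(
            Map<String, String> tableOptions, Map<String, Object> row, Instant fallback) {
        String column = tableOptions.get(TIMESTAMP_KEY_OPTION);
        if (column == null) {
            return fallback;
        }
        Object value = row.get(column);
        if (!(value instanceof Instant)) {
            throw new IllegalArgumentException(
                    "Column '" + column + "' is missing or is not a timestamp");
        }
        return (Instant) value;
    }
}
```

With "metric.timestamp.key" set to "my_timestamp", the helper reads the "my_timestamp" column of each row; the other "metric.X.key" options would follow the same lookup pattern.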
> 
> 
>> 2. I see that MetricWriteRequest's unit field is of String type. Is there a 
>> motivation of using String type as opposed to StandardUnit enum type? This 
>> cuts down on user error by left shifting correctness check to IDE/compiler.
> 
> The motivation for using String is to avoid exposing any AWS SDK-specific models 
> directly to users of the connector. However, I plan to add validation 
> in the sink so that it fails fast and gives the user quicker feedback if an 
> unknown/invalid unit is set.
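
A rough Java sketch of the kind of unit validation mentioned above, keeping the unit as a String so that no AWS SDK type appears in the public model. The class and method names are hypothetical, and the set of units is only an illustrative subset of the CloudWatch unit names, not the full list.

```java
import java.util.Set;

// Hypothetical validator for illustration only; not part of the proposed connector API.
final class UnitValidationSketch {

    // Illustrative subset of CloudWatch unit names; the real list is longer.
    private static final Set<String> KNOWN_UNITS =
            Set.of("Seconds", "Milliseconds", "Bytes", "Percent", "Count", "Count/Second", "None");

    /** Returns the unit unchanged if it is known, otherwise fails fast. */
    static String requireKnownUnit(String unit) {
        if (unit == null || !KNOWN_UNITS.contains(unit)) {
            throw new IllegalArgumentException("Unknown CloudWatch unit: " + unit);
        }
        return unit;
    }
}
```

Running a check like this when the record is converted, rather than when the request is sent, is one way to give the faster feedback loop described above.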
> 
> 
>> 3. Do the three options provided for error handling behaviour apply specifically 
>> to the old-metrics failure case only, or to all 400s, or to 500s as well? Will 
>> users benefit from a broader/more flexible error handling configuration? For 
>> example, desirable behaviour might be to i) fail job on permission issue ii) 
>> dropping old records that would be rejected by CW iii) retry on throttles or 
>> 500s or timeouts.
> 
> The currently proposed option is aligned with i) and iii): a fatal 
> exception (e.g. invalid permission) will fail the job, and a non-fatal exception 
> (e.g. throttling) will be retried by default. The finer-grained control 
> offered to the user here is specifically around InvalidRecordException, where the user 
> can select any of the 3 options for retry mode via 
> "sink.invalid-metric.retry-mode".
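
The decision logic described above can be sketched in Java roughly as follows. All enum and method names here are hypothetical (the thread does not name the three retry-mode values), and the three error kinds are a simplification of whatever error classification the sink ends up using.

```java
// Hypothetical names for illustration only; the FLIP may name these differently.
final class ErrorHandlingSketch {

    /** The three user-selectable behaviours for invalid records. */
    enum InvalidMetricRetryMode { DROP, FAIL, RETRY }

    /** Simplified error classes: fatal (e.g. permissions), non-fatal (e.g. throttling), invalid record. */
    enum ErrorKind { FATAL, NON_FATAL, INVALID_RECORD }

    /** What the sink does with the failed batch. */
    enum Action { FAIL_JOB, RETRY_BATCH, DROP_RECORDS }

    static Action decide(ErrorKind kind, InvalidMetricRetryMode mode) {
        switch (kind) {
            case FATAL:
                return Action.FAIL_JOB;      // always fails the job
            case NON_FATAL:
                return Action.RETRY_BATCH;   // retried by default
            case INVALID_RECORD:
                // Only this case is controlled by "sink.invalid-metric.retry-mode".
                switch (mode) {
                    case DROP:
                        return Action.DROP_RECORDS;
                    case FAIL:
                        return Action.FAIL_JOB;
                    default:
                        return Action.RETRY_BATCH;
                }
            default:
                throw new IllegalArgumentException("Unknown error kind: " + kind);
        }
    }
}
```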
> 
> 
>> 4. Should we consider bisecting or retrying remaining of the batch if CW 
>> PutMetricDataResponse provides sufficient information on which MetricDatum 
>> is rejected.
> 
> Yes, I have explored this, and PutMetricDataResponse does not provide 
> sufficient information on which MetricDatum is rejected. As for bisecting, Flink 
> AsyncSink does not currently support it, so we would need a custom 
> implementation, which increases complexity. In addition, I 
> think that if we want to do bisecting, it should be done in the Flink AsyncSink 
> interface, as other connectors can benefit from it as well.
> 
> 
> 
>> 5. On the batching of requests, how do you propose batch size (specifically 
>> size) is enforced? Specifically, I am interested in how we are calculating 
>> the data sizes of a batch.
> 
> 
> * maxBatchSize - This is the number of records (“MetricWriteRequest” or 
> "MetricDatum") in a single batch to be sent in a single PutMetricDataRequest. 
> CW sets a hard limit of 1000 for this, as mentioned by Hong.
> * maxRecordSizeInBytes - This is the size of a single record 
> (“MetricWriteRequest” or "MetricDatum") in bytes. CW does not expose a method 
> to get the size in bytes. Therefore, the sink will estimate the size based on the 
> attributes in the MetricDatum, where a double is treated as 8 bytes and string 
> size is estimated based on UTF-8/ASCII encoding (as per the CW doc). For example, a 
> MetricDatum with a metric name (string length 10) and 5 values and counts (double) 
> will have 10 + 5*8*2 = 90 bytes. In general, most of the size is contributed 
> by string fields like dimensions, followed by the values and counts arrays.
> * maxBatchSizeInBytes - This is the size of all records in a single batch to 
> be flushed. CW PutMetricDataRequest has a hard limit of 1MB, which will be the 
> hard limit of this parameter as well.
> 
> 
> Therefore, a sample set of limits for these configs could be: maxBatchSize = 
> 1000, maxRecordSizeInBytes = 1000, maxBatchSizeInBytes = 1000*1000.
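
The size estimate described above can be sketched in Java as follows. The class and method names are hypothetical, and counting only the metric name, dimensions, values and counts is an assumption of this sketch; the real estimate may include further fields.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical estimator for illustration only; not part of the proposed connector API.
final class MetricSizeEstimateSketch {

    static final int BYTES_PER_DOUBLE = 8;

    /** Estimates the size of one record: UTF-8 length of the strings plus 8 bytes per double. */
    static long estimateSizeInBytes(
            String metricName, Map<String, String> dimensions, double[] values, double[] counts) {
        long size = utf8Length(metricName);
        for (Map.Entry<String, String> dimension : dimensions.entrySet()) {
            size += utf8Length(dimension.getKey()) + utf8Length(dimension.getValue());
        }
        size += (long) BYTES_PER_DOUBLE * (values.length + counts.length);
        return size;
    }

    private static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

For a 10-character metric name with 5 values and 5 counts and no dimensions, this gives 10 + 5*8*2 = 90 bytes, matching the worked example above.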
> 
> Regards,
> Daren
> 
> 
> On 14/04/2025, 09:17, "Keith Lee" <leekeiabstract...@gmail.com> wrote:
> 
> 
> 
> 
> 
> 
> 
> 
> Hello Daren,
> 
> 
> Thank you for the FLIP. Questions below:
> 
> 
> 1. Can you elaborate on the possible configuration keys, with
> examples, for CloudWatchSinkProperties?
> 
> 
> 2. I see that MetricWriteRequest's unit field is of String type. Is there a
> motivation of using String type as opposed to StandardUnit enum type? This
> cuts down on user error by left shifting correctness check to IDE/compiler.
> - https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/model/StandardUnit.html
> 
> 
>> In addition, CloudWatch rejects any metric that’s more than 2 weeks old,
>> we will add a configurable option for users to determine the error handling
>> behavior of either: 1) drop the records OR 2) trigger a job failure OR 3)
>> keep retrying the batch.
> 
> 
> 3. Do the three options provided for error handling behaviour apply
> specifically to the old-metrics failure case only, or to all 400s, or to 500s as
> well? Will users benefit from a broader/more flexible error handling
> configuration? For example, desirable behaviour might be to i) fail job on
> permission issue ii) dropping old records that would be rejected by CW iii)
> retry on throttles or 500s or timeouts.
> 
> 
>> If the batch contains one MetricDatum poison pill, the request will fail
>> and be handled as a fully failed request.
> 
> 
> 4. Should we consider bisecting or retrying remaining of the batch if CW
> PutMetricDataResponse provides sufficient information on which MetricDatum
> is rejected.
> 
> 
>> A list of MetricWriteRequest will be batched based on maxBatchSize which
>> is then submitted as a PutMetricDataRequest.
> 
> 
> 5. On the batching of requests, how do you propose batch size (specifically
> size) is enforced? Specifically, I am interested in how we are calculating
> the data sizes of a batch.
> 
> 
> Best regards
> Keith Lee
> 
> 
> 
> 
> On Fri, Apr 11, 2025 at 6:43 PM Wong, Daren <daren...@amazon.co.uk.invalid>
> wrote:
> 
> 
>> Hi Hong,
>> 
>> Thanks for the comments and suggestions, really appreciate it!
>> 
>> 
>>> Clarify Cloudwatch API limitations and handling in the sink. [1] Great
>>> to see we're being explicit with the Java API model! Let's be explicit
>>> about the PutMetricsData API constraints and how we handle them (it would
>>> be good to have a separate section):
>>> 1. Request size and count. Maximum size per request is 1MB, with
>>> limit of 1000 metrics. We need to enforce this during batching to prevent
>>> users from shooting themselves in the foot!
>>> 2. For Values/Counts, limit is 150 unique metrics within a single
>>> request.
>>> 3. Data type limitations. CloudWatch API uses Java double, but it
>>> doesn't support Double.NaN. We need to be explicit to handle improperly
>>> formatted data. We can consider failing fast/slow as you have suggested.
>>> Consider using "StrictEntityValidation" in the failure handling. [1] (For
>>> the design, we can simply mention, but work out the details when we
>>> implement)
>>> 4. Timestamp limitations. Cloudwatch also has limitations around
>>> accepted timestamps (as you have noted). Metric data can be 48h in the past
>>> or 2h in the future. Let's clarify how we handle invalid values.
>>> 5. Data ordering. CW API doesn't seem to specify limitations
>>> around out-of-order / repeat data. That's great, and it would be good to be
>>> explicit about and validate this behavior.
>> 
>> 
>> This is very detailed, thank you, I have updated the FLIP outlining these
>> limitations. In summary, here’s how they translate to limitation in the
>> AsyncSink configuration:
>> 
>> 
>> * Maximum size per CW PutMetricDataRequest is 1MB → maxBatchSizeInBytes
>> cannot be more than 1 MB
>> * Maximum number of MetricDatum per CW PutMetricDataRequest is 1000 →
>> maxBatchSize cannot be more than 1000
>> * Maximum 150 unique values in MetricDatum.Values → maxRecordSizeInBytes
>> cannot be more than 150 Bytes (assuming each 1 value size is 1 byte)
>> * CloudWatch API uses Java double, but it doesn't support Double.NaN → Use
>> StrictEntityValidation
>> * MetricDatum Timestamp limitations (up to 2 weeks in the past and up to 2
>> hours into the future) → Validation against this with user choice of error
>> handling behavior for this case
>> * Data ordering. Yes I have validated CW accepts out-of-order data, I have
>> updated the FLIP to point this out.
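
A minimal Java sketch of the timestamp validation listed above, assuming the window of up to 2 weeks in the past and up to 2 hours in the future stated in the list. The class and method names are hypothetical; what happens to a rejected record would then follow the user's chosen error handling behavior.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical validator for illustration only; not part of the proposed connector API.
final class TimestampWindowSketch {

    static final Duration MAX_PAST = Duration.ofDays(14);    // up to 2 weeks in the past
    static final Duration MAX_FUTURE = Duration.ofHours(2);  // up to 2 hours into the future

    /** Returns true if the timestamp lies inside the accepted window relative to "now". */
    static boolean isAccepted(Instant timestamp, Instant now) {
        return !timestamp.isBefore(now.minus(MAX_PAST))
                && !timestamp.isAfter(now.plus(MAX_FUTURE));
    }
}
```

Passing "now" in explicitly keeps the check deterministic and easy to test.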
>> 
>> 
>> 
>> 
>>> The PutMetricData API supports two data modes, EntityMetricData and
>>> MetricData [1]. Since we are only supporting MetricData for now, let's make
>>> sure our interface allows the extension to support EntityMetricData in the
>>> future. For example, we should make sure we use our own data model classes
>>> instead of using AWS SDK classes. Also, we currently propose to use
>>> wrappers instead of primitives. Let's use the primitive where we can
>> 
>> 
>> Yes, agree on making the interface allows extension to support
>> EntityMetricData in the future.
>> 
>> We are using our own data model “MetricWriteRequest” and have updated the
>> FLIP to use primitives.
>> 
>> 
>> 
>>> - PutMetricData supports StrictEntityValidation [1]. As mentioned
>>> above, let's use this.
>>> - I like that we enforce a single namespace per sink, since that is
>>> aligned with the PutMetricData API interface. Let's be explicit on the
>>> reasoning in the FLIP!
>>> - Clarify sink data semantics. Since we're using the async sink, we
>>> only provide at-least-once semantics. Let’s make this guarantee explicit.
>> 
>> 
>> Agree and updated FLIP
>> 
>> 
>> 
>>> 4. CW sink interface. Currently we are proposing to have a static input
>>> data type instead of generic input type. This would require user to use a
>>> map separately (As you have noted). For future extensibility, I would
>>> prefer if we exposed an ElementConverter directly to the user. That way, we
>>> can provide a custom class "MetricWriteRequest" in the output interface of
>>> the ElementConverter that can be extended to support additional features
>>> (e.g. EntityMetricData) in the future.
>> 
>> 
>> Thanks, I agree with both suggestions on exposing ElementConverter to
>> user, and provide a custom class “MetricWriteRequest” in the output for
>> extensibility. Updated the FLIP as well.
>> 
>> 
>> 
>>> 5. Table API design.
>>> - I'm not a fan of the way we currently use dimensions in the
>>> properties.
>>> - It would be better to use a Flink-native SQL support like PRIMARY
>>> KEY instead [2]. This also enforces that the specified dimension cannot be
>>> null.
>> 
>> 
>> Thanks for the suggestion, but I also see limitation in this approach for
>> when user wants to define more than 1 dimension columns with PRIMARY KEY,
>> and CloudWatch also allows dimensions to be optional as well. Hence, I see
>> the current approach as being more flexible for user to configure, let me
>> know what your thoughts are here.
>> 
>> 
>> 
>>> 6. Housekeeping
>>> - It would be good to tidy up the public interfaces linked. For
>>> example, we don't make any explicit usage of the public interfaces in
>>> FLIP-191, so we can remove that.
>> 
>> 
>> Thanks for raising this, agreed and have updated the FLIP.
>> 
>> 
>> Regards,
>> Daren
>> 
>> On 08/04/2025, 12:02, "Hong Liang" <h...@apache.org> wrote:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> Hi Daren,
>> 
>> 
>> Thanks for the contribution — exciting to see support for new sinks! I’ve
>> added a few comments and suggestions below:
>> 
>> 
>> 1. Clarify Cloudwatch API limitations and handling in the sink. [1]
>> Great to see we're being explicit with the Java API model! Let's be
>> explicit about the PutMetricsData API constraints and how we handle them
>> (it would be good to have a separate section):
>> 1. Request size and count. Maximum size per request is 1MB, with
>> limit of 1000 metrics. We need to enforce this during batching to prevent
>> users from shooting themselves in the foot!
>> 2. For Values/Counts, limit is 150 unique metrics within a single
>> request.
>> 3. Data type limitations. CloudWatch API uses Java double, but it
>> doesn't support Double.NaN. We need to be explicit to handle improperly
>> formatted data. We can consider failing fast/slow as you have suggested.
>> Consider using "StrictEntityValidation" in the failure handling. [1] (For
>> the design, we can simply mention, but work out the details when we
>> implement)
>> 4. Timestamp limitations. Cloudwatch also has limitations around
>> accepted timestamps (as you have noted). Metric data can be 48h in the past
>> or 2h in the future. Let's clarify how we handle invalid values.
>> 5. Data ordering. CW API doesn't seem to specify limitations around
>> out-of-order / repeat data. That's great, and it would be good to be
>> explicit about and validate this behavior.
>> 2. Clarify supported features [1]
>> - The PutMetricData API supports two data modes, EntityMetricData and
>> MetricData [1]. Since we are only supporting MetricData for now, let's make
>> sure our interface allows the extension to support EntityMetricData in the
>> future. For example, we should make sure we use our own data model classes
>> instead of using AWS SDK classes. Also, we currently propose to use
>> wrappers instead of primitives. Let's use the primitive where we can :).
>> - PutMetricData supports StrictEntityValidation [1]. As mentioned
>> above, let's use this.
>> - I like that we enforce a single namespace per sink, since that is
>> aligned with the PutMetricData API interface. Let's be explicit on the
>> reasoning in the FLIP!
>> 3. Clarify sink data semantics. Since we're using the async sink, we only
>> provide at-least-once semantics. Let’s make this guarantee explicit.
>> 4. CW sink interface. Currently we are proposing to have a static input
>> data type instead of generic input type. This would require user to use a
>> map separately (As you have noted). For future extensibility, I would
>> prefer if we exposed an ElementConverter directly to the user. That way, we
>> can provide a custom class "MetricWriteRequest" in the output interface of
>> the ElementConverter that can be extended to support additional features
>> (e.g. EntityMetricData) in the future.
>> 5. Table API design.
>> - I'm not a fan of the way we currently use dimensions in the
>> properties.
>> - It would be better to use a Flink-native SQL support like PRIMARY KEY
>> instead [2]. This also enforces that the specified dimension cannot be
>> null.
>> 6. Housekeeping
>> - It would be good to tidy up the public interfaces linked. For example,
>> we don't make any explicit usage of the public interfaces in FLIP-191, so
>> we can remove that.
>> 
>> 
>> 
>> 
>> Overall, nice FLIP! Thanks for the detail and making it an easy read. Hope
>> the above helps!
>> 
>> 
>> 
>> 
>> Cheers,
>> Hong
>> 
>> 
>> 
>> 
>> [1]
>> https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
>> 
>> 
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#primary-key
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Mon, Apr 7, 2025 at 9:24 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>> 
>> 
>>> Hi Daren, thanks for the FLIP.
>>> 
>>> Just a couple of questions and comments:
>>> 
>>>> Usable in both DataStream and Table API/SQL
>>> What about the Python API? This is something we should consider ahead, since the
>>> abstract element converter doesn't have a Flink type mapping to be used
>>> from Python. This is an issue we faced with DDB before.
>>> 
>>>> Therefore, the connector will provide a CloudWatchMetricInput model that
>>>> user can use to pass as input to the connector. For example, in DataStream
>>>> API, it could be a MapFunction called just before passing to the sink as
>>>> follows:
>>> I am not quite sure I follow. Are you suggesting we introduce a
>>> specific new converter class or relay that to users? Also, since you
>>> mentioned FLIP-171, are you suggesting to implement this sink as an
>>> extension to Async Sink? In that case, it is more confusing to me how we are
>>> going to use the map function with the AsyncSink.ElementConverter.
>>> 
>>>> public class SampleToCloudWatchMetricInputMapper implements MapFunction<Sample, CloudWatchMetricInput>
>>> 
>>> Is CloudWatchMetricInput a newly introduced model class? I couldn't find it
>>> in the sdkv2. If we are introducing it, then it might be useful to add it to
>>> the FLIP, since this is part of the API.
>>> 
>>> 
>>>> Supports both Bounded (Batch) and Unbounded (Streaming)
>>> 
>>> How do you propose to handle them differently? I can't find anything
>>> specific in the FLIP.
>>> 
>>> Regarding table API
>>> 
>>>> 'metric.dimension.keys' = 'cw_dim',
>>> 
>>> I am not in favor of doing this, as it will complicate the schema
>>> validation on table creation. Maybe we can use the whole schema as
>>> dimensions, excluding the values and the count. Let me know your thoughts
>>> here.
>>> 
>>>> 'metric.name.key' = 'cw_metric_name',
>>> 
>>> So we are making the metric part of the row data? Have we considered not
>>> doing that, and instead having 1 table map to 1 metric rather than 1 namespace?
>>> It might be more suitable to enforce some validations on the dimensions
>>> schema this way. Of course, this will probably have us introduce some intermediate
>>> class in the model to hold the dimensions, values and counts without the
>>> metric name and namespace that we will extract from the sink definition.
>>> Let me know your thoughts here.
>>> 
>>> 
>>>> `cw_value` BIGINT,
>>> Are we going to allow all numeric types for values?
>>> 
>>>> protected void submitRequestEntries(
>>>> List<MetricDatum> requestEntries,
>>>> Consumer<List<MetricDatum>> requestResult)
>>> 
>>> nit: This method should be deprecated after 1.20. I hope the repo is
>>> upgraded by the time we implement this
>>> 
>>>> Error Handling
>>> Poison pills aside, what error handling are you suggesting? Are we
>>> following in the footsteps of the other AWS connectors with error
>>> classification? Is there any effort to abstract it on the AWS side?
>>> 
>>> And on the topic of poison pills, if I understand correctly, that is a topic
>>> that has been discussed for a while. This of course breaks the at-least-once
>>> semantic and might be confusing to users. Additionally, since the CloudWatch
>>> API fails the full batch, how are you suggesting we identify the
>>> poison pills? I am personally in favor of global failures in this case, but
>>> would love to hear the feedback here.
>>> 
>>> 
>>> 
>>> Best Regards
>>> Ahmed Hamdy
>>> 
>>> 
>>> On Mon, 7 Apr 2025 at 11:29, Wong, Daren <daren...@amazon.co.uk.invalid>
>>> wrote:
>>> 
>>>> Hi Dev,
>>>> 
>>>> I would like to start a discussion about FLIP: Amazon CloudWatch Metric
>>>> Sink Connector
>>>> 
>>>> https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
>>>> 
>>>> This FLIP is proposing to add support for Amazon CloudWatch Metric sink in
>>>> flink-connector-aws repo. Looking forward to your feedback, thank you
>>>> 
>>>> Regards,
>>>> Daren
>>>> 
>>> 
>> 
>> 
>> 
>> 
> 
> 
> 
