Hi devs,

Thanks, Hong, for the help with moving the draft into a FLIP.
I have opened a Vote thread for the FLIP here:
https://lists.apache.org/thread/48rdzjbh8wvo8dr9q15vlsh1f5cojx4q

I will keep this discussion thread open, so please feel free to continue
leaving any feedback and questions here.

Thank you very much.

Regards,
Daren

> On 14 Apr 2025, at 21:22, Wong, Daren <daren...@amazon.co.uk.INVALID> wrote:
>
> Hi Keith,
>
> Thanks for the feedback and questions.
>
>> 1. Can you elaborate what are the possible configuration key and their
>> examples for CloudWatchSinkProperties?
>
> In addition to the standard AsyncSink configuration keys, e.g.
> "sink.batch.max-size", the other possible configuration keys for the Table API are:
>
> - metric.namespace // Required
> - metric.name.key // Optional
> - metric.dimension.keys // Optional
> - metric.value.key // Optional
> - metric.count.key // Optional
> - metric.unit.key // Optional
> - metric.storage-resolution.key // Optional
> - metric.timestamp.key // Optional
> - metric.statistic.max.key // Optional
> - metric.statistic.min.key // Optional
> - metric.statistic.sum.key // Optional
> - metric.statistic.sample-count.key // Optional
> - sink.invalid-metric.retry-mode // Optional
>
> At a high level, there are 3 types of keys for the Table API:
>
> * metric.namespace - Required in every CW PutMetricDataRequest.
> * metric.X.key - Column key identifier that maps a Table column to the
> respective field in the CW PutMetricDataRequest. For example,
> "metric.timestamp.key = my_timestamp" means the TableSink will look for the
> column name/field "my_timestamp" and extract its value to be used as the
> timestamp in the CW PutMetricDataRequest.
> * sink.invalid-metric.retry-mode - Error handling behavior when an invalid
> record is present, e.g. an invalid timestamp.
>
> Note that in the DataStream API, users do not need to specify "metric.X.key",
> as the elementConverter is exposed to them to achieve the same purpose.
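As an illustration of that DataStream-side converter, here is a rough sketch of
what such an element converter could look like. The MetricWriteRequest builder
methods and the Sample input type are assumptions made for this sketch, not the
final API proposed in the FLIP:

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

// Illustrative only: maps a user POJO to the FLIP's proposed MetricWriteRequest
// model. Each builder call plays the role of one "metric.X.key" option in the
// Table API; the builder methods shown here are assumptions, not the final API.
public class SampleToMetricWriteRequestConverter
        implements ElementConverter<Sample, MetricWriteRequest> {

    @Override
    public MetricWriteRequest apply(Sample sample, SinkWriter.Context context) {
        return MetricWriteRequest.builder()
                .withMetricName(sample.getMetricName())              // ~ metric.name.key
                .addDimension("InstanceId", sample.getInstanceId())  // ~ metric.dimension.keys
                .addValue(sample.getValue())                         // ~ metric.value.key
                .withTimestamp(sample.getTimestamp())                // ~ metric.timestamp.key
                .withUnit("Milliseconds")                            // ~ metric.unit.key
                .build();
    }
}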
>
>> 2. I see that MetricWriteRequest's unit field is of String type. Is there a
>> motivation of using String type as opposed to StandardUnit enum type? This
>> cuts down on user error by left shifting correctness check to IDE/compiler.
>
> The motivation for using String is to avoid exposing any AWS SDK-specific
> models directly to users of the connector. However, I plan to add validation
> in the sink so that it fails and provides a faster feedback loop to users if
> an unknown/invalid unit is set.
>
>> 3. Does the three options provided for error handling behaviour specifically
>> just for old metrics failure case or all 400s or for 500s as well? Will
>> users benefit from a broader/more flexible error handling configuration? For
>> example, desirable behaviour might be to i) fail job on permission issue ii)
>> dropping old records that would be rejected by CW iii) retry on throttles or
>> 500s or timeouts.
>
> The currently proposed options are aligned with i) and iii): a fatal exception
> (e.g. invalid permissions) will fail the job, and non-fatal exceptions
> (e.g. throttles) will be retried by default. The more fine-grained control
> offered to users here is specifically around InvalidRecordException, where
> users can select any of the 3 retry modes via "sink.invalid-metric.retry-mode".
>
>> 4. Should we consider bisecting or retrying remaining of the batch if CW
>> PutMetricDataResponse provides sufficient information on which MetricDatum
>> is rejected.
>
> I have explored this, and unfortunately PutMetricDataResponse does not provide
> sufficient information on which MetricDatum is rejected.
> As for bisecting, the Flink AsyncSink does not currently support it, and we
> would need a custom implementation, which increases complexity. In addition,
> I would think that if we want to do bisecting, it should be done at the Flink
> AsyncSink interface level so that other connectors can benefit from it as well.
>
>> 5. On the batching of requests, how do you propose batch size (specifically
>> size) is enforced? Specifically, I am interested in how we are calculating
>> the data sizes of a batch.
>
> * maxBatchSize - This is the number of Records ("MetricWriteRequest" or
> "MetricDatum") in a single batch to be sent in a single PutMetricDataRequest.
> CW sets a hard limit of 1000 for this, as mentioned by Hong.
> * maxRecordSizeInBytes - This is the size of a single Record
> ("MetricWriteRequest" or "MetricDatum") in bytes. CW does not expose a method
> to get the size in bytes. Therefore, the sink will estimate the size based on
> the attributes in the MetricDatum, where a double is treated as 8 bytes and
> string sizes are estimated based on UTF-8/ASCII encoding (as per the CW doc).
> For example, a MetricDatum with a metric name of string length 10 and 5 values
> and 5 counts (doubles) will have 10 + 5*8*2 = 90 bytes. Therefore, most of the
> size is contributed by the string fields such as dimensions, followed by the
> values and counts arrays.
> * maxBatchSizeInBytes - This is the size of all records in a single batch to
> be flushed. CW PutMetricDataRequest has a hard limit of 1 MB, which will be
> the hard limit of this parameter as well.
>
> Therefore, a sample set of limits for these configs could be: maxBatchSize =
> 1000, maxRecordSizeInBytes = 1000, maxBatchSizeInBytes = 1000*1000
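To make the estimation rule concrete, here is a rough sketch of how such a
per-record size estimate could be computed. The class and parameter shape are
assumptions for this sketch rather than the connector's actual model; only the
counting rule (UTF-8 bytes for strings, 8 bytes per double) comes from the
description above:

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Illustrative only: estimates a record's size using the rule described above.
// The parameter shape is an assumption for this sketch, not the actual model class.
final class RecordSizeEstimator {

    static long estimate(String metricName, Map<String, String> dimensions,
                         List<Double> values, List<Double> counts) {
        long size = utf8Length(metricName);
        for (Map.Entry<String, String> dimension : dimensions.entrySet()) {
            size += utf8Length(dimension.getKey()) + utf8Length(dimension.getValue());
        }
        size += 8L * values.size(); // each value is a double -> 8 bytes
        size += 8L * counts.size(); // each count is a double -> 8 bytes
        return size;
    }

    private static long utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }
}

// e.g. a metric name of length 10 ("latency_ms"), no dimensions, 5 values and
// 5 counts -> 10 + 5*8 + 5*8 = 90 bytes, matching the example above.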
>
> Regards,
> Daren
>
> On 14/04/2025, 09:17, "Keith Lee" <leekeiabstract...@gmail.com> wrote:
>
> Hello Daren,
>
> Thank you for the FLIP. Questions below:
>
> 1. Can you elaborate what are the possible configuration key and their
> examples for CloudWatchSinkProperties?
>
> 2. I see that MetricWriteRequest's unit field is of String type. Is there a
> motivation of using String type as opposed to StandardUnit enum type? This
> cuts down on user error by left shifting correctness check to IDE/compiler.
> -
> https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/model/StandardUnit.html
>
>> In addition, CloudWatch rejects any metric that’s more than 2 weeks old,
>> we will add a configurable option for users to determine the error handling
>> behavior of either: 1) drop the records OR 2) trigger a job failure OR 3)
>> keep retrying the batch.
>
> 3. Does the three options provided for error handling behaviour
> specifically just for old metrics failure case or all 400s or for 500s as
> well? Will users benefit from a broader/more flexible error handling
> configuration? For example, desirable behaviour might be to i) fail job on
> permission issue ii) dropping old records that would be rejected by CW iii)
> retry on throttles or 500s or timeouts.
>
>> If the batch contains one MetricDatum poison pill, the request will fail
>> and be handled as a fully failed request.
>
> 4. Should we consider bisecting or retrying remaining of the batch if CW
> PutMetricDataResponse provides sufficient information on which MetricDatum
> is rejected.
>
>> A list of MetricWriteRequest will be batched based on maxBatchSize which
>> is then submitted as a PutMetricDataRequest.
>
> 5. On the batching of requests, how do you propose batch size (specifically
> size) is enforced? Specifically, I am interested in how we are calculating
> the data sizes of a batch.
>
> Best regards
> Keith Lee
>
> On Fri, Apr 11, 2025 at 6:43 PM Wong, Daren <daren...@amazon.co.uk.invalid>
> wrote:
>
>> Hi Hong,
>>
>> Thanks for the comments and suggestions, really appreciate it!
>>
>>> Clarify Cloudwatch API limitations and handling in the sink. [1] Great
>>> to see we're being explicit with the Java API model! Let's be explicit
>>> about the PutMetricsData API constraints and how we handle them (it would
>>> be good to have a separate section):
>>> 1. Request size and count. Maximum size per request is 1MB, with
>>> limit of 1000 metrics. We need to enforce this during batching to prevent
>>> users from shooting themselves in the foot!
>>> 2. For Values/Counts, limit is 150 unique metrics within a single
>>> request.
>>> 3. Data type limitations. CloudWatch API uses Java double, but it
>>> doesn't support Double.NaN. We need to be explicit to handle improperly
>>> formatted data. We can consider failing fast/slow as you have suggested.
>>> Consider using "StrictEntityValidation" in the failure handling. [1] (For
>>> the design, we can simply mention, but work out the details when we
>>> implement)
>>> 4. Timestamp limitations. Cloudwatch also has limitations around
>>> accepted timestamps (as you have noted). Metric data can be 48h in the past
>>> or 2h in the future. Let's clarify how we handle invalid values.
>>> 5. Data ordering. CW API doesn't seem to specify limitations
>>> around out-of-order / repeat data. That's great, and it would be good to be
>>> explicit about and validate this behavior.
>>
>> This is very detailed, thank you. I have updated the FLIP outlining these
>> limitations. In summary, here is how they translate to limitations in the
>> AsyncSink configuration:
>>
>> * Maximum size per CW PutMetricDataRequest is 1 MB → maxBatchSizeInBytes
>> cannot be more than 1 MB
>> * Maximum number of MetricDatum per CW PutMetricDataRequest is 1000 →
>> maxBatchSize cannot be more than 1000
>> * Maximum 150 unique values in MetricDatum.Values → maxRecordSizeInBytes
>> cannot be more than 150 bytes (assuming each value is 1 byte in size)
>> * CloudWatch API uses Java double, but it doesn't support Double.NaN → use
>> StrictEntityValidation
>> * MetricDatum timestamp limitations (up to 2 weeks in the past and up to 2
>> hours into the future) → validate against this, with a user choice of error
>> handling behavior for this case
>> * Data ordering. Yes, I have validated that CW accepts out-of-order data,
>> and I have updated the FLIP to point this out.
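As a rough illustration of how these limits could surface when configuring the
sink, here is a hypothetical builder snippet. The CloudWatchMetricSink class
and its setter names are assumptions for this sketch, modelled on the existing
AWS AsyncSink-based connector builders, and are not the agreed API:

// Hypothetical builder usage; CloudWatchMetricSink and
// SampleToMetricWriteRequestConverter are illustrative names only, following
// the pattern of other AWS AsyncSink builders.
CloudWatchMetricSink<Sample> sink =
        CloudWatchMetricSink.<Sample>builder()
                .setNamespace("my/application/metrics")   // single namespace per sink
                .setElementConverter(new SampleToMetricWriteRequestConverter())
                .setMaxBatchSize(1000)                    // CW hard limit: 1000 MetricDatum per request
                .setMaxBatchSizeInBytes(1_000_000L)       // CW hard limit: 1 MB per PutMetricDataRequest
                .build();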
>>
>>> The PutMetricData API supports two data modes, EntityMetricData and
>>> MetricData [1]. Since we are only supporting MetricData for now, let's make
>>> sure our interface allows the extension to support EntityMetricData in the
>>> future. For example, we should make sure we use our own data model classes
>>> instead of using AWS SDK classes. Also, we currently propose to use
>>> wrappers instead of primitives. Let's use the primitive where we can
>>
>> Yes, agreed on making the interface allow extension to support
>> EntityMetricData in the future.
>>
>> We are using our own data model "MetricWriteRequest" and have updated the
>> FLIP to use primitives.
>>
>>> - PutMetricData supports StrictEntityValidation [1]. As mentioned
>>> above, let's use this.
>>> - I like that we enforce a single namespace per sink, since that is
>>> aligned with the PutMetricData API interface. Let's be explicit on the
>>> reasoning in the FLIP!
>>> - Clarify sink data semantics. Since we're using the async sink, we
>>> only provide at-least-once semantics. Let's make this guarantee explicit.
>>
>> Agreed, and updated the FLIP.
>>
>>> 4. CW sink interface. Currently we are proposing to have a static input
>>> data type instead of generic input type. This would require user to use a
>>> map separately (As you have noted). For future extensibility, I would
>>> prefer if we exposed an ElementConverter directly to the user. That way, we
>>> can provide a custom class "MetricWriteRequest" in the output interface of
>>> the ElementConverter that can be extended to support additional features
>>> (e.g. EntityMetricData) in the future.
>>
>> Thanks, I agree with both suggestions: exposing the ElementConverter to
>> users, and providing a custom class "MetricWriteRequest" in the output for
>> extensibility. Updated the FLIP as well.
>>
>>> 5. Table API design.
>>> - I'm not a fan of the way we currently use dimensions in the
>>> properties.
>>> - It would be better to use a Flink-native SQL support like PRIMARY
>>> KEY instead [2]. This also enforces that the specified dimension cannot be
>>> null.
>>
>> Thanks for the suggestion, but I also see a limitation in this approach when
>> a user wants to define more than one dimension column with PRIMARY KEY, and
>> CloudWatch also allows dimensions to be optional. Hence, I see the current
>> approach as more flexible for users to configure; let me know what your
>> thoughts are here.
>>
>>> 6. Housekeeping
>>> - It would be good to tidy up the public interfaces linked. For
>>> example, we don't make any explicit usage of the public interfaces in
>>> FLIP-191, so we can remove that.
>>
>> Thanks for raising this; agreed, and I have updated the FLIP.
>>
>> Regards,
>> Daren
>>
>> On 08/04/2025, 12:02, "Hong Liang" <h...@apache.org> wrote:
>>
>> Hi Daren,
>>
>> Thanks for the contribution — exciting to see support for new sinks! I've
>> added a few comments and suggestions below:
>>
>> 1. Clarify Cloudwatch API limitations and handling in the sink. [1]
>> Great to see we're being explicit with the Java API model! Let's be
>> explicit about the PutMetricsData API constraints and how we handle them
>> (it would be good to have a separate section):
>> 1. Request size and count. Maximum size per request is 1MB, with
>> limit of 1000 metrics. We need to enforce this during batching to prevent
>> users from shooting themselves in the foot!
>> 2. For Values/Counts, limit is 150 unique metrics within a single
>> request.
>> 3. Data type limitations. CloudWatch API uses Java double, but it
>> doesn't support Double.NaN. We need to be explicit to handle improperly
>> formatted data.
>> We can consider failing fast/slow as you have suggested.
>> Consider using "StrictEntityValidation" in the failure handling. [1] (For
>> the design, we can simply mention, but work out the details when we
>> implement)
>> 4. Timestamp limitations. Cloudwatch also has limitations around
>> accepted timestamps (as you have noted). Metric data can be 48h in the past
>> or 2h in the future. Let's clarify how we handle invalid values.
>> 5. Data ordering. CW API doesn't seem to specify limitations around
>> out-of-order / repeat data. That's great, and it would be good to be
>> explicit about and validate this behavior.
>> 2. Clarify supported features [1]
>> - The PutMetricData API supports two data modes, EntityMetricData and
>> MetricData [1]. Since we are only supporting MetricData for now, let's make
>> sure our interface allows the extension to support EntityMetricData in the
>> future. For example, we should make sure we use our own data model classes
>> instead of using AWS SDK classes. Also, we currently propose to use
>> wrappers instead of primitives. Let's use the primitive where we can :).
>> - PutMetricData supports StrictEntityValidation [1]. As mentioned
>> above, let's use this.
>> - I like that we enforce a single namespace per sink, since that is
>> aligned with the PutMetricData API interface. Let's be explicit on the
>> reasoning in the FLIP!
>> 3. Clarify sink data semantics. Since we're using the async sink, we only
>> provide at-least-once semantics. Let's make this guarantee explicit.
>> 4. CW sink interface. Currently we are proposing to have a static input
>> data type instead of generic input type. This would require user to use a
>> map separately (As you have noted). For future extensibility, I would
>> prefer if we exposed an ElementConverter directly to the user. That way, we
>> can provide a custom class "MetricWriteRequest" in the output interface of
>> the ElementConverter that can be extended to support additional features
>> (e.g. EntityMetricData) in the future.
>> 5. Table API design.
>> - I'm not a fan of the way we currently use dimensions in the
>> properties.
>> - It would be better to use a Flink-native SQL support like PRIMARY KEY
>> instead [2]. This also enforces that the specified dimension cannot be
>> null.
>> 6. Housekeeping
>> - It would be good to tidy up the public interfaces linked. For example,
>> we don't make any explicit usage of the public interfaces in FLIP-191, so
>> we can remove that.
>>
>> Overall, nice FLIP! Thanks for the detail and making it an easy read. Hope
>> the above helps!
>>
>> Cheers,
>> Hong
>>
>> [1]
>> https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#primary-key
>>
>> On Mon, Apr 7, 2025 at 9:24 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>
>>> Hi Daren thanks for the FLIP
>>>
>>> Just a couple of questions and comments?
>>>
>>>> Usable in both DataStream and Table API/SQL
>>> What about python API? this is sth we should consider ahead since the
>>> abstract element converter doesn't have a Flink type mapping to be used
>>> from python, this is a issue we faced with DDB before
>>>
>>>> Therefore, the connector will provide a CloudWatchMetricInput model that
>>>> user can use to pass as input to the connector. For example, in DataStream
>>>> API, it could be a MapFunction called just before passing to the sink as
>>>> follows:
>>> I am not quite sure I follow, are you suggesting we introduce a
>>> specific new converter class or relay that to users? also since you
>>> mentioned FLIP-171, are you suggesting to implement this sink as an
>>> extension to Async Sink, in that case It is more confusing to me how we are
>>> going to use the map function with the AsyncSink.ElementConvertor.
>>>
>>>> public class SampleToCloudWatchMetricInputMapper implements MapFunction<
>>>> Sample, CloudWatchMetricInput>
>>>
>>> Is CloudWatchMetricInput a newly introduced model class, I couldn't find it
>>> in the sdkv2, If we are introducing it then it might be useful to add to
>>> the FLIP since this is part of the API.
>>>
>>>> Supports both Bounded (Batch) and Unbounded (Streaming)
>>>
>>> What do you propose to handle them differently? I can't find a specific
>>> thing in the FLIP
>>>
>>> Regarding table API
>>>
>>>> 'metric.dimension.keys' = 'cw_dim',
>>>
>>> I am not in favor of doing this as this will complicate the schema
>>> validation on table creation, maybe we can use the whole schema as
>>> dimensions excluding the values and the count, let me know your thoughts
>>> here.
>>>
>>>> 'metric.name.key' = 'cw_metric_name',
>>>
>>> So we are making the metric part of the row data? have we considered not
>>> doing that instead and having 1 table map to 1 metric instead of namespace?
>>> It might be more suitable to enforce some validations on the dimensions
>>> schema this way. Ofc this will probably have is introduce some intermediate
>>> class in the model to hold the dimensions, values and counts without the
>>> metric name and namespace that we will extract from the sink definition,
>>> let me know your thoughts here?
>>>
>>>> `cw_value` BIGINT,
>>> Are we going to allow all numeric types for values?
>>>
>>>> protected void submitRequestEntries(
>>>> List<MetricDatum> requestEntries,
>>>> Consumer<List<MetricDatum>> requestResult)
>>>
>>> nit: This method should be deprecated after 1.20. I hope the repo is
>>> upgraded by the time we implement this
>>>
>>>> Error Handling
>>> Away from poison pills, what error handling are you suggesting? Are we
>>> following the footsteps of the other AWS connectors with error
>>> classification, is there any effort to abstract it on the AWS side?
>>>
>>> And on the topic of poison pills, If I understand correctly that is a topic
>>> that has been discussed for a while, this ofc breaks the at-least-once
>>> semantic and might be confusing to the users, additionally since cloud
>>> watch API fails the full batch how are you suggesting we identify the
>>> poison pills? I am personally in favor of global failures in this case but
>>> would love to hear the feedback here.
>>>
>>> Best Regards
>>> Ahmed Hamdy
>>>
>>> On Mon, 7 Apr 2025 at 11:29, Wong, Daren <daren...@amazon.co.uk.invalid>
>>> wrote:
>>>
>>>> Hi Dev,
>>>>
>>>> I would like to start a discussion about FLIP: Amazon CloudWatch Metric
>>>> Sink Connector
>>>> https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
>>>>
>>>> This FLIP is proposing to add support for Amazon CloudWatch Metric sink in
>>>> the flink-connector-aws repo. Looking forward to your feedback, thank you.
>>>>
>>>> Regards,
>>>> Daren