Hi Robert,

Thanks for the question. I took a look and agree with your latter point: a dedicated reporter is not necessary, as Flink's StatsD metrics reporter works with the CloudWatch Agent's StatsD interface.
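For anyone following along, a minimal sketch of that setup (the reporter name "stsd" is an arbitrary label; the CloudWatch Agent's StatsD listener defaults to UDP port 8125, so adjust host/port to your agent configuration):

    # flink-conf.yaml (sketch): send Flink metrics to the CloudWatch Agent's StatsD endpoint
    metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
    metrics.reporter.stsd.host: localhost
    metrics.reporter.stsd.port: 8125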
Regards,
Daren

> On 18 Apr 2025, at 15:57, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi,
> quick, somewhat related question:
> There is no CloudWatch metrics reporter for Flink yet. Would it make sense to add one, or is that not necessary because CloudWatch supports other metrics reporters (statsd?)
>
> Best,
> Robert
>
> On Wed, Apr 16, 2025 at 10:50 AM Daren Wong <darenwkt....@gmail.com> wrote:
>
>> Hi devs,
>>
>> Thanks Hong for the help with moving the Draft into a FLIP.
>>
>> I have opened a Vote thread for the FLIP here: https://lists.apache.org/thread/48rdzjbh8wvo8dr9q15vlsh1f5cojx4q
>>
>> I will keep this discussion thread open, so please feel free to continue leaving feedback and questions here. Thank you very much.
>>
>> Regards,
>> Daren
>>
>>> On 14 Apr 2025, at 21:22, Wong, Daren <daren...@amazon.co.uk.INVALID> wrote:
>>>
>>> Hi Keith,
>>>
>>> Thanks for the feedback and questions.
>>>
>>>> 1. Can you elaborate on the possible configuration keys and their examples for CloudWatchSinkProperties?
>>>
>>> In addition to the standard AsyncSink configuration keys, e.g. "sink.batch.max-size", the other possible configuration keys for the Table API are:
>>>
>>> - metric.namespace // Required
>>> - metric.name.key // Optional
>>> - metric.dimension.keys // Optional
>>> - metric.value.key // Optional
>>> - metric.count.key // Optional
>>> - metric.unit.key // Optional
>>> - metric.storage-resolution.key // Optional
>>> - metric.timestamp.key // Optional
>>> - metric.statistic.max.key // Optional
>>> - metric.statistic.min.key // Optional
>>> - metric.statistic.sum.key // Optional
>>> - metric.statistic.sample-count.key // Optional
>>> - sink.invalid-metric.retry-mode // Optional
>>>
>>> At a high level, there are 3 types of key for the Table API:
>>>
>>> * metric.namespace - Required in every CW PutMetricDataRequest.
>>> * metric.X.key - Column key identifier that maps a Table column to the respective field in the CW PutMetricDataRequest. For example, "metric.timestamp.key = my_timestamp" means the TableSink will look for the column/field named "my_timestamp" and extract its value to use as the timestamp in the CW PutMetricDataRequest.
>>> * sink.invalid-metric.retry-mode - Error handling behavior when an invalid record is present, e.g. one with an invalid timestamp.
>>>
>>> Note that in the DataStream API, users do not need to specify "metric.X.key", as the elementConverter is exposed to them to achieve the same purpose.
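For illustration, a table definition using these keys might look like the sketch below (the "cloudwatch" connector identifier, column names, and option values are assumptions for this example, not final API):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CloudWatchTableSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                "CREATE TABLE cw_metrics ("
                    + " cw_metric_name STRING,"
                    + " cw_dim STRING,"
                    + " cw_value DOUBLE,"
                    + " my_timestamp TIMESTAMP(3)"
                    + ") WITH ("
                    + " 'connector' = 'cloudwatch',"               // assumed identifier
                    + " 'metric.namespace' = 'my/namespace',"      // required
                    + " 'metric.name.key' = 'cw_metric_name',"
                    + " 'metric.dimension.keys' = 'cw_dim',"
                    + " 'metric.value.key' = 'cw_value',"
                    + " 'metric.timestamp.key' = 'my_timestamp',"
                    + " 'sink.invalid-metric.retry-mode' = 'drop'" // drop / fail / retry (illustrative values)
                    + ")");
        }
    }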
>>>
>>>> 2. I see that MetricWriteRequest's unit field is of String type. Is there a motivation for using String type as opposed to the StandardUnit enum type? The enum cuts down on user error by left-shifting the correctness check to the IDE/compiler.
>>>
>>> The motivation for using String is to avoid exposing any AWS-SDK-specific models directly to the user of the connector. However, I plan to add validation in the sink that fails fast and gives the user a quicker feedback loop if an unknown/invalid unit is set.
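For illustration, such a validation could be a thin check against the SDK enum at configuration time, so StandardUnit never appears on the public API (a sketch only; the exception choice is an assumption):

    import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;

    // Sketch: fail fast on units the SDK does not recognize.
    // StandardUnit.fromValue(...) maps unknown strings to UNKNOWN_TO_SDK_VERSION.
    static void validateUnit(String unit) {
        if (StandardUnit.fromValue(unit) == StandardUnit.UNKNOWN_TO_SDK_VERSION) {
            throw new IllegalArgumentException("Invalid CloudWatch metric unit: " + unit);
        }
    }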
>>>
>>>> 3. Are the three options provided for error handling behaviour specifically just for the old-metrics failure case, or for all 400s, or for 500s as well? Will users benefit from a broader/more flexible error handling configuration? For example, desirable behaviour might be to i) fail the job on a permission issue, ii) drop old records that would be rejected by CW, iii) retry on throttles, 500s, or timeouts.
>>>
>>> The current proposed option is aligned with i) and iii): a fatal exception (e.g. invalid permissions) will fail the job, and a non-fatal exception (e.g. throttling) will be retried by default. The finer-grained control offered to the user here is specifically around InvalidRecordException, where the user can select any of the 3 retry modes via "sink.invalid-metric.retry-mode".
>>>
>>>> 4. Should we consider bisecting or retrying the remainder of the batch if the CW PutMetricDataResponse provides sufficient information on which MetricDatum was rejected?
>>>
>>> Yes, I have explored this, and PutMetricDataResponse does not provide sufficient information on which MetricDatum was rejected. As for bisecting, Flink AsyncSink does not currently support it, so we would need a custom implementation, which increases complexity. In addition, I think that if we want to do bisecting, it should be done on the Flink AsyncSink interface, as other connectors could benefit from it as well.
>>>
>>>> 5. On the batching of requests, how do you propose batch size (specifically size) is enforced? Specifically, I am interested in how we are calculating the data sizes of a batch.
>>>
>>> * maxBatchSize - The number of Records ("MetricWriteRequest"/"MetricDatum") in a single batch, sent in a single PutMetricDataRequest. CW sets a hard limit of 1000 for this, as mentioned by Hong.
>>> * maxRecordSizeInBytes - The size of a single Record ("MetricWriteRequest"/"MetricDatum") in bytes. CW does not expose a method to get the size in bytes, so the sink will estimate it from the attributes of the MetricDatum, where a double is treated as 8 bytes and string sizes are estimated based on UTF-8/ASCII encoding (as per the CW docs). E.g. a MetricDatum with a metric name of string length 10 plus 5 values and 5 counts (doubles) is estimated at 10 + 5*8*2 = 90 bytes. Most of the size is therefore contributed by string fields such as dimensions, followed by the values and counts arrays.
>>> * maxBatchSizeInBytes - The size of all records in a single batch to be flushed. A CW PutMetricDataRequest has a hard limit of 1 MB, which will be the hard limit of this parameter as well.
>>>
>>> Therefore, a sample set of limits for these configs could be: maxBatchSize = 1000, maxRecordSizeInBytes = 1000, maxBatchSizeInBytes = 1000*1000.
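For illustration, the estimation rule described above might look roughly like this (a sketch; the parameter shapes are hypothetical, and only the counting rule, 8 bytes per double and UTF-8 byte length per string, is taken from the answer above):

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    // Sketch: estimate a record's size from its attributes.
    static long estimateRecordSizeInBytes(
            String metricName, List<String> dimensionStrings,
            List<Double> values, List<Double> counts) {
        long size = metricName.getBytes(StandardCharsets.UTF_8).length;
        for (String s : dimensionStrings) {
            size += s.getBytes(StandardCharsets.UTF_8).length;
        }
        size += 8L * values.size(); // each value is a double
        size += 8L * counts.size(); // each count is a double
        return size; // e.g. name length 10, 5 values, 5 counts -> 10 + 5*8*2 = 90 bytes
    }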
>>>
>>> Regards,
>>> Daren
>>>
>>> On 14/04/2025, 09:17, "Keith Lee" <leekeiabstract...@gmail.com> wrote:
>>>
>>> Hello Daren,
>>>
>>> Thank you for the FLIP. Questions below:
>>>
>>> 1. Can you elaborate on the possible configuration keys and their examples for CloudWatchSinkProperties?
>>>
>>> 2. I see that MetricWriteRequest's unit field is of String type. Is there a motivation for using String type as opposed to the StandardUnit enum type? The enum cuts down on user error by left-shifting the correctness check to the IDE/compiler.
>>> - https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/model/StandardUnit.html
>>>
>>>> In addition, CloudWatch rejects any metric that's more than 2 weeks old; we will add a configurable option for users to determine the error handling behavior: either 1) drop the records, OR 2) trigger a job failure, OR 3) keep retrying the batch.
>>>
>>> 3. Are the three options provided for error handling behaviour specifically just for the old-metrics failure case, or for all 400s, or for 500s as well? Will users benefit from a broader/more flexible error handling configuration? For example, desirable behaviour might be to i) fail the job on a permission issue, ii) drop old records that would be rejected by CW, iii) retry on throttles, 500s, or timeouts.
>>>
>>>> If the batch contains one MetricDatum poison pill, the request will fail and be handled as a fully failed request.
>>>
>>> 4. Should we consider bisecting or retrying the remainder of the batch if the CW PutMetricDataResponse provides sufficient information on which MetricDatum is rejected?
>>>
>>>> A list of MetricWriteRequest will be batched based on maxBatchSize, which is then submitted as a PutMetricDataRequest.
>>>
>>> 5. On the batching of requests, how do you propose batch size (specifically size) is enforced? Specifically, I am interested in how we are calculating the data sizes of a batch.
>>>
>>> Best regards
>>> Keith Lee
>>>
>>> On Fri, Apr 11, 2025 at 6:43 PM Wong, Daren <daren...@amazon.co.uk.invalid> wrote:
>>>
>>>> Hi Hong,
>>>>
>>>> Thanks for the comments and suggestions, really appreciate it!
>>>>
>>>>> Clarify CloudWatch API limitations and handling in the sink. [1] Great to see we're being explicit with the Java API model! Let's be explicit about the PutMetricData API constraints and how we handle them (it would be good to have a separate section):
>>>>>    1. Request size and count. Maximum size per request is 1 MB, with a limit of 1000 metrics. We need to enforce this during batching to prevent users from shooting themselves in the foot!
>>>>>    2. For Values/Counts, the limit is 150 unique values within a single request.
>>>>>    3. Data type limitations. The CloudWatch API uses Java double, but it doesn't support Double.NaN. We need to be explicit about handling improperly formatted data. We can consider failing fast/slow as you have suggested. Consider using "StrictEntityValidation" in the failure handling. [1] (For the design, we can simply mention this and work out the details when we implement.)
>>>>>    4. Timestamp limitations. CloudWatch also has limitations around accepted timestamps (as you have noted). Metric data can be 2 weeks in the past or 2h in the future. Let's clarify how we handle invalid values.
>>>>>    5. Data ordering. The CW API doesn't seem to specify limitations around out-of-order/repeat data. That's great, and it would be good to be explicit about and validate this behavior.
>>>>
>>>> This is very detailed, thank you. I have updated the FLIP outlining these limitations. In summary, here's how they translate to limitations in the AsyncSink configuration:
>>>>
>>>> * Maximum size per CW PutMetricDataRequest is 1 MB → maxBatchSizeInBytes cannot be more than 1 MB
>>>> * Maximum number of MetricDatum per CW PutMetricDataRequest is 1000 → maxBatchSize cannot be more than 1000
>>>> * Maximum of 150 unique values in MetricDatum.Values → maxRecordSizeInBytes cannot be more than 150 bytes (assuming each value is 1 byte)
>>>> * The CloudWatch API uses Java double but doesn't support Double.NaN → use StrictEntityValidation
>>>> * MetricDatum timestamp limitations (up to 2 weeks in the past and up to 2 hours into the future) → validate against this, with a user choice of error handling behavior for this case
>>>> * Data ordering: yes, I have validated that CW accepts out-of-order data, and I have updated the FLIP to point this out.
>>>>
>>>>> The PutMetricData API supports two data modes, EntityMetricData and MetricData [1]. Since we are only supporting MetricData for now, let's make sure our interface allows the extension to support EntityMetricData in the future. For example, we should make sure we use our own data model classes instead of using AWS SDK classes. Also, we currently propose to use wrappers instead of primitives. Let's use the primitive where we can.
>>>>
>>>> Yes, agreed on making the interface allow extension to support EntityMetricData in the future. We are using our own data model, "MetricWriteRequest", and I have updated the FLIP to use primitives.
>>>>
>>>>> - PutMetricData supports StrictEntityValidation [1]. As mentioned above, let's use this.
>>>>> - I like that we enforce a single namespace per sink, since that is aligned with the PutMetricData API interface. Let's be explicit on the reasoning in the FLIP!
>>>>> - Clarify sink data semantics. Since we're using the async sink, we only provide at-least-once semantics. Let's make this guarantee explicit.
>>>>
>>>> Agreed, and updated the FLIP.
>>>>
>>>>> 4. CW sink interface. Currently we are proposing to have a static input data type instead of a generic input type. This would require the user to use a map separately (as you have noted). For future extensibility, I would prefer if we exposed an ElementConverter directly to the user. That way, we can provide a custom class "MetricWriteRequest" in the output interface of the ElementConverter that can be extended to support additional features (e.g. EntityMetricData) in the future.
>>>>
>>>> Thanks, I agree with both suggestions: exposing the ElementConverter to the user, and providing a custom class "MetricWriteRequest" in the output for extensibility. Updated the FLIP as well.
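For illustration, the exposed converter might look like the sketch below (the MetricWriteRequest builder methods and the Sample input type are hypothetical; only "expose an ElementConverter producing the connector's own MetricWriteRequest model" is from the discussion):

    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.ElementConverter;

    // Sketch: users convert their domain type into the connector's own model,
    // so no "metric.X.key" mapping is needed in the DataStream API.
    public class SampleElementConverter implements ElementConverter<Sample, MetricWriteRequest> {
        @Override
        public MetricWriteRequest apply(Sample sample, SinkWriter.Context context) {
            return MetricWriteRequest.builder()          // hypothetical builder
                    .withMetricName(sample.getName())
                    .addDimension("Region", sample.getRegion())
                    .addValue(sample.getValue(), 1.0)    // value + count
                    .withTimestamp(sample.getTimestamp())
                    .build();
        }
    }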
>>>> >>>> >>>> >>>>> 6. Housekeeping >>>>> - It would be good to tidy up the public interfaces linked. For >>>> example, we don't make any explicit usage of the public interfaces in >>>> FLIP-191, so we can remove that. >>>> >>>> >>>> Thanks for raising this, agreed and have updated the FLIP. >>>> >>>> >>>> Regards, >>>> Daren >>>> >>>> On 08/04/2025, 12:02, "Hong Liang" <h...@apache.org <mailto: >> h...@apache.org> <mailto: >>>> h...@apache.org <mailto:h...@apache.org>>> wrote: >>>> >>>> >>>> CAUTION: This email originated from outside of the organization. Do not >>>> click links or open attachments unless you can confirm the sender and >> know >>>> the content is safe. >>>> >>>> >>>> >>>> >>>> >>>> >>>> Hi Daren, >>>> >>>> >>>> Thanks for the contribution — exciting to see support for new sinks! >> I’ve >>>> added a few comments and suggestions below: >>>> >>>> >>>> 1. Clarify Cloudwatch API limitations and handling in the sink. [1] >>>> Great to see we're being explicit with the Java API model! Let's be >>>> explicit about the PutMetricsData API constraints and how we handle them >>>> (it would be good to have a separate section): >>>> 1. Request size and count. Maximum size per request is 1MB, with >>>> limit of 1000 metrics. We need to enforce this during batching to >> prevent >>>> users from shooting themselves in the foot! >>>> 2. For Values/Counts, limit is 150 unique metrics within a single >>>> request. >>>> 3. Data type limitations. CloudWatch API uses Java double, but it >>>> doesn't support Double.NaN. We need to be explicit to handle improperly >>>> formatted data. We can consider failing fast/slow as you have suggested. >>>> Consider using "StrictEntityValidation" in the failure handling. [1] >> (For >>>> the design, we can simply mention, but work out the details when we >>>> implement) >>>> 4. Timestamp limitations. Cloudwatch also has limitations around >>>> accepted timestamps (as you have noted). Metric data can be 48h in the >> past >>>> or 2h in the future. Let's clarify how we handle invalid values. >>>> 5. Data ordering. CW API doesn't seem to specify limitations around >>>> out-of-order / repeat data. That's great, and it would be good to be >>>> explicit about and validate this behavior. >>>> 2. Clarify supported features [1] >>>> - The PutMetricData API supports two data modes, EntityMetricData and >>>> MetricData [1]. Since we are only supporting MetricData for now, let's >> make >>>> sure our interface allows the extension to support EntityMetricData in >> the >>>> future. For example, we should make sure we use our own data model >> classes >>>> instead of using AWS SDK classes. Also, we currently propose to use >>>> wrappers instead of primitives. Let's use the primitive where we can :). >>>> - PutMetricData supports StrictEntityValidation [1]. As mentioned >>>> above, let's use this. >>>> - I like that we enforce a single namespace per sink, since that is >>>> aligned with the PutMetricData API interface. Let's be explicit on the >>>> reasoning in the FLIP! >>>> 3. Clarify sink data semantics. Since we're using the async sink, we >> only >>>> provide at-least-once semantics. Let’s make this guarantee explicit. >>>> 4. CW sink interface. Currently we are proposing to have a static input >>>> data type instead of generic input type. This would require user to use >> a >>>> map separately (As you have noted). For future extensibility, I would >>>> prefer if we exposed an ElementConverter directly to the user. 
>>>> 2. Clarify supported features [1]
>>>> - The PutMetricData API supports two data modes, EntityMetricData and MetricData [1]. Since we are only supporting MetricData for now, let's make sure our interface allows the extension to support EntityMetricData in the future. For example, we should make sure we use our own data model classes instead of using AWS SDK classes. Also, we currently propose to use wrappers instead of primitives. Let's use the primitive where we can :).
>>>> - PutMetricData supports StrictEntityValidation [1]. As mentioned above, let's use this.
>>>> - I like that we enforce a single namespace per sink, since that is aligned with the PutMetricData API interface. Let's be explicit on the reasoning in the FLIP!
>>>> 3. Clarify sink data semantics. Since we're using the async sink, we only provide at-least-once semantics. Let’s make this guarantee explicit.
>>>> 4. CW sink interface. Currently we are proposing to have a static input data type instead of a generic input type. This would require the user to use a map separately (as you have noted). For future extensibility, I would prefer if we exposed an ElementConverter directly to the user. That way, we can provide a custom class "MetricWriteRequest" in the output interface of the ElementConverter that can be extended to support additional features (e.g. EntityMetricData) in the future.
>>>> 5. Table API design.
>>>> - I'm not a fan of the way we currently use dimensions in the properties.
>>>> - It would be better to use Flink-native SQL support like PRIMARY KEY instead [2]. This also enforces that the specified dimension cannot be null.
>>>> 6. Housekeeping
>>>> - It would be good to tidy up the public interfaces linked. For example, we don't make any explicit usage of the public interfaces in FLIP-191, so we can remove that.
>>>>
>>>> Overall, nice FLIP! Thanks for the detail and for making it an easy read. Hope the above helps!
>>>>
>>>> Cheers,
>>>> Hong
>>>>
>>>> [1] https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
>>>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#primary-key
>>>>
>>>> On Mon, Apr 7, 2025 at 9:24 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>>>
>>>>> Hi Daren, thanks for the FLIP.
>>>>>
>>>>> Just a couple of questions and comments:
>>>>>
>>>>>> Usable in both DataStream and Table API/SQL
>>>>>
>>>>> What about the Python API? This is something we should consider ahead of time, since the abstract element converter doesn't have a Flink type mapping to be used from Python; this is an issue we faced with DDB before.
>>>>>
>>>>>> Therefore, the connector will provide a CloudWatchMetricInput model that user can use to pass as input to the connector. For example, in DataStream API, it could be a MapFunction called just before passing to the sink as follows:
>>>>>
>>>>> I am not quite sure I follow: are you suggesting we introduce a specific new converter class, or leave that to users? Also, since you mentioned FLIP-171, are you suggesting to implement this sink as an extension of the Async Sink? In that case it is more confusing to me how we are going to use the map function with the AsyncSink ElementConverter.
>>>>>
>>>>>> public class SampleToCloudWatchMetricInputMapper implements MapFunction<Sample, CloudWatchMetricInput>
>>>>>
>>>>> Is CloudWatchMetricInput a newly introduced model class? I couldn't find it in the SDK v2. If we are introducing it, then it might be useful to add it to the FLIP, since this is part of the API.
>>>>>
>>>>>> Supports both Bounded (Batch) and Unbounded (Streaming)
>>>>>
>>>>> What do you propose to handle them differently? I can't find a specific thing in the FLIP.
>>>>>
>>>>> Regarding the Table API:
>>>>>
>>>>>> 'metric.dimension.keys' = 'cw_dim',
>>>>>
>>>>> I am not in favor of doing this, as it will complicate the schema validation on table creation. Maybe we can use the whole schema as dimensions, excluding the values and the count; let me know your thoughts here.
>>>>>
>>>>>> 'metric.name.key' = 'cw_metric_name',
>>>>>
>>>>> So we are making the metric name part of the row data? Have we considered not doing that, and instead having 1 table map to 1 metric rather than to a namespace? It might be more suitable to enforce some validations on the dimensions schema this way. Of course, this would probably mean introducing some intermediate class in the model to hold the dimensions, values, and counts without the metric name and namespace, which we would extract from the sink definition; let me know your thoughts here.
>>>>>
>>>>>> `cw_value` BIGINT,
>>>>>
>>>>> Are we going to allow all numeric types for values?
>>>>>
>>>>>> protected void submitRequestEntries(
>>>>>> List<MetricDatum> requestEntries,
>>>>>> Consumer<List<MetricDatum>> requestResult)
>>>>>
>>>>> nit: This method is deprecated after 1.20. I hope the repo is upgraded by the time we implement this.
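For context, a rough sketch of how that (pre-1.20-style) callback could hand a batch to CloudWatch, assuming a cloudWatchClient field of type CloudWatchAsyncClient and a configured metricNamespace; per the thread above, a failed request re-queues the whole batch, and fatal-error classification is omitted:

    import java.util.Collections;
    import java.util.List;
    import java.util.function.Consumer;
    import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
    import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;

    // Sketch: one batch of request entries becomes one PutMetricDataRequest.
    protected void submitRequestEntries(
            List<MetricDatum> requestEntries, Consumer<List<MetricDatum>> requestResult) {
        PutMetricDataRequest request =
                PutMetricDataRequest.builder()
                        .namespace(metricNamespace) // single namespace per sink, per the FLIP
                        .metricData(requestEntries)
                        .build();
        cloudWatchClient.putMetricData(request).whenComplete((response, error) -> {
            if (error != null) {
                requestResult.accept(requestEntries); // retry the whole batch
            } else {
                requestResult.accept(Collections.emptyList()); // nothing to retry
            }
        });
    }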
>>>>>
>>>>>> Error Handling
>>>>>
>>>>> Aside from poison pills, what error handling are you suggesting? Are we following the footsteps of the other AWS connectors with error classification? Is there any effort to abstract it on the AWS side?
>>>>>
>>>>> And on the topic of poison pills: if I understand correctly, that is a topic that has been discussed for a while. This of course breaks the at-least-once semantics and might be confusing to users. Additionally, since the CloudWatch API fails the full batch, how are you suggesting we identify the poison pills? I am personally in favor of global failures in this case, but would love to hear the feedback here.
>>>>>
>>>>> Best Regards
>>>>> Ahmed Hamdy
>>>>>
>>>>> On Mon, 7 Apr 2025 at 11:29, Wong, Daren <daren...@amazon.co.uk.invalid> wrote:
>>>>>
>>>>>> Hi Dev,
>>>>>>
>>>>>> I would like to start a discussion about FLIP: Amazon CloudWatch Metric Sink Connector:
>>>>>> https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
>>>>>>
>>>>>> This FLIP proposes adding support for an Amazon CloudWatch Metric sink in the flink-connector-aws repo. Looking forward to your feedback, thank you.
>>>>>>
>>>>>> Regards,
>>>>>> Daren