Hi Ahmed,

Thanks for the detailed feedback and questions.

Just a quick note that, after gathering your and Hong's feedback, I have updated the design to remove the "CloudWatchMetricInput" model and introduce "MetricWriteRequest" instead, and we will expose an element converter that users can implement to convert from a generic input type to the static output type, i.e. "MetricWriteRequest".
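To make this concrete, here is a rough sketch of what such a converter could look like. Note that "Sample" is a placeholder user type and the "MetricWriteRequest" builder shown is an assumed shape, not final; only the ElementConverter interface itself is the existing Flink API:

    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.ElementConverter;

    // Sketch only: "Sample" is a hypothetical user type and the
    // MetricWriteRequest builder API is an assumption, not final.
    public class SampleElementConverter
            implements ElementConverter<Sample, MetricWriteRequest> {

        @Override
        public MetricWriteRequest apply(Sample sample, SinkWriter.Context context) {
            // Map the user-defined input type to the sink's static request type.
            return MetricWriteRequest.builder()
                    .withMetricName(sample.getMetricName())
                    .addDimension("InstanceId", sample.getInstanceId())
                    .addValue(sample.getValue())
                    .build();
        }
    }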
> What about python API? this is something we should consider ahead since the
> abstract element converter doesn't have a Flink type mapping to be used from
> python, this is an issue we faced with DDB before

For the Python DataStream API, with the updated approach, I think we could expose the element converter in the "flink-python" package [1], which users can import and implement to convert from their Flink type to "MetricWriteRequest".

> Is CloudWatchMetricInput a newly introduced model class, I couldn't find it
> in the sdkv2, If we are introducing it then it might be useful to add to the
> FLIP since this is part of the API.

Yes, it is a newly introduced model class (now renamed to "MetricWriteRequest", as noted above). Thanks, I have added it to the FLIP.

> I am not quite sure I follow, are you suggesting we introduce a specific new
> converter class or relay that to users? also since you mentioned FLIP-171,
> are you suggesting to implement this sink as an extension to Async Sink, in
> that case It is more confusing to me how we are going to use the map function
> with the AsyncSink.ElementConverter.

Thanks for pointing this out; Hong made a suggestion in his comment that I think helped clarify this as well. We will expose an "ElementConverter" that converts from a user-defined input type to a static output type called "MetricWriteRequest" (see the sketch above). Hence, users can provide their own element converter logic and freely define their own input type.

> Supports both Bounded (Batch) and Unbounded (Streaming)
> What do you propose to handle them differently? I can't find a specific
> thing in the FLIP

Thanks for flagging this. Based on the current sink design, I think it should be able to support both batch and streaming mode without requiring different ways of handling or batching the records in the AsyncSinkWriter, but please let me know if there's a gap in my understanding here.

> I am not in favor of doing this as this will complicate the schema validation
> on table creation, maybe we can use the whole schema as dimensions excluding
> the values and the count, let me know your thoughts here.

I agree it complicates the schema, and yes, I am open to exploring options to simplify it. My thought is that if we want to exclude the values and counts from the schema, then we need another way to tell the sink which columns in the RowData correspond to the values and counts that should go into the CW MetricDatum. And I believe the challenge here is that we can't expose an ElementConverter for Table API/SQL use cases. Therefore, the alternatives I can think of are:

* Option 1: Make "values" and "counts" reserved keywords to be used in the conversion from RowData to MetricDatum. The downside is that users may want to name their columns differently based on their use case/context, or may want to reserve these keywords for something else.
* Option 2: Introduce a set of prefixes to identify them, e.g. "VALUES_", "COUNTS_". This approach is not worth it in my view, as it merely shifts the complexity from table options to table column naming.

Also, note that values and counts are optional here, so users can choose not to define them in the table options if, for example, they are using a StatisticSet instead of Values and Counts. Making these options optional also helps simplify the schema usage.

> So we are making the metric part of the row data? have we considered not
> doing that instead and having 1 table map to 1 metric instead of namespace?
> It might be more suitable to enforce some validations on the dimensions
> schema this way. Ofc this will probably have us introduce some intermediate
> class in the model to hold the dimensions, values and counts without the
> metric name and namespace that we will extract from the sink definition, let
> me know your thoughts here?

Yes, the proposal is to make metricName part of the row data, and yes, that means 1 table per namespace. The reason is that it aligns with the CW PutMetricDataRequest API interface, as mentioned by Hong. Another consideration was flexibility: for example, 1 table per namespace can cater for 1-table-per-metricName use cases, but not vice versa.

> Are we going to allow all numeric types for values?

CloudWatch requires values to be Double, so I think we can support all numeric types here (since they can be converted to Double): DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE.

> protected void submitRequestEntries(
>     List<MetricDatum> requestEntries,
>     Consumer<List<MetricDatum>> requestResult)
>
> nit: This method should be deprecated after 1.20. I hope the repo is
> upgraded by the time we implement this

Thanks for flagging this; I have updated the example to be compatible with 2.0.
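For reference, a rough sketch of the 2.0-compatible shape, assuming the ResultHandler-based AsyncSinkWriter callback that replaces the deprecated Consumer variant (exact signature per the flink-connector-base version we end up on; the "namespace" and "client" fields are assumed to exist on the writer):

    import java.util.List;
    import org.apache.flink.connector.base.sink.writer.ResultHandler;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
    import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
    import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;

    // Sketch of the writer method, inside a class extending
    // AsyncSinkWriter<InputT, MetricDatum>.
    @Override
    protected void submitRequestEntries(
            List<MetricDatum> requestEntries,
            ResultHandler<MetricDatum> resultHandler) {
        PutMetricDataRequest request = PutMetricDataRequest.builder()
                .namespace(namespace)
                .metricData(requestEntries)
                .build();
        client.putMetricData(request).whenComplete((response, error) -> {
            if (error == null) {
                resultHandler.complete();
            } else {
                // Error classification (fatal vs. retryable) would go here;
                // retrying the whole batch is shown for simplicity.
                resultHandler.retryForEntries(requestEntries);
            }
        });
    }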
> Away from poison pills, what error handling are you suggesting? Are we
> following the footsteps of the other AWS connectors with error
> classification, is there any effort to abstract it on the AWS side?

Currently I am following in the footsteps of the other AWS connectors for error classification. I did a quick search and did not see a JIRA raised to abstract it; I will raise that as a separate feature-improvement JIRA. However, we are going to expose a config to allow users to choose their failover behavior, i.e. fail fast, retry, or drop the batch.

> And on the topic of poison pills, If I understand correctly that is a topic
> that has been discussed for a while, this of course breaks the at-least-once
> semantic and might be confusing to the users, additionally since the
> CloudWatch API fails the full batch how are you suggesting we identify the
> poison pills? I am personally in favor of global failures in this case but
> would love to hear the feedback here.

This is a good point. After gathering feedback from others, the current way forward is to expose a config for users to decide and control the error handling behavior (see the sketch below):

* Option 1: Fail the job.
* Option 2: Keep retrying the batch.
* Option 3: Drop the batch.
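As a sketch of how this could surface to users — the option names and the sink builder shown here are purely illustrative and not part of the FLIP yet:

    // Hypothetical API, for illustration only; names are not final.
    public enum FailoverBehavior {
        FAIL,       // Option 1: fail the job
        RETRY,      // Option 2: keep retrying the batch
        DROP_BATCH  // Option 3: drop the batch
    }

    // Illustrative usage with an assumed sink builder:
    CloudWatchMetricSink<Sample> sink = CloudWatchMetricSink.<Sample>builder()
            .setNamespace("MyApp/Metrics")
            .setElementConverter(new SampleElementConverter())
            .setFailoverBehavior(FailoverBehavior.RETRY)
            .build();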
Once again, thanks a lot for the feedback and questions.

Regards,
Daren

[1] https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py

On 2025/04/07 20:23:38 Ahmed Hamdy wrote:
> Hi Daren thanks for the FLIP
>
> Just a couple of questions and comments:
>
> > Usable in both DataStream and Table API/SQL
>
> What about python API? this is something we should consider ahead since the
> abstract element converter doesn't have a Flink type mapping to be used
> from python, this is an issue we faced with DDB before
>
> > Therefore, the connector will provide a CloudWatchMetricInput model that
> > user can use to pass as input to the connector. For example, in DataStream
> > API, it could be a MapFunction called just before passing to the sink as
> > follows:
>
> I am not quite sure I follow, are you suggesting we introduce a
> specific new converter class or relay that to users? also since you
> mentioned FLIP-171, are you suggesting to implement this sink as an
> extension to Async Sink, in that case It is more confusing to me how we are
> going to use the map function with the AsyncSink.ElementConverter.
>
> > public class SampleToCloudWatchMetricInputMapper implements MapFunction<
> > Sample, CloudWatchMetricInput>
>
> Is CloudWatchMetricInput a newly introduced model class, I couldn't find it
> in the sdkv2, If we are introducing it then it might be useful to add to
> the FLIP since this is part of the API.
>
> > Supports both Bounded (Batch) and Unbounded (Streaming)
>
> What do you propose to handle them differently? I can't find a specific
> thing in the FLIP
>
> Regarding table API
>
> > 'metric.dimension.keys' = 'cw_dim',
>
> I am not in favor of doing this as this will complicate the schema
> validation on table creation, maybe we can use the whole schema as
> dimensions excluding the values and the count, let me know your thoughts
> here.
>
> > 'metric.name.key' = 'cw_metric_name',
>
> So we are making the metric part of the row data? have we considered not
> doing that instead and having 1 table map to 1 metric instead of namespace?
> It might be more suitable to enforce some validations on the dimensions
> schema this way. Ofc this will probably have us introduce some intermediate
> class in the model to hold the dimensions, values and counts without the
> metric name and namespace that we will extract from the sink definition,
> let me know your thoughts here?
>
> > `cw_value` BIGINT,
>
> Are we going to allow all numeric types for values?
>
> > protected void submitRequestEntries(
> >     List<MetricDatum> requestEntries,
> >     Consumer<List<MetricDatum>> requestResult)
>
> nit: This method should be deprecated after 1.20. I hope the repo is
> upgraded by the time we implement this
>
> > Error Handling
>
> Away from poison pills, what error handling are you suggesting? Are we
> following the footsteps of the other AWS connectors with error
> classification, is there any effort to abstract it on the AWS side?
>
> And on the topic of poison pills, If I understand correctly that is a topic
> that has been discussed for a while, this of course breaks the at-least-once
> semantic and might be confusing to the users, additionally since the
> CloudWatch API fails the full batch how are you suggesting we identify the
> poison pills? I am personally in favor of global failures in this case but
> would love to hear the feedback here.
>
> Best Regards
> Ahmed Hamdy
>
> On Mon, 7 Apr 2025 at 11:29, Wong, Daren <da...@amazon.co.uk.invalid>
> wrote:
>
> > Hi Dev,
> >
> > I would like to start a discussion about FLIP: Amazon CloudWatch Metric
> > Sink Connector
> > https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
> >
> > This FLIP is proposing to add support for Amazon CloudWatch Metric sink in
> > flink-connector-aws repo. Looking forward to your feedback, thank you
> >
> > Regards,
> > Daren