Hi Ahmed,

Thanks for the detailed feedback and questions

Just a quick note that after gathering you and Hong’s feedback, I have updated 
the design to remove “CloudWatchMetricInput” model and introduce 
“MetricWriteRequest” instead, and we will expose an element converter that user 
can implement to convert from a generic input type to the static output type, 
i.e “MetricWriteRequest”.


> What about python API? this is sth we should consider ahead since the 
> abstract element converter doesn't have a Flink type mapping to be used from 
> python, this is a issue we faced with DDB before


For python DataStream API, with the updated approach, I think we could expose 
the element converter in “flink-python” package [1] that user can import and 
implement to convert from their FlinkType to “MetricWriteRequest”. 



> Is CloudWatchMetricInput a newly introduced model class, I couldn't find it 
> in the sdkv2, If we are introducing it then it might be useful to add to the 
> FLIP since this is part of the API.


Yes, “MetricWriteRequest” is a newly introduced model class. Thanks I have 
added it to the FLIP. 



> I am not quite sure I follow, are you suggesting we introduce a specific new 
> converter class or relay that to users? also since you mentioned FLIP-171, 
> are you suggesting to implement this sink as an extension to Async Sink, in 
> that case It is more confusing to me how we are going to use the map function 
> with the AsyncSink.ElementConvertor.


Thanks for pointing this out, Hong made a suggestion in his comment that I 
think helped clarify on this as well.

We will expose an “ElementConvertor” that converts from user defined input type 
to a static output type called “MetricWriteRequest”. Hence, user can provide 
their own element convertor logic and they can freely define their own input 
type.



> What do you propose to Bounded (Batch) and Unbounded (Streaming) them 
> differently? I can't find a specific thing in the FLIP


Thanks for flagging this, based on the current sink design, I think it should 
be able to support both batch and streaming mode without requiring different 
ways of handling or batching the records in the AsyncSinkWriter, but please let 
me know if there’s gap in my understanding here.



> I am not in favor of doing this as this will complicate the schema validation 
> on table creation, maybe we can use the whole schema as dimensions excluding 
> the values and the count, let me know your thoughts here.


I agree it complicates the schema, and yes I am open to explore options to 
simplify it.

My thoughts are if we want to exclude the values and counts from schema, then 
we need another way to provide information to the Sink so that it knows which 
column in the RowData corresponds to the values and counts that should be in 
the CW MetricDatum. And I believe the challenge here is we can’t expose 
ElementConvertor for TableAPI/SQL use cases here. Therefore, alternatives I can 
think of are:


* Option 1: Make “values” and “counts” as reserved keywords to be used in the 
conversion from RowData to MetricDatum. Downside of this is user may want to 
name their column differently based on their use case/context or may want to 
reserve these keywords for something else.
* Option 2: Introduce a set of prefix to identify them, i.e “VALUES_”, 
“COUNTS_”. This approach is not worth it for me as it merely shifts the 
complexity from table option to table column naming.


Also, note that values and counts are optional here, so user can choose to not 
define them in table options if for example they are using StatisticSet instead 
of Values and Counts. Making these options optional also help simplify the 
usage of schema.



> So we are making the metric part of the row data? have we considered not 
> doing that instead and having 1 table map to 1 metric instead of namespace? 
> It might be more suitable to enforce some validations on the dimensions 
> schema this way. Ofc this will probably have is introduce some intermediate 
> class in the model to hold the dimensions, values and counts without the 
> metric name and namespace that we will extract from the sink definition, let 
> me know your thoughts here?


Yes, the proposal is to make metricName part of row data, and yes that means 1 
table per namespace. 

The reason for 1 table per metricName is because it is aligned with CW 
PutMetricDataRequest API interface which is mentioned by Hong. Other 
considerations I had was on flexibility, for example 1 table per namespace can 
support/cater for 1 table per metricName use cases but not vice versa. 



> Are we going to allow all numeric types for values?


Yes, cloudwatch requires values to be Double, so I think we can support all 
numeric type here: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE.



> protected void submitRequestEntries(List<MetricDatum> requestEntries, 
> Consumer<List<MetricDatum>> requestResult) nit: This method should be 
> deprecated after 1.20. I hope the repo is upgraded by the time we implement 
> this


Thanks for flagging this, I have updated the example to be compatible with 2.0.



> Away from poison pills, what error handling are you suggesting? Are we 
> following the footsteps of the other AWS connectors with error 
> classification, is there any effort to abstract it on the AWS side?


Currently I am following the footsteps of other AWS connectors for error 
classification. I did a quick search and did not see JIRA raised to abstract 
it, I will raise it as a separate feature improvement JIRA. However, we are 
going to expose config to allow user to choose their error failover behavior 
i.e fail fast, retry, or drop the batch.



> And on the topic of poison pills, If I understand correctly that is a topic 
> that has been discussed for a while, this ofc breaks the at-least-once 
> semantic and might be confusing to the users, additionally since cloudwatch 
> API fails the full batch how are you suggesting we identify the poison pills? 
> I am personally in favor of global failures in this case but would love to 
> hear the feedback here.


This is a good point, and after gathering feedback from others, the current way 
forward is to expose a config for user to decide and control the error handling 
behavior:

Option 1: Fail the job.
Option 2: Keep retrying the batch.
Option 3: Drop the batch.

Once again, thanks a lot for the feedback and questions

Regards,
Daren

[1] 
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py


On 2025/04/07 20:23:38 Ahmed Hamdy wrote:
> Hi Daren thanks for the FLIP
> 
> Just a couple of questions and comments?
> 
> > Usable in both DataStream and Table API/SQL
> What about python API? this is sth we should consider ahead since the
> abstract element converter doesn't have a Flink type mapping to be used
> from python, this is a issue we faced with DDB before
> 
> > Therefore, the connector will provide a CloudWatchMetricInput model that
> user can use to pass as input to the connector. For example, in DataStream
> API, it could be a MapFunction called just before passing to the sink as
> follows:
> I am not quite sure I follow, are you suggesting we introduce a
> specific new converter class or relay that to users? also since you
> mentioned FLIP-171, are you suggesting to implement this sink as an
> extension to Async Sink, in that case It is more confusing to me how we are
> going to use the map function with the AsyncSink.ElementConvertor.
> 
> >public class SampleToCloudWatchMetricInputMapper implements MapFunction<
> Sample, CloudWatchMetricInput>
> 
> Is CloudWatchMetricInput a newly introduced model class, I couldn't find it
> in the sdkv2, If we are introducing it then it might be useful to add to
> the FLIP since this is part of the API.
> 
> 
> > Supports both Bounded (Batch) and Unbounded (Streaming)
> 
> What do you propose to handle them differently? I can't find a specific
> thing in the FLIP
> 
> Regarding table API
> 
> > 'metric.dimension.keys' = 'cw_dim',
> 
> I am not in favor of doing this as this will complicate the schema
> validation on table creation, maybe we can use the whole schema as
> dimensions excluding the values and the count, let me know your thoughts
> here.
> 
> > 'metric.name.key' = 'cw_metric_name',
> 
> So we are making the metric part of the row data? have we considered not
> doing that instead and having 1 table map to 1 metric instead of namespace?
> It might be more suitable to enforce some validations on the dimensions
> schema this way. Ofc this will probably have is introduce some intermediate
> class in the model to hold the dimensions, values and counts without the
> metric name and namespace that we will extract from the sink definition,
> let me know your thoughts here?
> 
> 
> >`cw_value` BIGINT,
> Are we going to allow all numeric types for values?
> 
> >    protected void submitRequestEntries(
>           List<MetricDatum> requestEntries,
>           Consumer<List<MetricDatum>> requestResult)
> 
> nit: This method should be deprecated after 1.20. I hope the repo is
> upgraded by the time we implement this
> 
> > Error Handling
> Away from poison pills, what error handling are you suggesting? Are we
> following the footsteps of the other AWS connectors with error
> classification, is there any effort to abstract it on the AWS side?
> 
> And on the topic of poison pills, If I understand correctly that is a topic
> that has been discussed for a while, this ofc breaks the at-least-once
> semantic and might be confusing to the users, additionally since cloud
> watch API fails the full batch how are you suggesting we identify the
> poison pills? I am personally in favor of global failures in this case but
> would love to hear the feedback here.
> 
> 
> 
> Best Regards
> Ahmed Hamdy
> 
> 
> On Mon, 7 Apr 2025 at 11:29, Wong, Daren <da...@amazon.co.uk.invalid>
> wrote:
> 
> > Hi Dev,
> >
> > I would like to start a discussion about FLIP: Amazon CloudWatch Metric
> > Sink Connector
> > https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
> >
> > This FLIP is proposing to add support for Amazon CloudWatch Metric sink in
> > flink-connector-aws repo. Looking forward to your feedback, thank you
> >
> > Regards,
> > Daren
> >
> 

Reply via email to