Hi Qingsheng,



Thanks very much for your detail advice. The first two points are very clear 
and make sense.  I have only one question about the third point.


>Another point to note is that if you check the usage of cache in JDBC and Hive 
>lookup table, the value type is List<RowData>, since it’s common that a 
>joining key could mapped to multiple rows. We could add another layer of 
>abstraction under Cache interface, for example: 
>
>OneToManyCache<K, V> extends Cache<K, List<V>>
IMHO, up to now, I haven't found `OneToManyCache` is necessary. The method 
`appendToKey(List<V>)` can be replaced by put(K, V). What do you think?




 Best regards,
 Yuan

At 2022-04-13 15:20:01, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>Hi Yuan,
>
>Sorry for pending such long time on this thread. I think adding unified 
>abstraction and metrics for cache is quite important for users and developers 
>to optimize and improve their jobs with lookup join. We also have our inner 
>cache abstraction and implementation, so I took a deeper observation and 
>here’re some thoughts of mine. 
>
>1. Metrics
>
>I think users would be interested to these 3 aspects when debugging or 
>benchmarking their jobs: 
>
>(1) Hit / Miss rate
>- hitCount, Counter type, to track number of cache hit
>- missCount, Counter type, to track number of cache miss
>Here we just report the raw count instead of rate to external metric system, 
>since it’s easier and more flexible to make aggregations and calculate rate in 
>metric systems like Prometheus.
>
>(2) Loading throughput and latency
>- numBytesLoadedTotal, Counter type, to track number of bytes totally loaded 
>by cache
>- numRecordsLoadedTotal, Counter type, to track number of records totally 
>loaded
>These two can be used for tracking the throughput of loading
>
>- latestLoadTime, Gauge type, to track the time spent for the latest load 
>operation
>Actually it’s better to use histogram for tracking latency, but it’s quite 
>expensive to manage a histogram. Personally I think a gauge would be good 
>enough to reflect the latency.
>
>- numLoadFailures, Counter type, to track number of failed loads.
>
>(3) Current usage
>- numCachedRecords, Gauge type, to track number of entries in cache
>- numCachedBytes, Gauge type, to track number of bytes used by cache
>
>Most of the metrics above are similar to your original proposal, and here’s 
>the difference: 
>(1) I still think it’s weird to report identifier and type as metrics. It’s 
>quite handy to get the actual cache type through the code path, nevertheless 
>some metric systems don't support string-type metrics (like Prometheus). 
>(2) numRecords is renamed to numCachedRecords
>(3) loadSuccessCount is deduced by missCount - numLoadFailures. I think users 
>would be interested to know how many times it loads (missCount), and how many 
>failures (numLoadFailures)
>(4) totalLoadTime is replaced by latestLoadTime. I think it’s not quite 
>meaningful for users to see a long run job reporting totalLoadTime with hours 
>even days as value.
>
>2. APIs
>
>(1) CacheMetricGroup: 
>
>public interface CacheMetricGroup {
>    Counter getHitCounter();
>
>    Counter getMissCounter();
>
>    Counter getNumRecordsLoadedTotalCounter();
>
>    Counter getNumBytesLoadedTotalCounter();
>
>    Gauge<Long> getLatestLoadTimeGauge();
>
>    Counter getNumLoadFailureCounter();
>
>    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
>
>    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge)
>}
>
>Note that some metrics are provided as getters since they are quite straight 
>forward, except numCacheRecords/Bytes, which should be left for cache 
>implementers. 
>
>(2) Cache
>
>public interface Cache<K, V> extends AutoClosable {
>    void open(CacheMetricGroup cacheMetricGroup);
>
>    V get(K key, Callable<? extends V> loader) throws Exception;
>
>    void put(K key, V value);
>
>    void putAll(Map<? extends K, ? extends V> m);
>
>    void clean();
>
>    long size();
>}
>
>Compared to your proposal: 
>a. `getIdentifier()` is removed. I can’t see any usage of this function, since 
>we are not dynamically loading cache implementations via SPI or factory style.
>b. `init()` and `initMetric()` are merged to `open(CacheMetricGroup)`.
>c. Extends `AutoClosable` to be symmetric to open, for cleaning resources 
>claimed by cache
>d. `getMetricGroup()` is removed. Metric groups should be exposed to cache 
>implementations instead of users. 
>
>3. Other topics
>Another point to note is that if you check the usage of cache in JDBC and Hive 
>lookup table, the value type is List<RowData>, since it’s common that a 
>joining key could mapped to multiple rows. We could add another layer of 
>abstraction under Cache interface, for example: 
>
>OneToManyCache<K, V> extends Cache<K, List<V>>
>
>And add interfaces like `appendToKey(List<V>)` to it. What do you think?
>
>Cheers, 
>
>Qingsheng
>
>> On Mar 7, 2022, at 16:00, zst...@163.com wrote:
>> 
>> Hi devs,
>> 
>> 
>> I would like to propose a discussion thread about abstraction of Cache 
>> LookupFunction with metrics for cache in connectors to make cache out of box 
>> for connector developers. There are multiple LookupFunction implementations 
>> in individual connectors [1][2][3][4] so far. 
>> At the same time, users can monitor cache in LookupFunction by adding 
>> uniform cache metrics to optimize tasks or troubleshoot.
>> 
>> 
>> I have posted an issue about this, see 
>> <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design 
>> <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>> 
>> 
>> Looking forward to your feedback, thanks.
>> 
>> 
>> Best regards,
>> Yuan
>> 
>> 
>> 
>> 
>> [1] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>> [2] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>> [3] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>> [4] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java

Reply via email to