Hi Yuan,

Sorry for leaving this thread pending for so long. I think adding a unified 
abstraction and metrics for caches is quite important for users and developers 
to optimize and improve their jobs with lookup joins. We also have our own 
internal cache abstraction and implementation, so I took a closer look and 
here are some thoughts of mine. 

1. Metrics

I think users would be interested in these three aspects when debugging or 
benchmarking their jobs: 

(1) Hit / Miss rate
- hitCount, Counter type, to track number of cache hit
- missCount, Counter type, to track number of cache miss
Here we just report the raw counts instead of the rate to the external metric 
system, since it's easier and more flexible to aggregate and calculate the 
rate in metric systems like Prometheus.

(2) Loading throughput and latency
- numBytesLoadedTotal, Counter type, to track the total number of bytes loaded 
by the cache
- numRecordsLoadedTotal, Counter type, to track the total number of records 
loaded
These two can be used for tracking the throughput of loading.

- latestLoadTime, Gauge type, to track the time spent on the latest load 
operation
Strictly speaking it's better to use a histogram for tracking latency, but 
it's quite expensive to maintain one. Personally I think a gauge would be good 
enough to reflect the latency.

- numLoadFailures, Counter type, to track the number of failed loads.

(3) Current usage
- numCachedRecords, Gauge type, to track the number of entries in the cache
- numCachedBytes, Gauge type, to track the number of bytes used by the cache
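
The three groups above can be sketched together in one place. This is only an illustrative stand-in, not Flink's actual API: plain `AtomicLong`s play the role of the Counter/Gauge backing values, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: field names mirror the proposed metric names.
public class MetricTrackingCache<K, V> {
    private final Map<K, V> store = new ConcurrentHashMap<>();
    final AtomicLong hitCount = new AtomicLong();
    final AtomicLong missCount = new AtomicLong();
    final AtomicLong numRecordsLoadedTotal = new AtomicLong();
    final AtomicLong numLoadFailures = new AtomicLong();
    final AtomicLong latestLoadTimeMs = new AtomicLong();

    public V get(K key, Callable<? extends V> loader) throws Exception {
        V value = store.get(key);
        if (value != null) {
            hitCount.incrementAndGet();
            return value;
        }
        missCount.incrementAndGet();
        long start = System.nanoTime();
        try {
            value = loader.call();
        } catch (Exception e) {
            numLoadFailures.incrementAndGet();
            throw e;
        } finally {
            // Updated even on failure: the gauge reflects the latest attempt.
            latestLoadTimeMs.set((System.nanoTime() - start) / 1_000_000);
        }
        numRecordsLoadedTotal.incrementAndGet();
        store.put(key, value);
        return value;
    }

    // numCachedRecords would be reported as a gauge over this size.
    public long size() {
        return store.size();
    }
}
```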

Most of the metrics above are similar to your original proposal, and here’s the 
difference: 
(1) I still think it's weird to report the identifier and type as metrics. 
It's quite handy to get the actual cache type through the code path, and 
besides, some metric systems (like Prometheus) don't support string-type 
metrics. 
(2) numRecords is renamed to numCachedRecords.
(3) loadSuccessCount can be derived as missCount - numLoadFailures. I think 
users would be more interested in how many loads happened (missCount) and how 
many of them failed (numLoadFailures).
(4) totalLoadTime is replaced by latestLoadTime. I think it's not very 
meaningful for users to see a long-running job reporting a totalLoadTime of 
hours or even days.
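
To make the derivation in (3) and the rate calculation concrete, here's a tiny hypothetical sketch (the class name and sample numbers are made up, not from the proposal):

```java
// Derived metrics computed from the raw counters, as a metric backend would.
public class DerivedCacheMetrics {
    public static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        return total == 0 ? 0.0 : (double) hitCount / total;
    }

    // Every miss triggers exactly one load, so successful loads are
    // misses minus failures.
    public static long loadSuccessCount(long missCount, long numLoadFailures) {
        return missCount - numLoadFailures;
    }

    public static void main(String[] args) {
        System.out.println(hitRate(90, 10));         // 0.9
        System.out.println(loadSuccessCount(10, 2)); // 8
    }
}
```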

2. APIs

(1) CacheMetricGroup: 

public interface CacheMetricGroup {
    Counter getHitCounter();

    Counter getMissCounter();

    Counter getNumRecordsLoadedTotalCounter();

    Counter getNumBytesLoadedTotalCounter();

    Gauge<Long> getLatestLoadTimeGauge();

    Counter getNumLoadFailureCounter();

    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

Note that most metrics are provided as getters since they are quite 
straightforward, except numCachedRecords/Bytes, which should be left to cache 
implementers to set. 

(2) Cache

public interface Cache<K, V> extends AutoCloseable {
    void open(CacheMetricGroup cacheMetricGroup);

    V get(K key, Callable<? extends V> loader) throws Exception;

    void put(K key, V value);

    void putAll(Map<? extends K, ? extends V> m);

    void clean();

    long size();
}

Compared to your proposal: 
a. `getIdentifier()` is removed. I can't see any use for this function, since 
we are not dynamically loading cache implementations via SPI or a factory 
pattern.
b. `init()` and `initMetric()` are merged to `open(CacheMetricGroup)`.
c. Extends `AutoCloseable` to be symmetric with open, for cleaning up 
resources claimed by the cache.
d. `getMetricGroup()` is removed. Metric groups should be exposed to cache 
implementations instead of users. 
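
As a small illustration of point c, here's a trimmed, hypothetical stand-in (no metric group, all names are mine, not the proposed API): because the interface extends `AutoCloseable`, callers get try-with-resources for free, symmetric to open.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Trimmed stand-in for the proposed Cache interface.
interface SimpleCache<K, V> extends AutoCloseable {
    V get(K key, Callable<? extends V> loader) throws Exception;
}

public class LifecycleDemo {
    public static class MapCache<K, V> implements SimpleCache<K, V> {
        private final Map<K, V> store = new HashMap<>();

        @Override
        public V get(K key, Callable<? extends V> loader) throws Exception {
            V v = store.get(key);
            if (v == null) {
                v = loader.call();
                store.put(key, v);
            }
            return v;
        }

        @Override
        public void close() {
            store.clear(); // release resources claimed by the cache
        }
    }

    public static void main(String[] args) throws Exception {
        try (MapCache<Integer, String> cache = new MapCache<>()) {
            System.out.println(cache.get(1, () -> "row-1"));
        } // close() runs here, symmetric to open()
    }
}
```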

3. Other topics
Another point to note: if you check the usage of the cache in the JDBC and 
Hive lookup tables, the value type is List<RowData>, since it's common that a 
join key is mapped to multiple rows. We could add another layer of abstraction 
under the Cache interface, for example: 

OneToManyCache<K, V> extends Cache<K, List<V>>

And add methods like `appendToKey(List<V>)` to it. What do you think?
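
A map-backed sketch of what I have in mind (all names are hypothetical; note I've added an explicit key parameter, which the `appendToKey(List<V>)` signature above leaves open):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical one-to-many cache: each key maps to a list of values.
public class OneToManyMapCache<K, V> {
    private final Map<K, List<V>> store = new HashMap<>();

    public List<V> get(K key) {
        return store.getOrDefault(key, List.of());
    }

    // Append more rows to an existing key instead of replacing the value.
    public void appendToKey(K key, List<V> values) {
        store.computeIfAbsent(key, k -> new ArrayList<>()).addAll(values);
    }

    public static void main(String[] args) {
        OneToManyMapCache<String, String> cache = new OneToManyMapCache<>();
        cache.appendToKey("key", List.of("row1"));
        cache.appendToKey("key", List.of("row2"));
        System.out.println(cache.get("key")); // [row1, row2]
    }
}
```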

Cheers, 

Qingsheng

> On Mar 7, 2022, at 16:00, zst...@163.com wrote:
> 
> Hi devs,
> 
> 
> I would like to propose a discussion thread about abstraction of Cache 
> LookupFunction with metrics for cache in connectors to make cache out of box 
> for connector developers. There are multiple LookupFunction implementations 
> in individual connectors [1][2][3][4] so far. 
> At the same time, users can monitor cache in LookupFunction by adding uniform 
> cache metrics to optimize tasks or troubleshoot.
> 
> 
> I have posted an issue about this, see 
> <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design 
> <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
> 
> 
> Looking forward to your feedback, thanks.
> 
> 
> Best regards,
> Yuan
> 
> 
> 
> 
> [1] 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
> [2] 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
> [3] 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
> [4] 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
