Hi Qingsheng,
Thanks very much for your detailed advice. The first two points are very clear and make sense. I have only one question about the third point.

> Another point to note is that if you check the usage of cache in JDBC and Hive
> lookup table, the value type is List<RowData>, since it’s common that a
> joining key could be mapped to multiple rows. We could add another layer of
> abstraction under the Cache interface, for example:
>
> OneToManyCache<K, V> extends Cache<K, List<V>>

IMHO, so far I haven't found `OneToManyCache` to be necessary. Since the value type is already `List<RowData>`, the method `appendToKey(List<V>)` can be replaced by `put(K, V)`.
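For example, a JDBC-style lookup function could use the generic `Cache` you proposed directly, roughly like the sketch below (`loadFromDatabase` and the surrounding `TableFunction` wiring are only illustrative placeholders for the connector-specific code):

    // The value type is List<RowData>: one key maps to all of its joined rows.
    private Cache<RowData, List<RowData>> cache;

    public void eval(Object... keys) throws Exception {
        RowData keyRow = GenericRowData.of(keys);
        // On a miss the loader fetches the complete row list for the key, and the
        // whole List<RowData> is cached under that key at once, so no appendToKey
        // is needed.
        List<RowData> rows = cache.get(keyRow, () -> loadFromDatabase(keyRow));
        for (RowData row : rows) {
            collect(row); // collect() from TableFunction
        }
    }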
What do you think?

Best regards,
Yuan

At 2022-04-13 15:20:01, "Qingsheng Ren" <renqs...@gmail.com> wrote:
> Hi Yuan,
>
> Sorry for leaving this thread pending for such a long time. I think adding a
> unified abstraction and metrics for the cache is quite important for users and
> developers to optimize and improve their jobs with lookup join. We also have
> our own internal cache abstraction and implementation, so I took a closer
> look, and here are some thoughts of mine.
>
> 1. Metrics
>
> I think users would be interested in these 3 aspects when debugging or
> benchmarking their jobs:
>
> (1) Hit / miss rate
> - hitCount, Counter type, to track the number of cache hits
> - missCount, Counter type, to track the number of cache misses
> Here we just report the raw counts instead of the rate to the external metric
> system, since it’s easier and more flexible to aggregate and calculate the
> rate in metric systems like Prometheus.
>
> (2) Loading throughput and latency
> - numBytesLoadedTotal, Counter type, to track the total number of bytes loaded
> by the cache
> - numRecordsLoadedTotal, Counter type, to track the total number of records
> loaded
> These two can be used for tracking the loading throughput.
>
> - latestLoadTime, Gauge type, to track the time spent on the latest load
> operation
> Actually it’s better to use a histogram for tracking latency, but a histogram
> is quite expensive to manage. Personally I think a gauge would be good enough
> to reflect the latency.
>
> - numLoadFailures, Counter type, to track the number of failed loads
>
> (3) Current usage
> - numCachedRecords, Gauge type, to track the number of entries in the cache
> - numCachedBytes, Gauge type, to track the number of bytes used by the cache
>
> Most of the metrics above are similar to your original proposal; here are the
> differences:
> (1) I still think it’s weird to report the identifier and type as metrics.
> It’s quite handy to get the actual cache type through the code path, and
> besides, some metric systems (like Prometheus) don’t support string-typed
> metrics.
> (2) numRecords is renamed to numCachedRecords.
> (3) loadSuccessCount can be derived as missCount - numLoadFailures. I think
> users would be interested to know how many times the cache loads (missCount)
> and how many loads fail (numLoadFailures).
> (4) totalLoadTime is replaced by latestLoadTime. I think it’s not very
> meaningful for users to see a long-running job reporting a totalLoadTime of
> hours or even days.
>
> 2. APIs
>
> (1) CacheMetricGroup:
>
> public interface CacheMetricGroup {
>     Counter getHitCounter();
>
>     Counter getMissCounter();
>
>     Counter getNumRecordsLoadedTotalCounter();
>
>     Counter getNumBytesLoadedTotalCounter();
>
>     Gauge<Long> getLatestLoadTimeGauge();
>
>     Counter getNumLoadFailureCounter();
>
>     void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
>
>     void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
> }
>
> Note that most metrics are exposed as getters since they are quite
> straightforward, except numCachedRecords/Bytes, which are exposed as setters
> and left for cache implementers to provide.
>
> (2) Cache
>
> public interface Cache<K, V> extends AutoCloseable {
>     void open(CacheMetricGroup cacheMetricGroup);
>
>     V get(K key, Callable<? extends V> loader) throws Exception;
>
>     void put(K key, V value);
>
>     void putAll(Map<? extends K, ? extends V> m);
>
>     void clean();
>
>     long size();
> }
>
> Compared to your proposal:
> a. `getIdentifier()` is removed. I can’t see any usage of this function, since
> we are not dynamically loading cache implementations via SPI or a factory.
> b. `init()` and `initMetric()` are merged into `open(CacheMetricGroup)`.
> c. The interface extends `AutoCloseable`, symmetric to open, for cleaning up
> resources claimed by the cache.
> d. `getMetricGroup()` is removed. Metric groups should be exposed to cache
> implementations instead of users.
>
> 3. Other topics
> Another point to note is that if you check the usage of cache in the JDBC and
> Hive lookup tables, the value type is List<RowData>, since it’s common that a
> joining key could be mapped to multiple rows. We could add another layer of
> abstraction under the Cache interface, for example:
>
> OneToManyCache<K, V> extends Cache<K, List<V>>
>
> And add interfaces like `appendToKey(List<V>)` to it. What do you think?
>
> Cheers,
>
> Qingsheng
>
>> On Mar 7, 2022, at 16:00, zst...@163.com wrote:
>>
>> Hi devs,
>>
>> I would like to propose a discussion thread about an abstraction of a cached
>> LookupFunction, with metrics for the cache, in connectors, so that caching
>> works out of the box for connector developers. There are multiple
>> LookupFunction implementations in individual connectors [1][2][3][4] so far.
>> At the same time, with uniform cache metrics added, users can monitor the
>> cache in a LookupFunction to optimize or troubleshoot their jobs.
>>
>> I have posted an issue about this, see
>> <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design
>> <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>>
>> Looking forward to your feedback, thanks.
>>
>> Best regards,
>> Yuan
>>
>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java