Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

2022-04-14 Thread zst...@163.com
Hi Qingsheng,




Thanks very much for your detailed advice. The first two points are very clear 
and make sense. I have only one question about the third point.


>Another point to note is that if you check the usage of cache in JDBC and Hive 
>lookup table, the value type is List<RowData>, since it’s common that a 
>joining key could be mapped to multiple rows. We could add another layer of 
>abstraction under the Cache interface, for example: 
>
>OneToManyCache<K, V> extends Cache<K, List<V>>
IMHO, so far I haven't found `OneToManyCache` to be necessary. The method 
`appendToKey(List)` can be replaced by `put(K, V)`. What do you think?
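As an illustrative sketch of this suggestion (not part of the original mail): with a generic `Cache<K, V>`, the one-to-many case is covered by choosing `V = List<...>`, so no dedicated `appendToKey` method is needed. The `Cache` and `MapCache` types below are simplified stand-ins, not the actual proposed interfaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for the proposed Cache interface (names are illustrative).
interface Cache<K, V> {
    V get(K key);
    void put(K key, V value);
}

// A trivial in-memory implementation, just to show the typing.
class MapCache<K, V> implements Cache<K, V> {
    private final Map<K, V> store = new HashMap<>();
    public V get(K key) { return store.get(key); }
    public void put(K key, V value) { store.put(key, value); }
}

class OneToManyDemo {
    public static void main(String[] args) {
        // Instead of a dedicated OneToManyCache.appendToKey(...), the joining
        // key simply maps to a List value: Cache<K, List<V>>.
        Cache<String, List<String>> cache = new MapCache<>();

        List<String> rows = new ArrayList<>();
        rows.add("row1");
        rows.add("row2");
        cache.put("key-a", rows); // put(K, V) where V = List<String>

        System.out.println(cache.get("key-a").size()); // prints 2
    }
}
```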




 Best regards,
 Yuan

At 2022-04-13 15:20:01, "Qingsheng Ren"  wrote:
>Hi Yuan,
>
>Sorry for leaving this thread pending for such a long time. I think adding a unified 
>abstraction and metrics for cache is quite important for users and developers 
>to optimize and improve their jobs with lookup join. We also have our inner 
>cache abstraction and implementation, so I took a deeper look and 
>here are some thoughts of mine. 
>
>1. Metrics
>
>I think users would be interested in these 3 aspects when debugging or 
>benchmarking their jobs: 
>
>(1) Hit / Miss rate
>- hitCount, Counter type, to track number of cache hit
>- missCount, Counter type, to track number of cache miss
>Here we just report the raw counts instead of a rate to the external metric 
>system, since it’s easier and more flexible to make aggregations and calculate 
>rates in metric systems like Prometheus.
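As a small sketch of this point (not from the thread itself): the hit rate is fully derivable from the two raw counters, so only the counters need to be reported and any backend can compute the rate downstream. Names below are illustrative.

```java
class HitRateDemo {
    // Raw counters, as they would be reported to the metric system.
    static long hitCount = 0;
    static long missCount = 0;

    static void lookup(boolean hit) {
        if (hit) hitCount++; else missCount++;
    }

    public static void main(String[] args) {
        lookup(true); lookup(true); lookup(true); lookup(false);
        // The rate is derived downstream from the raw counts, e.g. in
        // Prometheus: hitCount / (hitCount + missCount). Nothing extra
        // needs to be reported by the cache itself.
        double hitRate = (double) hitCount / (hitCount + missCount);
        System.out.println(hitRate); // prints 0.75
    }
}
```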
>
>(2) Loading throughput and latency
>- numBytesLoadedTotal, Counter type, to track number of bytes totally loaded 
>by cache
>- numRecordsLoadedTotal, Counter type, to track number of records totally 
>loaded
>These two can be used for tracking the throughput of loading
>
>- latestLoadTime, Gauge type, to track the time spent for the latest load 
>operation
>Actually it’s better to use a histogram for tracking latency, but it’s quite 
>expensive to maintain one. Personally I think a gauge would be good 
>enough to reflect the latency.
>
>- numLoadFailures, Counter type, to track number of failed loads.
>
>(3) Current usage
>- numCachedRecords, Gauge type, to track number of entries in cache
>- numCachedBytes, Gauge type, to track number of bytes used by cache
>
>Most of the metrics above are similar to your original proposal, and here’s 
>the difference: 
>(1) I still think it’s weird to report identifier and type as metrics. It’s 
>quite handy to get the actual cache type through the code path; moreover, 
>some metric systems don't support string-type metrics (like Prometheus). 
>(2) numRecords is renamed to numCachedRecords
>(3) loadSuccessCount can be derived as missCount - numLoadFailures. I think users 
>would be interested in knowing how many times the cache loads (missCount), and 
>how many loads fail (numLoadFailures)
>(4) totalLoadTime is replaced by latestLoadTime. I think it’s not very 
>meaningful for users to see a long-running job reporting a totalLoadTime of 
>hours or even days.
>
>2. APIs
>
>(1) CacheMetricGroup: 
>
>public interface CacheMetricGroup {
>    Counter getHitCounter();
>
>    Counter getMissCounter();
>
>    Counter getNumRecordsLoadedTotalCounter();
>
>    Counter getNumBytesLoadedTotalCounter();
>
>    Gauge<Long> getLatestLoadTimeGauge();
>
>    Counter getNumLoadFailureCounter();
>
>    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
>
>    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
>}
>
>Note that most metrics are provided as getters since they are quite 
>straightforward, except numCachedRecords/Bytes, which should be left to cache 
>implementers. 
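As a hedged sketch of how a cache implementation might interact with such a metric group (not part of the original mail): the counters are provided by the group, while the size gauge is registered by the implementer, since only the cache knows its current size. The `Counter`/`Gauge` types below are simplified stand-ins for Flink's metric interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for Flink's Counter / Gauge metric types.
class Counter {
    private final AtomicLong count = new AtomicLong();
    void inc() { count.incrementAndGet(); }
    long getCount() { return count.get(); }
}

interface Gauge<T> { T getValue(); }

// The proposed metric group, reduced to the parts used below.
class CacheMetricGroup {
    final Counter hitCounter = new Counter();
    final Counter missCounter = new Counter();
    Gauge<Long> numCachedRecordsGauge;
    void setNumCachedRecordsGauge(Gauge<Long> g) { this.numCachedRecordsGauge = g; }
}

class MetricWiringDemo {
    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        CacheMetricGroup metrics = new CacheMetricGroup();
        // The cache implementer registers the gauge: only the cache
        // knows its current number of entries.
        metrics.setNumCachedRecordsGauge(() -> (long) store.size());

        store.put("k", "v");
        // On each lookup, the group-provided counters are updated.
        String v = store.get("k");
        if (v != null) metrics.hitCounter.inc(); else metrics.missCounter.inc();

        System.out.println(metrics.hitCounter.getCount());            // prints 1
        System.out.println(metrics.numCachedRecordsGauge.getValue()); // prints 1
    }
}
```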
>
>(2) Cache
>
>public interface Cache<K, V> extends AutoCloseable {
>    void open(CacheMetricGroup cacheMetricGroup);
>
>    V get(K key, Callable<V> loader) throws Exception;
>
>    void put(K key, V value);
>
>    void putAll(Map<K, V> m);
>
>    void clean();
>
>    long size();
>}
>
>Compared to your proposal: 
>a. `getIdentifier()` is removed. I can’t see any usage of this function, since 
>we are not dynamically loading cache implementations via SPI or a factory.
>b. `init()` and `initMetric()` are merged to `open(CacheMetricGroup)`.
>c. Extends `AutoCloseable` to be symmetric with open, for cleaning up resources 
>claimed by the cache
>d. `getMetricGroup()` is removed. Metric groups should be exposed to cache 
>implementations instead of users. 
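As an illustrative sketch of how such a contract could be implemented (not part of the original mail): only the interface shape follows this thread; the LRU policy backed by `LinkedHashMap` and all names below are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Sketch of the proposed interface (metric group omitted for brevity).
interface Cache<K, V> extends AutoCloseable {
    V get(K key, Callable<V> loader) throws Exception;
    void put(K key, V value);
    long size();
}

// An LRU-evicting implementation backed by an access-ordered LinkedHashMap.
class LruCache<K, V> implements Cache<K, V> {
    private final Map<K, V> map;

    LruCache(int maxEntries) {
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries; // evict least recently used
            }
        };
    }

    public V get(K key, Callable<V> loader) throws Exception {
        V v = map.get(key);
        if (v == null) {        // cache miss: invoke the loader and store
            v = loader.call();
            map.put(key, v);
        }
        return v;
    }

    public void put(K key, V value) { map.put(key, value); }
    public long size() { return map.size(); }
    public void close() { map.clear(); }   // symmetric resource cleanup
}

class LruCacheDemo {
    public static void main(String[] args) throws Exception {
        try (LruCache<String, String> cache = new LruCache<>(2)) {
            cache.get("a", () -> "A"); // miss, loads "A"
            cache.get("b", () -> "B"); // miss
            cache.get("a", () -> "?"); // hit, loader not invoked
            cache.get("c", () -> "C"); // evicts "b" (least recently used)
            System.out.println(cache.size()); // prints 2
        }
    }
}
```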
>
>3. Other topics
>Another point to note is that if you check the usage of cache in JDBC and Hive 
>lookup table, the value type is List<RowData>, since it’s common that a 
>joining key could be mapped to multiple rows. We could add another layer of 
>abstraction under the Cache interface, for example: OneToManyCache<K, V> extends Cache<K, List<V>>

Re:Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-05-15 Thread zst...@163.com
Hi Qingsheng and devs,




Thanks for your heated discussion and redesign to optimize this feature. I just 
have two comments:

1. How about abstracting the LookupCache to a higher level with a common Cache? It 
will be convenient for devs to use in other places.




2. Does it have any metrics, such as numCachedRecords, for the AllCache?

Best regards,
Yuan

At 2022-05-13 20:27:44, "Qingsheng Ren"  wrote:
>Hi Alexander and devs,
>
>Thank you very much for the in-depth discussion! As Jark mentioned we were
>inspired by Alexander's idea and made a refactor on our design. FLIP-221
>[1] has been updated to reflect our design now and we are happy to hear
>more suggestions from you!
>
>Compared to the previous design:
>1. The lookup cache serves at table runtime level and is integrated as a
>component of LookupJoinRunner as discussed previously.
>2. Interfaces are renamed and re-designed to reflect the new design.
>3. We separate the all-caching case individually and introduce a new
>RescanRuntimeProvider to reuse the ability of scanning. We are planning to
>support SourceFunction / InputFormat for now considering the complexity of
>FLIP-27 Source API.
>4. A new interface LookupFunction is introduced to make the semantic of
>lookup more straightforward for developers.
>
>For replying to Alexander:
>> However I'm a little confused whether InputFormat is deprecated or not.
>Am I right that it will be so in the future, but currently it's not?
>Yes you are right. InputFormat is not deprecated for now. I think it will
>be deprecated in the future but we don't have a clear plan for that.
>
>Thanks again for the discussion on this FLIP and looking forward to
>cooperating with you after we finalize the design and interfaces!
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>
>Best regards,
>
>Qingsheng
>
>
>On Fri, May 13, 2022 at 12:12 AM Александр Смирнов 
>wrote:
>
>> Hi Jark, Qingsheng and Leonard!
>>
>> Glad to see that we came to a consensus on almost all points!
>>
>> However I'm a little confused whether InputFormat is deprecated or
>> not. Am I right that it will be so in the future, but currently it's
>> not? Actually I also think that for the first version it's OK to use
>> InputFormat in ALL cache realization, because supporting rescan
>> ability seems like a very distant prospect. But for this decision we
>> need a consensus among all discussion participants.
>>
>> In general, I don't have anything to argue with in your statements. All
>> of them correspond to my ideas. Looking ahead, it would be nice to work
>> on this FLIP cooperatively. I've already done a lot of work on lookup
>> join caching, with an implementation very close to the one we are discussing,
>> and want to share the results of this work. Anyway, looking forward to
>> the FLIP update!
>>
>> Best regards,
>> Smirnov Alexander
>>
>> On Thu, 12 May 2022 at 17:38, Jark Wu :
>> >
>> > Hi Alex,
>> >
>> > Thanks for summarizing your points.
>> >
>> > In the past week, Qingsheng, Leonard, and I have discussed it several
>> times
>> > and we have totally refactored the design.
>> > I'm glad to say we have reached a consensus on many of your points!
>> > Qingsheng is still working on updating the design docs and maybe can be
>> > available in the next few days.
>> > I will share some conclusions from our discussions:
>> >
>> > 1) we have refactored the design towards the "cache in framework" way.
>> >
>> > 2) a "LookupCache" interface for users to customize, and a default
>> > implementation with a builder for ease of use.
>> > This makes it possible to have both flexibility and conciseness.
>> >
>> > 3) Filter pushdown is important for the ALL and LRU lookup caches,
>> > especially for reducing IO.
>> > Filter pushdown should be the final state and the unified way to both
>> > support pruning ALL cache and LRU cache,
>> > so I think we should make effort in this direction. If we need to support
>> > filter pushdown for ALL cache anyway, why not use
>> > it for LRU cache as well? Either way, as we decide to implement the cache
>> > in the framework, we have the chance to support
>> > filter on cache anytime. This is an optimization and it doesn't affect
>> the
>> > public API. I think we can create a JIRA issue to
>> > discuss it when the FLIP is accepted.
>> >
>> > 4) The idea to support ALL cache is similar to your proposal.
>> > In the first version, we will only support InputFormat, SourceFunction
>> for
>> > cache all (invoke InputFormat in join operator).
>> > For FLIP-27 source, we need to join a true source operator instead of
>> > calling it embedded in the join operator.
>> > However, this needs another FLIP to support the re-scan ability for
>> FLIP-27
>> > Source, and this can be a large work.
>> > In order to not block this issue, we can put the effort of FLIP-27 source
>> > integration into future work and integrate
>> > InputFormat&SourceFunction for now.
>> >
>> > I think it's fine to use Inp

Re:Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-05-17 Thread zst...@163.com
Hi Qingsheng, Alexander,



Thanks for your reply.
> Can you give an example of such upper-level Cache usage? It's not clear to 
> me currently. I think it's unnecessary to have such a high-level abstraction 
> if nowhere in the code we would operate with objects as instances of Cache. 
> But maybe there are other opinions on this.

I haven't found any other usage yet. Maybe it could be used in the DataStream API. If 
you all think it's unnecessary, we can ignore it.




> I think there won't be many problems with supporting metrics in ALL cache. 
> Moreover, some of proposed metrics are most useful especially in ALL case, 
> for example, 'latestLoadTimeGauge' or 'numCachedRecords', so necessary 
> metrics definitely should be supported in this cache strategy.
Sorry for my mistake. There is no problem with it.


Best regards,
Yuan


At 2022-05-17 17:15:20, "Qingsheng Ren"  wrote:
>Hi Yuan,
>
>Thanks for the review! Basically I’m with Alexander's opinion. We’d like to 
>limit the scope to the lookup scenario, so we didn’t extend the cache to a 
>generic one. And as for the metrics, I think the existing metric definitions 
>are also applicable to the all-cache case. 
>
>Cheers, 
>
>Qingsheng
>
>
>> On May 15, 2022, at 21:17, zst...@163.com wrote:
>> 
>> Hi Qingsheng and devs,
>> 
>> Thanks for your heated discussion and redesign to optimize this feature. I 
>> just have two comments:
>> 1. How about abstracting the LookupCache to a higher level with a common Cache? 
>> It will be convenient for devs to use in other places.
>> 
>> 2. Does it have any metrics, such as NumCachedRecords for the AllCache?
>> Best regards,
>> Yuan
>> 
>> At 2022-05-13 20:27:44, "Qingsheng Ren"  wrote:
>> >Hi Alexander and devs,
>> >
>> >Thank you very much for the in-depth discussion! As Jark mentioned we were
>> >inspired by Alexander's idea and made a refactor on our design. FLIP-221
>> >[1] has been updated to reflect our design now and we are happy to hear
>> >more suggestions from you!
>> >
>> >Compared to the previous design:
>> >1. The lookup cache serves at table runtime level and is integrated as a
>> >component of LookupJoinRunner as discussed previously.
>> >2. Interfaces are renamed and re-designed to reflect the new design.
>> >3. We separate the all-caching case individually and introduce a new
>> >RescanRuntimeProvider to reuse the ability of scanning. We are planning to
>> >support SourceFunction / InputFormat for now considering the complexity of
>> >FLIP-27 Source API.
>> >4. A new interface LookupFunction is introduced to make the semantic of
>> >lookup more straightforward for developers.
>> >
>> >For replying to Alexander:
>> >> However I'm a little confused whether InputFormat is deprecated or not.
>> >Am I right that it will be so in the future, but currently it's not?
>> >Yes you are right. InputFormat is not deprecated for now. I think it will
>> >be deprecated in the future but we don't have a clear plan for that.
>> >
>> >Thanks again for the discussion on this FLIP and looking forward to
>> >cooperating with you after we finalize the design and interfaces!
>> >
>> >[1]
>> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> >
>> >Best regards,
>> >
>> >Qingsheng
>> >
>> >
>> >On Fri, May 13, 2022 at 12:12 AM Александр Смирнов 
>> >wrote:
>> >
>> >> Hi Jark, Qingsheng and Leonard!
>> >>
>> >> Glad to see that we came to a consensus on almost all points!
>> >>
>> >> However I'm a little confused whether InputFormat is deprecated or
>> >> not. Am I right that it will be so in the future, but currently it's
>> >> not? Actually I also think that for the first version it's OK to use
>> >> InputFormat in ALL cache realization, because supporting rescan
>> >> ability seems like a very distant prospect. But for this decision we
>> >> need a consensus among all discussion participants.
>> >>
>> >> In general, I don't have something to argue with your statements. All
>> >> of them correspond my ideas. Looking ahead, it would be nice to work
>> >> on this FLIP cooperatively. I've already done a lot of work on lookup
>> >> join caching with realization very close to the one we are discussing,
>> >>

Re: [DISCUSS] Introduce Hash Lookup Join

2021-12-28 Thread zst...@163.com
Hi Jing,


Thanks very much for your FLIP. I have some points:


- How shall we deal with CDC data? If there is CDC data in the pipeline, IMHO, 
shuffling by the join key will cause CDC data disorder. Would it be better to use 
the primary key in this case?


- Can the shuffle keys be customized when users have knowledge about the 
distribution of the data?


- Some connectors, such as Hive, cache all data in the LookupFunction. How can we 
decrease the valid cached data size if data can be shuffled?


Best regards,


Yuan
On 12/28/2021 15:11,Jing Zhang wrote:
Hi everyone,
Lookup join [1] is a commonly used feature in Flink SQL. We have received many
optimization requirements on lookup join. For example:
1. Enforce the left side of the lookup join to do a hash partition to raise the
cache hit ratio
2. Solve the data skew problem after introducing hash lookup join
3. Enable mini-batch optimization to reduce RPC calls

Next we will solve these problems one by one. Firstly,  we would focus on
point 1, and continue to discuss point 2 and point 3 later.

There are many similar requirements from the user mailing list and JIRA about
hash lookup join, for example:
1. FLINK-23687  -
Introduce partitioned lookup join to enforce input of LookupJoin to hash
shuffle by lookup keys
2. FLINK-25396  -
lookupjoin source table for pre-partitioning
3. FLINK-25262  -
Support to send data to lookup table for KeyGroupStreamPartitioner way for
SQL.

In this FLIP, I would like to start a discussion about Hash Lookup Join.
The core idea is introducing a 'USE_HASH' hint in the query. This syntax is
directly user-oriented and therefore requires careful design.
There are two ways to propagate this hint to the LookupJoin in the
optimizer. We need further discussion to make the final decision. Anyway, the
difference between the two solutions is only about the internal
implementation and has no impact on the user.

For more detail on the proposal:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join


Looking forward to your feedback, thanks.

Best,
Jing Zhang

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join


Re:Re: [DISCUSS] Introduce Hash Lookup Join

2021-12-29 Thread zst...@163.com



Hi Jing,
Thanks for your detailed reply. 
1) In the last suggestion, hashing by the primary key is not for raising the cache 
hit ratio, but for handling the skew of the left source. Now that you have a 
'skew' hint and other discussion about it, I'm looking forward to it.
2) I mean to support a user-defined partitioner function. We have a case of 
joining a data lake source with a special way of partitioning, and have 
implemented it not very elegantly in our internal version. As you said, it needs 
more design.
3) I think the so-called 'HashPartitionedCache' is useful; otherwise, loading all 
data, as the Hive lookup table source does, is hardly feasible for big data.







Best regards,
Yuan








At 2021-12-29 14:52:11, "Jing Zhang"  wrote:
>Hi, Lincoln
>Thanks a lot for the feedback.
>
>>  Regarding the hint name ‘USE_HASH’, could we consider more candidates?
>Things are a little different from RDBMS in the distributed world, and we
>also aim to solve the data skew problem, so all these incoming hints names
>should be considered together.
>
>About the skew problem, I would discuss this individually in the next FLIP. I
>would like to share the skew hint proposal here.
>I want to introduce a 'skew' hint, which is a query hint, similar to the skew
>hint in Spark [1] and MaxCompute [2].
>The 'skew' hint could contain only the name of the table with skew.
>Besides, the skew hint could accept a table name and column names.
>In addition, the skew hint could accept a table name, column names and skew
>values.
>For example:
>
>SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
>o.total, c.country, c.zip
>FROM Orders AS o
>JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>ON o.customer_id = c.id;
>
>The 'skew' hint is not only used for look up join here, but also could be
>used for other types of join later, for example, batch hash join or
>streaming regular join.
>Going back to the naming problem for hash lookup join: since the 'skew' hint
>is a separate hint, 'use_hash' is still an alternative.
>WDYT?
>I don't have a good idea for a better hint name yet. I would like to
>hear more suggestions about hint names.
>
>>  As you mentioned in the flip, this solution depends on future changes to
>calcite (and also upgrading calcite would be another possible big change:
>at least calicite-1.30 vs 1.26, are we preparing to accept this big
>change?).
>
>Indeed, solution 1 depends on a Calcite upgrade.
>I admit an upgrade from Calcite 1.26 to 1.30 would be a big change. I still
>remember what we suffered from the last upgrade, to Calcite 1.26.
>However, we could not always avoid an upgrade, for the following reasons:
>1. Other features also depend on the Calcite upgrade, for example Session
>Window and Count Window.
>2. If we always avoid the Calcite upgrade, the gap with the latest version
>will keep growing. One day, if upgrading becomes something that has to be
>done, the pain will be greater.
>
>WDYT?
>
>>  Is there another possible way to minimize the change in calcite?
>
>Did you check the 'Other Alternatives' part in FLIP-204? It gives
>another solution which does not depend on a Calcite upgrade and does not need
>to worry about the hint being missed during propagation.
>This is also what we have done in the internal version.
>The core idea is propagating the 'use_hash' hint to the TableScan with matched
>table names. However, it is a little hacky.
>
>> As I know there're more limitations than `Correlate`.
>
>As mentioned before, in our internal version I chose the 'Other
>Alternatives' solution in FLIP-204.
>Although I did a POC of solution 1 and listed all the changes I found in the
>FLIP, there may still be something I missed.
>I'm very happy to hear that you point out there are more limitations besides
>`Correlate`; would you please give more details on this part?
>
>Best,
>Jing Zhang
>
>[1] https://docs.databricks.com/delta/join-performance/skew-join.html
>[2]
>https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
>
>Jing Zhang  wrote on Wed, 29 Dec 2021 at 14:40:
>
>> Hi Yuan and Lincoln,
>> thanks a lot for the attention. I would answer the email one by one.
>>
>> To Yuan
>> > How shall we deal with CDC data? If there is CDC data in the pipeline,
>> IMHO, shuffle by join key will cause CDC data disorder. Will it be better
>> to use primary key in this case?
>>
>> Good question.
>> The problem exists not only in CDC data sources, but also when
>> the input stream is not an insert-only stream (for example, the result of
>> an unbounded aggregate or a regular join).
>> I think hashing by the primary key is not a good choice. It could not raise
>> the cache hit ratio, because the cache key is the lookup key instead of the
>> primary key of the input.
>>
>> To avoid wrong results, hash lookup join requires that the input stream
>> be an insert-only stream or that its upsert keys contain the lookup keys.
>>
>> I've added this limitation to FLIP, thanks a lot for reminding.
>>
>> > If the shuffle keys can be customized  

Re:Re: [VOTE] FLIP-204: Introduce Hash Lookup Join

2022-01-24 Thread zst...@163.com
+1 non-binging!
Thanks Jing for driving this!
Best,
Yuan




At 2022-01-24 15:32:18, "wenlong.lwl"  wrote:
>+1 non-binding!
>
>Thanks for driving this, Jing!
>
>Best,
>Wenlong
>
>On Mon, 24 Jan 2022 at 16:02, Jingsong Li  wrote:
>
>> +1 (binding)
>>
>> Thanks Jing!
>>
>> Best,
>> Jingsong
>>
>> On Mon, Jan 24, 2022 at 3:48 PM Jark Wu  wrote:
>> >
>> > +1 (binding)
>> >
>> > Thanks Jing for driving this!
>> >
>> > Best,
>> > Jark
>> >
>> > On Thu, 20 Jan 2022 at 10:22, Jing Zhang  wrote:
>> >
>> > > Hi community,
>> > >
>> > > I'd like to start a vote on FLIP-204: Introduce Hash Lookup Join [1]
>> which
>> > > has been discussed in the thread [2].
>> > >
>> > > The vote will be open for at least 72 hours unless there is an
>> objection or
>> > > not enough votes.
>> > >
>> > > [1]
>> > >
>> > >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> > > [2] https://lists.apache.org/thread/swsg22lshh0ts41w4h9z4b0cgtfyfgxd
>> > >
>> > > Best,
>> > > Jing Zhang
>> > >
>>


[DISCUSS] The abstraction of cache lookupFunction and cache metric

2022-03-07 Thread zst...@163.com
Hi devs,


I would like to propose a discussion thread about an abstraction of a cache 
LookupFunction, with metrics for the cache in connectors, to make caching work out 
of the box for connector developers. There are multiple LookupFunction 
implementations in individual connectors [1][2][3][4] so far. 
At the same time, users can monitor the cache in the LookupFunction through 
uniform cache metrics, to optimize tasks or troubleshoot.


I have posted an issue about this, see 
<https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design 
<https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.


Looking forward to your feedback, thanks.


Best regards,
Yuan




[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
[2] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
[3] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
[4] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java

Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

2022-03-07 Thread zst...@163.com
Hi Qingsheng Ren,
Thanks for your feedback.


> 1. It looks like “identifier” and “type” are options of cache instead of 
> metrics. I think they are finalized once the cache is created so maybe it’s 
> not quite helpful to report them to the metric system.


Maybe it's not quite appropriate to report them to the metric system, but then how 
can we tell users the real “identifier” and “type” options in effect?




> 2. Some metrics can be aggregated simply in metric systems, like loadCount = 
> loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental 
> metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.


I agree with you. I have removed these redundant metrics.


> 3. About the interface of CacheMetricGroup I think it would be easier for 
> cache implementers to use if we expose wrapped function instead of let users 
> provide gauges directly. 


Thanks for your advice; it is helpful and I have adjusted the design. I have a 
question about it. Does MetricNames.java 
<https://github.com/apache/flink/blob/master/flink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fmetrics%2FMetricNames.java>
 contain all metric names? Should I put the cache metric names there?






Best regards,
Yuan





At 2022-03-07 16:55:18, "Qingsheng Ren"  wrote:
>Hi Yuan, 
>
>Thanks for raising this discussion! I believe this will be very helpful for 
>lookup table developers, and standardizing metrics would be essential for 
>users to tune their systems. 
>
>Here’s some thoughts in my mind:
>
>1. It looks like “identifier” and “type” are options of cache instead of 
>metrics. I think they are finalized once the cache is created so maybe it’s 
>not quite helpful to report them to the metric system.
>
>2. Some metrics can be aggregated simply in metric systems, like loadCount = 
>loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental 
>metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.
>
>3. About the interface of CacheMetricGroup I think it would be easier for 
>cache implementers to use if we exposed wrapped functions instead of letting 
>users provide gauges directly. For example:
>
>public interface CacheMetricGroup extends MetricGroup {
>// Mark a cache hit
>public void markCacheHit();
>// Mark a cache miss
>public void recordCacheMiss();
>...
>} 
>
>You can check SourceReaderMetricGroup[1] and its implementation[2] as a 
>reference.
>
>Hope these would be helpful!
>
>Best regards, 
>
>Qingsheng Ren
>
>[1] 
>https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceReaderMetricGroup.java
>[2] 
>https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
>
>
>> On Mar 7, 2022, at 16:00, zst...@163.com wrote:
>> 
>> Hi devs,
>> 
>> 
>> I would like to propose a discussion thread about abstraction of Cache 
>> LookupFunction with metrics for cache in connectors to make cache out of box 
>> for connector developers. There are multiple LookupFunction implementations 
>> in individual connectors [1][2][3][4] so far. 
>> At the same time, users can monitor cache in LookupFunction by adding 
>> uniform cache metrics to optimize tasks or troubleshoot.
>> 
>> 
>> I have posted an issue about this, see 
>> <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design 
>> <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>> 
>> 
>> Looking forward to your feedback, thanks.
>> 
>> 
>> Best regards,
>> Yuan
>> 
>> 
>> 
>> 
>> [1] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>> [2] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>> [3] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>> [4] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java

Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

2022-03-07 Thread zst...@163.com
Hi Qingsheng,



> If I understand correctly these are specified in DDL table options by users.




It's inconvenient for users to check the options when they are in front of a 
running task. And they don't know the real underlying options in effect if 
bugs or other incorrect configurations make them invalid.




> I don’t think there's a rule that all metric names should be in MetricNames 
> class, but it would be great to aggregate these constants into a unified 
> place. 


It's a good choice to aggregate the constants together.


Best regards,
Yuan


At 2022-03-08 09:57:30, "Qingsheng Ren"  wrote:
>Hi Yuan, 
>
>> how can we tell the real “identifier” and “type” options in effect to users?
>
>If I understand correctly these are specified in DDL table options by users. 
>For example: 
>
>CREATE TABLE DimTable (…) WITH (
>    ...
>    “cache.identifier” = “guava”, 
>    “cache.type” = “LRU”
>);
>
>> Does MetricNames.java contain all metric names?
>
>I don’t think there's a rule that all metric names should be in MetricNames 
>class, but it would be great to aggregate these constants into a unified 
>place. 
>
>Cheers, 
>
>Qingsheng

Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

2022-03-08 Thread zst...@163.com
Hi Qingsheng,
Sorry for my wrong format.

> If I understand correctly these are specified in DDL table options by users.




It's inconvenient for users to check the options when they are in front of a 
running task. And they won't know the real underlying options in effect if 
bugs or other incorrect configurations render some of them invalid.




> I don’t think there's a rule that all metric names should be in MetricNames 
> class, but it would be great to aggregate these constants into a unified 
> place. 


It's a good choice to aggregate the constants together.
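A rough sketch of what that unified place could look like (hypothetical class
and constant names based on the metrics discussed in this thread, not the
actual Flink MetricNames class):

```java
// Hypothetical sketch, not the actual Flink MetricNames class: the cache
// metric names discussed in this thread, aggregated into one constants holder.
final class CacheMetricNames {

    private CacheMetricNames() {
        // Constants holder; not meant to be instantiated.
    }

    static final String HIT_COUNT = "hitCount";
    static final String MISS_COUNT = "missCount";
    static final String LOAD_SUCCESS_COUNT = "loadSuccessCount";
    static final String LOAD_EXCEPTION_COUNT = "loadExceptionCount";
    static final String NUM_BYTES_LOADED_TOTAL = "numBytesLoadedTotal";
    static final String NUM_RECORDS_LOADED_TOTAL = "numRecordsLoadedTotal";
    static final String LATEST_LOAD_TIME = "latestLoadTime";
}
```

Keeping the names in one holder means a connector and a metric dashboard can
reference the same constants instead of repeating string literals.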


Best regards,
Yuan

At 2022-03-08 09:57:30, "Qingsheng Ren"  wrote:
>Hi Yuan, 
>
>> how can we tell the real  “identifier” and “type” options in effect to users?
>
>If I understand correctly these are specified in DDL table options by users. 
>For example: 
>
>CREATE TABLE DimTable (…) WITH (
>...
>“cache.identifier” = “guava”, 
>“cache.type” = “LRU”
>);
>
>> Does MetricNames.java contain all metric names?
>
>
>I don’t think there's a rule that all metric names should be in MetricNames 
>class, but it would be great to aggregate these constants into a unified 
>place. 
>
>Cheers, 
>
>Qingsheng
>
>
>> On Mar 8, 2022, at 10:22, zst...@163.com wrote:
>> 
>> Hi Qingsheng Ren,
>> Thanks for your feedback.
>> 
>> 
>>> 1. It looks like “identifier” and “type” are options of cache instead of 
>>> metrics. I think they are finalized once the cache is created so maybe it’s 
>>> not quite helpful to report them to the metric system.
>> 
>> 
>> Maybe it's not quite appropriate to report them to the metric system, but 
>> how can we tell the real  “identifier” and “type” options in effect to users?
>> 
>> 
>> 
>> 
>>> 2. Some metrics can be aggregated simply in metric systems, like loadCount 
>>> = loadSuccessCount + loadExceptionCount, so maybe we can just keep 
>>> fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid 
>>> redundancy.
>> 
>> 
>> I agree with you. I have removed these redundant metrics.
>> 
>> 
>>> 3. About the interface of CacheMetricGroup I think it would be easier for 
>>> cache implementers to use if we expose wrapped function instead of let 
>>> users provide gauges directly. 
>> 
>> 
>> Thanks for your advice, they are helpful and I have adjusted it. I have a 
>> question about it. Does MetricNames.java 
>> <https://github.com/apache/flink/blob/master/flink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fmetrics%2FMetricNames.java>
>>  contain all metric names? Should I put the cache metric names here?
>> 
>> 
>> 
>> 
>> 
>> 
>> Best regards,
>> Yuan
>> 
>> 
>> 
>> 
>> 
>> At 2022-03-07 16:55:18, "Qingsheng Ren"  wrote:
>>> Hi Yuan, 
>>> 
>>> Thanks for raising this discussion! I believe this will be very helpful for 
>>> lookup table developers, and standardizing metrics would be essential for 
>>> users to tune their systems. 
>>> 
>>> Here are some thoughts:
>>> 
>>> 1. It looks like “identifier” and “type” are options of cache instead of 
>>> metrics. I think they are finalized once the cache is created so maybe it’s 
>>> not quite helpful to report them to the metric system.
>>> 
>>> 2. Some metrics can be aggregated simply in metric systems, like loadCount 
>>> = loadSuccessCount + loadExceptionCount, so maybe we can just keep 
>>> fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid 
>>> redundancy.
>>> 
>>> 3. About the interface of CacheMetricGroup I think it would be easier for 
>>> cache implementers to use if we expose wrapped functions instead of 
>>> letting users provide gauges directly. For example:
>>> 
>>> public interface CacheMetricGroup extends MetricGroup {
>>>   // Mark a cache hit
>>>   public void markCacheHit();
>>>   // Mark a cache miss
>>>   public void markCacheMiss();
>>>   ...
>>> } 
>>> 
>>> You can check SourceReaderMetricGroup[1] and its implementation[2] as a 
>>> reference.
>>> 
>>> Hope these would be helpful!
>>> 
>>> Best regards, 
>>> 
>>> Qingsheng Ren
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/grou
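To make the wrapper-function idea from point 3 above concrete, here is a
minimal, self-contained sketch. All names are hypothetical and this is not the
interface that was eventually merged: the metric group owns the counters and
cache implementers only call the mark methods, instead of registering gauges
themselves.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the wrapper-function idea: the metric group owns the
// counters and exposes mark methods, so cache implementers never register
// gauges directly. All names here are hypothetical.
interface CacheMetricGroup {
    void markCacheHit();
    void markCacheMiss();
}

final class SimpleCacheMetricGroup implements CacheMetricGroup {
    private final AtomicLong hitCount = new AtomicLong();
    private final AtomicLong missCount = new AtomicLong();

    @Override
    public void markCacheHit() {
        hitCount.incrementAndGet();
    }

    @Override
    public void markCacheMiss() {
        missCount.incrementAndGet();
    }

    // Raw counts are reported as-is; hit/miss rates can be derived later
    // in the metric system, as discussed earlier in the thread.
    long getHitCount() {
        return hitCount.get();
    }

    long getMissCount() {
        return missCount.get();
    }
}
```

With this shape, a lookup function's hot path reduces to a single method call
per lookup, and the choice of backing metric types stays inside the group.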

Re:Re: [VOTE] FLIP-221: Abstraction for lookup source and metric

2022-06-23 Thread zst...@163.com
Thanks Qingsheng for driving this.




+1 (non-binding)




Best regards,

Yuan




At 2022-06-23 15:44:55, "Martijn Visser"  wrote:
>Great work on the FLIP.
>
>+1 (binding)
>
>On Thu, 23 Jun 2022 at 08:07, Leonard Xu  wrote:
>
>> Thanks Qingsheng for driving this work.
>>
>> +1(binding)
>>
>>
>> Best,
>> Leonard
>> > On Jun 23, 2022, at 13:37, Jark Wu  wrote:
>> >
>> > +1 (binding)
>> >
>> > Best,
>> > Jark
>> >
>> > On Thu, 23 Jun 2022 at 12:49, Qingsheng Ren  wrote:
>> >
>> >> Hi devs,
>> >>
>> >> I’d like to start a vote thread for FLIP-221: Abstraction for lookup
>> >> source and metric. You can find the discussion thread in [2]*.
>> >>
>> >> The vote will be open for at least 72 hours unless there is an objection
>> >> or not enough binding votes.
>> >>
>> >> Thanks everyone participating in the discussion!
>> >>
>> >> [1]
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric
>> >> [2] https://lists.apache.org/thread/9c0fbgkofkbfdr5hvs62m0cxd2bkgwho
>> >> [*] The link to the discussion thread might not include all emails.
>> Please
>> >> search the Apache email archive with keyword "FLIP-221" to get all
>> >> discussion histories.
>> >>
>> >> Best regards,
>> >> Qingsheng
>>
>>


Re:Re: Re: [ANNOUNCE] New Apache Flink Committers: Qingsheng Ren, Shengkai Fang

2022-06-26 Thread zst...@163.com
Congratulations, Shengkai and Qingsheng!


Best Regards,
Yuan


At 2022-06-24 16:45:25, "godfrey he"  wrote: 
>Congrats, Shengkai and Qingsheng!
>
>Best,
>Godfrey
>
>Yu Li  wrote on Wed, Jun 22, 2022 at 23:55:
>>
>> Congrats and welcome, Qingsheng and Shengkai!
>>
>> Best Regards,
>> Yu
>>
>>
>> On Wed, 22 Jun 2022 at 17:43, Jiangang Liu 
>> wrote:
>>
>> > Congratulations!
>> >
>> > Best,
>> > Jiangang Liu
>> >
>> > Mason Chen  wrote on Wed, Jun 22, 2022 at 00:37:
>> >
>> > > Awesome work Qingsheng and Shengkai!
>> > >
>> > > Best,
>> > > Mason
>> > >
>> > > On Tue, Jun 21, 2022 at 4:53 AM Zhipeng Zhang 
>> > > wrote:
>> > >
>> > > > Congratulations, Qingsheng and ShengKai.
>> > > >
>> > > > Yang Wang  wrote on Tue, Jun 21, 2022 at 19:43:
>> > > >
>> > > > > Congratulations, Qingsheng and ShengKai.
>> > > > >
>> > > > >
>> > > > > Best,
>> > > > > Yang
>> > > > >
>> > > > > Benchao Li  wrote on Tue, Jun 21, 2022 at 19:33:
>> > > > >
>> > > > > > Congratulations!
>> > > > > >
>> > > > > > weijie guo  wrote on Tue, Jun 21, 2022 at 13:44:
>> > > > > >
>> > > > > > > Congratulations, Qingsheng and ShengKai!
>> > > > > > >
>> > > > > > > Best regards,
>> > > > > > >
>> > > > > > > Weijie
>> > > > > > >
>> > > > > > >
>> > > > > > > Yuan Mei  wrote on Tue, Jun 21, 2022 at 13:07:
>> > > > > > >
>> > > > > > > > Congrats Qingsheng and ShengKai!
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > >
>> > > > > > > > Yuan
>> > > > > > > >
>> > > > > > > > On Tue, Jun 21, 2022 at 11:27 AM Terry Wang <
>> > zjuwa...@gmail.com>
>> > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Congratulations, Qingsheng and ShengKai!
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > >
>> > > > > > Best,
>> > > > > > Benchao Li
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > > --
>> > > > best,
>> > > > Zhipeng
>> > > >
>> > >
>> >