Hi Xintong,

Thanks for your comments on this FLIP. I agree that most of the problems you mentioned do exist, but I believe the FLIP's benefits are still large enough to outweigh these costs.

1. I agree that StreamElement should remain extensible to other subtypes, but in the most basic and common usage of Flink, StreamRecord is likely to remain the only subtype of StreamElement, given that Flink has behaved this way for so many versions. Unless there is a concrete reason or example showing that another StreamElement subtype would be introduced and used in most Flink jobs, and that it must be a StreamElement rather than a RuntimeEvent or another message interface, I think this FLIP's side effect on StreamElement's future extensibility is acceptable.

2. Given that methods like RecordWriter#broadcastEvent allow specifying whether an event is a priority event, there should already exist a mechanism for transmitting runtime events and stream records without disturbing their relative order, so changing LatencyMarker into a RuntimeEvent should not affect the measurement result. Even if it turns out that this is not supported yet, we can add the support in or before this FLIP.

3. I agree that we should evaluate APIs from a long-term perspective, and the configuration option will not be a problem in the future.

4. This FLIP does add some complexity to the serialization process, but I don't think that is the deciding factor for this FLIP. Perhaps we can first try to reach an agreement on the benefits of this FLIP, and then judge whether they are worth the added complexity.

5. Non-timestamp scenarios include all batch jobs and a considerable proportion of DataStream and SQL jobs. This FLIP could bring a performance improvement to these use cases and would not bring a performance regression, except possibly in the few latency-marker use cases described in previous mails. So even if the benefit might be small when the average record size is large, I think FLIP-330 is still worth implementing.

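To make the size argument in point 5 concrete, here is a rough back-of-the-envelope sketch, not a measurement from the PoC. It assumes the layout described in this thread for records without timestamps (a 1-byte tag followed by an N-byte serialized value) and computes the fraction of serialized bytes that dropping the tag would save. The class and method names are hypothetical, and the result says nothing about throughput, which also depends on computation load.

```java
// Hypothetical helper; not part of Flink or of the FLIP-330 PoC.
class TagOverhead {
    // Assumed layout without timestamps: 1-byte tag + valueBytes of payload.
    // Returns the fraction of serialized bytes saved by omitting the tag.
    static double savedFraction(int valueBytes) {
        if (valueBytes < 0) {
            throw new IllegalArgumentException("valueBytes must be non-negative");
        }
        return 1.0 / (1 + valueBytes);
    }

    public static void main(String[] args) {
        // Payload sizes mentioned in the thread: Boolean, ~20B WordCount record, 100B, 1KB.
        int[] sizes = {1, 20, 100, 1024};
        for (int n : sizes) {
            System.out.printf("value=%dB -> %.2f%% fewer serialized bytes%n",
                    n, 100 * savedFraction(n));
        }
    }
}
```

Under these assumptions the saving falls from half of the bytes for a 1-byte Boolean to well under 1% for a 1KB value, which is the trade-off both sides of this discussion are weighing.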
Best,
Yunfeng

On Thu, Aug 10, 2023 at 9:16 PM Xintong Song <tonysong...@gmail.com> wrote:
>
> Hi Yunfeng,
>
> Thanks for preparing this FLIP. I respect the effort you have already put into the PoC implementation and benchmarks. However, I have to say I'm quite concerned about this proposal.
>
> 1. The FLIP is based on the assumption that in non-timestamp scenarios StreamRecord is the only possible sub-type of StreamElement. This assumption is true with the current sub-types, but not by definition, and can be broken if we want to introduce more sub-types in the future. Or, to put it differently, introducing this optimization would limit our flexibility to extend StreamElement in the future, because doing so may invalidate this optimization and thus introduce regressions.
>
> 2. Changing LatencyMarker into a RuntimeEvent is problematic. The purpose of LatencyMarker is to measure the end-to-end latency of a record traveling through the data flow. RuntimeEvents are consumed with higher priority than StreamRecords in the shuffle layer, which may introduce bias in the measurement result.
>
> 3. The proposed configuration option is, TBH, hard for users to understand. It requires an in-depth understanding of the Flink internals. But this is probably not a big problem if the plan is to eventually make this feature the default and remove the configuration option.
>
> 4. It complicates the system by having multiple code paths for StreamElement (de)serialization, and the logic for deciding which path to use. Admittedly, this is not super expensive, but it is still worth comparing with the benefit.
>
> 5. The benefit is limited.
>    a. It only benefits non-timestamp scenarios.
>    b. For the applicable scenarios, it only saves the (de)serialization / transmission cost of 1 byte / record. The larger the record size, the smaller the benefit.
>    c. As Jark already pointed out, the 20% improvement is achieved with a simple Boolean type record.
>    WordCount (where the record size is just 1 string token + 1 integer, typically 20B or so?) shows only a 2-3% throughput improvement, not to mention workloads with larger record sizes or higher computation load.
>
> Given the above, I personally don't think the benefit of this proposal is worth the cost.
>
> Best,
>
> Xintong
>
> On Mon, Aug 7, 2023 at 4:24 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
>
> > Hi Matt,
> >
> > Thanks for letting me know about the usages of latency markers in production scenarios. I have updated the POC code and the FLIP's description, and the optimization no longer requires disabling latency tracking. The main idea is to treat LatencyMarker as a RuntimeEvent during network communication, which means it will be serialized by EventSerializer instead of StreamElementSerializer, so the tags of StreamElements can be removed regardless of latency tracking. Please refer to the FLIP and the POC code for more details. I hope this resolves your concerns.
> >
> > Best,
> > Yunfeng
> >
> > On Mon, Jul 17, 2023 at 2:13 PM Matt Wang <wang...@163.com> wrote:
> > >
> > > Hi Yunfeng,
> > >
> > > Thank you for testing point 1 again; I look forward to the TPC-DS performance results later.
> > >
> > > We use a latency marker to monitor the end-to-end latency of Flink jobs. If the latencyTrackingInterval is set too small (like 5ms), it will have a large impact on performance. But if the latencyTrackingInterval is configured to be relatively large, such as 10s, this impact can be ignored.
> > >
> > > --
> > >
> > > Best,
> > > Matt Wang
> > >
> > > ---- Replied Message ----
> > > | From | Yunfeng Zhou<flink.zhouyunf...@gmail.com> |
> > > | Date | 07/14/2023 20:30 |
> > > | To | <dev@flink.apache.org> |
> > > | Subject | Re: [DISCUSS] FLIP-330: Support specifying record timestamp requirement |
> > > Hi Matt,
> > >
> > > 1.
> > > I added the tag serialization process back into my POC code and ran the benchmark again. This time the performance improvement was roughly reduced by half. It seems that both the serialization and the judgement process contribute substantially to the overhead reduction in this specific scenario, but in a production environment where a distributed cluster is deployed, I believe the reduction in serialization would be the more important reason for the performance improvement.
> > >
> > > 2. According to the latency-tracking section in the Flink documentation [1], it seems that users would only enable latency markers for debugging purposes, instead of using them in production code. Could you please explain a bit more about the scenarios that would be limited when latency markers are disabled?
> > >
> > > 3. I plan to benchmark the performance of this POC against TPC-DS and hope that it can cover the common use cases that you are concerned about. I believe there would still be a performance improvement when the size of each StreamRecord increases, though the improvement will not be as obvious as the one currently reported in the FLIP.
> > >
> > > Best regards,
> > > Yunfeng
> > >
> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking
> > >
> > > On Thu, Jul 13, 2023 at 5:51 PM Matt Wang <wang...@163.com> wrote:
> > >
> > > Hi Yunfeng,
> > >
> > > Thanks for the proposal. The POC showed a performance improvement of 20%, which is very exciting. But I have some questions:
> > > 1. Is the performance improvement here mainly due to the reduction of serialization, or is it due to the judgment overhead caused by tags?
> > > 2. Watermark is not needed in some scenarios, but the latency marker is a useful function. If the latency marker cannot be used, it will greatly limit the usage scenarios.
> > > Can the solution design retain the capability of the latency marker?
> > > 3. The data of the POC test is of long type. I would like to see how much gain there is if it is a string with a length of 100B or 1KB.
> > >
> > > --
> > >
> > > Best,
> > > Matt Wang
> > >
> > > ---- Replied Message ----
> > > | From | Yunfeng Zhou<flink.zhouyunf...@gmail.com> |
> > > | Date | 07/13/2023 14:52 |
> > > | To | <dev@flink.apache.org> |
> > > | Subject | Re: [DISCUSS] FLIP-330: Support specifying record timestamp requirement |
> > > Hi Jing,
> > >
> > > Thanks for reviewing this FLIP.
> > >
> > > 1. I did change the names of some APIs in the FLIP compared with the original version according to which I implemented the POC. As the core optimization logic remains the same and the POC's performance can still reflect the current FLIP's expected improvement, I have not updated the POC code after that. I'll add a note to the benchmark section of the FLIP saying that the naming in the POC code might be outdated, and that the FLIP is still the source of truth for our proposed design.
> > >
> > > 2. This FLIP could bring a fixed reduction in the workload of the per-record serialization path in Flink, so the lower the absolute time cost of the non-optimized components, the more obvious the performance improvement of this FLIP would be. That's why I chose to enable object reuse and to transmit Boolean values in serialization. If it is more widely regarded as acceptable for a benchmark to adopt the more commonly applied behavior (for object reuse, I believe disabled is more common), I would be glad to update the benchmark result with object reuse disabled.
> > >
> > > Best regards,
> > > Yunfeng
> > >
> > > On Thu, Jul 13, 2023 at 6:37 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > >
> > > Hi Yunfeng,
> > >
> > > Thanks for the proposal.
> > > It makes sense to offer the optimization. I have some nit questions.
> > >
> > > 1. I guess you changed your mind while coding the POC: I found pipeline.enable-operator-timestamp in the code, but pipeline.force-timestamp-support is what is defined in the FLIP.
> > > 2. About the benchmark example, why did you enable object reuse? Since this is an optimization of serde, will the benchmark result be better if object reuse is disabled?
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Mon, Jul 10, 2023 at 11:54 AM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > >
> > > Hi all,
> > >
> > > Dong (cc'ed) and I are opening this thread to discuss our proposal to support optimizing StreamRecord's serialization performance.
> > >
> > > Currently, a StreamRecord would be converted into a 1-byte tag (+ 8-byte timestamp) + N-byte serialized value during the serialization process. In scenarios where timestamps and watermarks are not needed, and latency tracking is disabled, this process would include unnecessary information in the serialized byte array. This FLIP aims to avoid such overhead and increase Flink jobs' performance during serialization.
> > >
> > > Please refer to the FLIP document [1] for more details about the proposed design and implementation. We welcome any feedback and opinions on this proposal.
> > >
> > > Best regards, Dong and Yunfeng
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-330%3A+Support+specifying+record+timestamp+requirement
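
For readers skimming the thread, the record layout described in the original proposal (a 1-byte tag, an optional 8-byte timestamp, then the N-byte serialized value) can be sketched roughly as below. This is an illustrative sketch only: the class, method names, and tag constants are hypothetical and are not taken from Flink's StreamElementSerializer or from the FLIP-330 PoC.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of the layouts discussed in the thread; not Flink code.
class RecordLayoutSketch {
    static final byte TAG_WITH_TIMESTAMP = 0;    // hypothetical tag value
    static final byte TAG_WITHOUT_TIMESTAMP = 1; // hypothetical tag value

    // Tagged layout: 1-byte tag (+ 8-byte timestamp when present) + value bytes.
    static byte[] tagged(byte[] value, Long timestamp) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (timestamp != null) {
                out.writeByte(TAG_WITH_TIMESTAMP);
                out.writeLong(timestamp);
            } else {
                out.writeByte(TAG_WITHOUT_TIMESTAMP);
            }
            out.write(value);
        } catch (IOException e) {
            // Not expected for an in-memory stream; rethrown unchecked.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Layout the proposal aims for when no timestamp is needed: value bytes only.
    static byte[] valueOnly(byte[] value) {
        return value.clone();
    }

    public static void main(String[] args) {
        byte[] value = {1}; // e.g. a serialized Boolean
        System.out.println(tagged(value, 42L).length);  // 10 = 1 + 8 + 1
        System.out.println(tagged(value, null).length); // 2  = 1 + 1
        System.out.println(valueOnly(value).length);    // 1
    }
}
```

The per-record difference between the last two layouts is the single tag byte that the discussion above is about.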