Hi Wenlong,

Thanks for your helpful suggestions. I will prepare the design doc in the next few days to outline the implementation plan for further discussion.
Best,
Aitozi

wenlong.lwl <wenlong88....@gmail.com> wrote on Mon, Jan 10, 2022 at 10:07:

> Hi Aitozi,
> Thanks for bringing up the discussion on state evolution for Flink SQL.
> It would be a great improvement to Flink SQL. I think it would be better
> if you could prepare a document providing more details about the solution;
> it is a big story and a huge change, and we need to discuss it
> comprehensively.
>
> Some important points:
> 1. How is the digest of each column generated?
> 2. How can we generate a serializer with multiple versions? Do we need the
>    previous SQL when we are compiling a new version?
> 3. What are the semantics when there are newly added aggregation columns,
>    and when there are cascading aggregations?
> 4. Can the design be extended in the future to support case 1?
>
> Best,
> Wenlong
>
>
> On Sun, 9 Jan 2022 at 17:38, Aitozi <gjying1...@gmail.com> wrote:
>
> > Hi all:
> > When we use Flink SQL to develop jobs, we encounter a big problem:
> > the state may become incompatible after changing the SQL. It is mainly
> > caused by two cases:
> >
> > 1. The number of operators may change, so the state of an operator can
> >    no longer be mapped to its previous state.
> > 2. The format of the state value may change, e.g. caused by adding or
> >    removing columns of an aggregation operator.
> >
> > In this discussion, I want to propose solving case two by introducing
> > a mechanism of column digests for RowData.
> >
> > 1. In the SQL job translation phase, we generate a digest for each
> >    column of the RowData.
> > 2. We create a new serializer, maybe called MergeableRowDataSerializer,
> >    which includes the column digests of the RowData.
> > 3. We generate an int version number for each serializer and add a
> >    version header to the serialized data. We can rely on the version
> >    number to choose the suitable serializer to read old data.
> > 4. We store multi-version serializers in the checkpoint during
> >    evolution, so that we can support lazy deserialization, which is
> >    inspired by the Avro framework. In this way, we can avoid a full
> >    transfer of old data during restore (which may cost much time).
> > 5. We can also drop an old version of the serializer after the TTL of
> >    the state expires.
> >
> > We have applied this implementation in our internal version at Ant
> > Financial. We are looking forward to giving this back to the Flink
> > repo, and looking forward to suggestions from the community.
> >
> > Best wishes
> > Aitozi
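To make the discussion concrete, here is a minimal Java sketch of steps 1 and 3 above. All class and method names are hypothetical illustrations, not Flink's actual TypeSerializer API: a per-column digest is derived from the column's name and type, and every serialized record is prefixed with an int version header so that old data can be dispatched to the matching reader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; a real implementation would serialize RowData fields.
public class VersionedRowSerializer {

    /** Step 1 (sketch): a column digest from the column's name and logical type. */
    static int columnDigest(String name, String type) {
        return (name + ":" + type).hashCode();
    }

    /** A reader for one historical payload layout. */
    interface Reader {
        String read(DataInputStream in) throws IOException;
    }

    private final Map<Integer, Reader> readers = new HashMap<>();
    private final int currentVersion;

    VersionedRowSerializer(int currentVersion) {
        this.currentVersion = currentVersion;
    }

    void register(int version, Reader reader) {
        readers.put(version, reader);
    }

    /** Step 3: write an int version header before the payload. */
    byte[] serialize(String row) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(currentVersion); // version header
        out.writeUTF(row);            // payload
        return bos.toByteArray();
    }

    /** Read the header first, then dispatch to the matching reader (lazy evolution). */
    String deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int version = in.readInt();
        Reader reader = readers.get(version);
        if (reader == null) {
            throw new IOException("no serializer registered for version " + version);
        }
        return reader.read(in);
    }

    public static void main(String[] args) throws IOException {
        VersionedRowSerializer s = new VersionedRowSerializer(2);
        // v1 data used '|' as the field delimiter; v2 reads it lazily on access
        s.register(1, in -> in.readUTF().replace('|', ','));
        s.register(2, DataInputStream::readUTF);

        byte[] newData = s.serialize("alice,3");
        System.out.println(s.deserialize(newData)); // prints "alice,3"
    }
}
```

Because old data is only rewritten when it is actually read, restore does not need to eagerly migrate the whole state, which matches the lazy-deserialization goal in step 4.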