Hi all:
     When we use Flink SQL to develop jobs, we encounter a big problem:
the state may become incompatible after the SQL is changed. This is mainly
caused by two cases:

1. The number of operators may change, so the state of an operator can no
longer be mapped to its previous state.
2. The format of the state value may change, for example when a column is
added to or removed from an aggregation operator.

In this discussion, I want to propose a solution to case two, by
introducing a mechanism of column digests for RowData.
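To make the idea concrete, here is a minimal sketch in plain Java (not the
actual Flink API; the class and method names are hypothetical) of what a
per-column digest could look like: each column is identified by its name and
logical type, so two schemas can be compared field by field when deciding
which state can be carried over.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: a column digest pairing each column's name with its
// logical type string, so two RowData schemas can be diffed per column.
public class ColumnDigestDemo {
    static List<String> columnDigests(String[] names, String[] types) {
        String[] digests = new String[names.length];
        for (int i = 0; i < names.length; i++) {
            digests[i] = names[i] + ":" + types[i];
        }
        return Arrays.asList(digests);
    }

    public static void main(String[] args) {
        List<String> oldSchema = columnDigests(
                new String[] {"user_id", "cnt"},
                new String[] {"BIGINT", "BIGINT"});
        List<String> newSchema = columnDigests(
                new String[] {"user_id", "cnt", "sum_amount"},
                new String[] {"BIGINT", "BIGINT", "DOUBLE"});
        // A digest present in both lists marks a column whose state can be
        // carried over; a digest only in newSchema marks a new column.
        System.out.println(newSchema.containsAll(oldSchema)); // prints true
    }
}
```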

1. In the SQL job translation phase, we generate a digest for each column
of the RowData.
2. We create a new serializer, perhaps called MergeableRowDataSerializer,
which includes the column digests of the RowData.
3. We generate an int version number for each serializer and add a version
header to the serialized data. We can rely on the version number to choose
the suitable serializer to read the old data.
4. We store multiple versions of the serializer in the checkpoint during
evolution, so that we can support lazy deserialization, inspired by the
Avro framework. In this way, we can avoid a full rewrite of the old data
during restore (which may cost much time).
5. We can also drop the old versions of the serializer after the TTL of the
state has expired.
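The version-header part of the steps above can be sketched in plain Java
(again, not Flink's actual serializer API; `write` and `readVersion` are
hypothetical helpers): every serialized state value is prefixed with an int
version, and on restore that header is read first to dispatch to the
matching serializer version, which is what makes the lazy, per-record
migration possible.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hedged sketch of a versioned serialization format: an int version header
// followed by the row payload encoded in that version's format.
public class VersionedSerializerDemo {
    static byte[] write(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(version); // version header comes first
        out.write(payload);    // then the payload in that version's layout
        out.flush();
        return bos.toByteArray();
    }

    static int readVersion(byte[] data) throws IOException {
        DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(data));
        // Read only the header; the payload can stay unread until the
        // record is actually accessed (lazy deserialization).
        return in.readInt();
    }

    public static void main(String[] args) throws IOException {
        byte[] stored = write(2, new byte[] {1, 2, 3});
        System.out.println(readVersion(stored)); // prints 2
    }
}
```

Because the header is self-describing, old records never need to be
rewritten eagerly at restore time; they are migrated one by one as they are
read, which matches the lazy approach described in step 4.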

We have applied this implementation in our internal version at Ant
Financial. We are looking forward to contributing it back to the Flink
repo, and to suggestions from the community.

Best wishes
Aitozi
