I also replaced the static class with List<String>, but I still get the same error.
The code is attached.
On 2020-04-30 10:32:26, "shao.hongxiao" <[email protected]> wrote:
>Could it be a problem with the static inner class?
>Rules for POJO types
>
>Flink recognizes a data type as a POJO type (and allows “by-name” field
>referencing) if the following conditions are fulfilled:
>
>The class is public and standalone (no non-static inner class)
>The class has a public no-argument constructor
>All non-static, non-transient fields in the class (and all superclasses) are
>either public (and non-final) or have a public getter- and a setter- method
>that follows the Java beans naming conventions for getters and setters.
>
>Note that when a user-defined data type can’t be recognized as a POJO type, it
>must be processed as GenericType and serialized with Kryo.
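>
>The conditions quoted above can be illustrated with a minimal sketch. The class name `OrderEvent` and its fields are hypothetical, not taken from this thread; the reflection checks in `main` only mirror the first two rules and are not Flink's actual analysis:

```java
import java.lang.reflect.Modifier;

// A minimal sketch of a class that Flink would recognize as a POJO type,
// per the rules quoted above: public, standalone, public no-arg constructor,
// and all non-static, non-transient fields public and non-final.
public class OrderEvent {
    public String orderId;   // public, non-final field
    public long timestamp;   // public, non-final field

    public OrderEvent() {}   // public no-argument constructor

    public static void main(String[] args) throws Exception {
        Class<?> c = OrderEvent.class;
        // The class itself is public (and top-level, i.e. standalone).
        boolean isPublic = Modifier.isPublic(c.getModifiers());
        // getConstructor() with no arguments finds the public no-arg constructor.
        boolean hasNoArgCtor = c.getConstructor().getParameterCount() == 0;
        System.out.println(isPublic && hasNoArgCtor); // prints "true"
    }
}
```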
>
>
>
>邵红晓 (shao.hongxiao)
>Email: [email protected]
>On 2020-04-30 09:40, shx <[email protected]> wrote:
>Could you share the code that writes the state? One more question about keyed-state access: does your code read out the MapState for every key? Thanks.
>
>
>
>
>
>On 2020-04-30 09:04, guanyq wrote:
>The code does not explicitly specify any Serializer; everything uses the default serialization.
>On 2020-04-29 18:20:22, "Congxian Qiu" <[email protected]> wrote:
>Hi
>From the error log, this is a state-migration-related problem.
>You need to confirm that the Serializer in your code is the same as, or
>compatible with, the serializer of the corresponding state in the savepoint.
>You can refer to this document [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html
>
>Best,
>Congxian
>
>
>guanyq <[email protected]> 于2020年4月29日周三 下午6:09写道:
>
>
>The code and the error log are attached. At the moment I don't know how to investigate this. Could you please take a look? Thanks.
package com.data.processing.utils;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class ReadListState {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        String checkpointDataUri =
                "hdfs://audit-dp02:8020/flink/checkpoints/OnlineOrderDispatchDuration/";
        String stateDataUri =
                "hdfs://audit-dp02:8020/flink/savepoints/OnlineOrderUncompletedDataProcess/20200429/savepoint-a7e717-a512a56609b4";
        String uid = "OnlineOrderUncompletedDataProcess_1";

        // Load the existing savepoint and read the keyed state registered under `uid`
        ExistingSavepoint savepoint =
                Savepoint.load(env, stateDataUri, new FsStateBackend(checkpointDataUri));
        DataSet<List<String>> keyedMapStateDataSet =
                savepoint.readKeyedState(uid, new ReaderFunction());

        // Flatten each per-key list into individual strings and print them.
        // Note: DataSet#print() triggers execution itself, so a subsequent
        // env.execute(...) would fail with "No new data sinks have been defined".
        keyedMapStateDataSet.flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public void flatMap(List<String> lst, Collector<String> collector) {
                for (String s : lst) {
                    collector.collect(s);
                }
            }
        }).print();
    }

    public static class ReaderFunction
            extends KeyedStateReaderFunction<String, List<String>> {

        private transient MapState<String, Long> dayUnComputeCnt;

        @Override
        public void open(Configuration parameters) {
            // Use the diamond operator so the descriptor keeps its generic types
            // instead of being constructed as a raw type.
            MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>(
                    "dayUnComputeCnt",
                    TypeInformation.of(String.class),
                    TypeInformation.of(Long.class)
            );
            dayUnComputeCnt = getRuntimeContext().getMapState(descriptor);
        }

        @Override
        public void readKey(
                String key,
                Context ctx,
                Collector<List<String>> out) throws Exception {
            // Collect all map-state keys for the current record key into one list
            List<String> unCompleteLst = new ArrayList<>();
            for (String s : dayUnComputeCnt.keys()) {
                unCompleteLst.add(s);
            }
            out.collect(unCompleteLst);
        }
    }
}