Hi! Most likely the versions of your Flink client and your Flink standalone cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE was removed in Flink 1.14, so please make sure that your Flink client version is also 1.14.
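
In case it helps to see the mechanism, here is a minimal, self-contained sketch of how a missing enum constant produces this kind of message. It does not use Flink at all: EnumMismatchDemo, NewerMapper and its constants are hypothetical stand-ins for the enum as seen by the newer side, and the assumption is that the name written by the older side is looked up by name on the newer side.

```java
public class EnumMismatchDemo {

    // Hypothetical stand-in for the enum on the newer side, where the
    // DISCARD_EXTRA_STATE constant no longer exists. The constants listed
    // here are illustrative only, not the real Flink enum.
    enum NewerMapper { ARBITRARY, FIRST, FULL }

    // Looks a constant up by name, the way a by-name enum lookup would.
    // Returns the constant's name, or the exception message on failure.
    public static String resolve(String constantName) {
        try {
            return NewerMapper.valueOf(constantName).name();
        } catch (IllegalArgumentException e) {
            // Enum.valueOf reports unknown names as "No enum constant <class>.<name>"
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A name both sides know resolves fine.
        System.out.println(resolve("FULL"));
        // A name only the older side knows fails with "No enum constant ...".
        System.out.println(resolve("DISCARD_EXTRA_STATE"));
    }
}
```

The second lookup fails in the same way as the error in your stack trace, which is why aligning the client and cluster versions should make it go away.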

Sigalit Eliazov <e.siga...@gmail.com> wrote on Wed, Nov 10, 2021 at 5:46 AM:

> Hello
>
> i am creating new pipeline which
>
> 1. receives info from kafka (mainly the key)
>
> 2. with this key select information from a D
>
> 3. writes to kafka the results
>
> Flink is running has a standalone cluster
>
> I am failing on the pipeline deployment when activating step 2 with the
> following error
>
> [org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to
> deserialize JobGraph
>
> Caused by: java.lang.IllegalArgumentException: No enum constant
> org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE
>
> the code:
> PCollection<KV<String, Iterable<String>>> input =
>     pipeline.apply("readFromKafka",
>             KafkaTransform.readStrFromKafka(
>                 pipelineUtil.getBootstrapServers(), topic))
>         .apply("window", Window.<KV<String, String>>into(new GlobalWindows())
>             // Everything into global window.
>             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>             .discardingFiredPanes())
>         .apply("S", GroupByKey.create());
> PCollection<MyObject> output = input.apply("read from db",
>     JdbcIO.<KV<String, Iterable<String>>, AnalyticsResult>readAll()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>                 "org.postgresql.Driver", PipelineUtil.getDbInfo())
>             .withUsername("123")
>             .withPassword(PipelineUtil.readCredentials()))
>         .withQuery("select * from a where id = ? order by insert_timestamp limit 5")
>         .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
>             @Override
>             public void setParameters(KV<String, Iterable<String>> element,
>                     PreparedStatement preparedStatement) throws Exception {
>                 String nfcId = element.getKey();
>                 preparedStatement.setString(1, nfcId);
>             }
>         })
>         .withRowMapper(new JdbcIO.RowMapper<MyObject>() {
>             public AnalyticsResult mapRow(ResultSet resultSet) throws Exception {
>                 MyObject obj = new MyObject(
>                     resultSet.getString("name"),
>                 );
>
>                 return obj;
>             }
>         }).withCoder(SerializableCoder.of(AnalyticsResult.class)));
>
>
> any ideas?
>
> Thanks a lot
>
> S