Hello, I am creating a new pipeline which:
1. receives a message from Kafka (mainly the key)
2. uses that key to select information from a DB
3. writes the results back to Kafka

Flink is running as a standalone cluster. The pipeline deployment fails when I activate step 2, with the following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to deserialize JobGraph
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE

The code:

PCollection<KV<String, Iterable<String>>> input = pipeline
    .apply("readFromKafka", KafkaTransform.readStrFromKafka(
        pipelineUtil.getBootstrapServers(), topic))
    .apply("window", Window.<KV<String, String>>into(new GlobalWindows()) // Everything into the global window.
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes())
    .apply("groupByKey", GroupByKey.create());

PCollection<MyObject> output = input.apply("readFromDb",
    JdbcIO.<KV<String, Iterable<String>>, MyObject>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", PipelineUtil.getDbInfo())
            .withUsername("123")
            .withPassword(PipelineUtil.readCredentials()))
        .withQuery("select * from a where id = ? order by insert_timestamp limit 5")
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
            @Override
            public void setParameters(KV<String, Iterable<String>> element,
                                      PreparedStatement preparedStatement) throws Exception {
                String nfcId = element.getKey();
                preparedStatement.setString(1, nfcId);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<MyObject>() {
            @Override
            public MyObject mapRow(ResultSet resultSet) throws Exception {
                return new MyObject(resultSet.getString("name"));
            }
        })
        .withCoder(SerializableCoder.of(MyObject.class)));

Any ideas?

Thanks a lot,
S
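For reference, MyObject is just a value holder; a minimal sketch, assuming only the single field read in the row mapper above (SerializableCoder requires the class to implement Serializable):

import java.io.Serializable;

// Minimal sketch of MyObject, assuming only the field read in mapRow.
// SerializableCoder.of(MyObject.class) requires it to be Serializable.
public class MyObject implements Serializable {
    private final String name;

    public MyObject(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}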
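Step 3 is not shown above because deployment already fails before I get there. Roughly, it would write the results back to Kafka along these lines (the output topic name and the toString() conversion are placeholders, not my actual code):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch of step 3: convert each result to a String and write it to Kafka.
output
    .apply("toString", MapElements.into(TypeDescriptors.strings())
        .via((MyObject result) -> result.toString()))
    .apply("writeToKafka", KafkaIO.<Void, String>write()
        .withBootstrapServers(pipelineUtil.getBootstrapServers())
        .withTopic("output-topic") // placeholder topic name
        .withValueSerializer(StringSerializer.class)
        .values()); // write values only, no keys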
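Since the enum constant goes missing while the cluster deserializes the JobGraph, one thing I still want to rule out is a Flink version mismatch between what my job jar bundles (via the Beam Flink runner artifact) and the standalone cluster itself. A minimal sketch for printing the client-side version, to compare against the version shown in the cluster's web UI (the class name is mine; EnvironmentInformation is Flink's own utility from flink-runtime):

import org.apache.flink.runtime.util.EnvironmentInformation;

// Prints the Flink version on the client classpath, i.e. whatever the
// Beam Flink runner pulled in. Compare with the cluster's version.
public class FlinkVersionCheck {
    public static void main(String[] args) {
        System.out.println("Client-side Flink version: " + EnvironmentInformation.getVersion());
    }
}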