Thanks alot it was really related to different versions. I have one more question about this solution the select statement returns list of results i see that when retrieving data we activate row mapper which handles only one row at a time and return PCollection of that row do i have a way to aggregate those results and return an object with list? i need to write to kafka an object that holds the list of results
Thanks again S On Wed, Nov 10, 2021 at 3:28 AM Caizhi Weng <tsreape...@gmail.com> wrote: > Hi! > > It is very likely that versions of your Flink client and Flink standalone > cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE is removed > since Flink 1.14 so please make sure that your Flink client version is also > 1.14. > > Sigalit Eliazov <e.siga...@gmail.com> 于2021年11月10日周三 上午5:46写道: > >> Hello >> >> i am creating new pipeline which >> >> 1. receives info from kafka (mainly the key) >> >> 2. with this key select information from a D >> >> 3. writes to kafka the results >> >> Flink is running has a standalone cluster >> >> I am failing on the pipeline deployment when activating step 2 with the >> following error >> >> [org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to >> deserialize JobGraph >> >> Caused by: java.lang.IllegalArgumentException: No enum constant >> org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE >> >> the code: >> PCollection<KV<String, Iterable<String>>> input = >> pipeline.apply("readFromKafka", >> KafkaTransform.readStrFromKafka( >> pipelineUtil.getBootstrapServers(), topic)) >> .apply("window", Window.<KV<String, String>>into(new >> GlobalWindows()) // Everything into global window. >> >> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) >> .discardingFiredPanes()) >> .apply("S", GroupByKey.create()); >> PCollection<MyObject> output = input.apply("read from db", >> JdbcIO.<KV<String, Iterable<String>>, AnalyticsResult>readAll() >> .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create( >> "org.postgresql.Driver", PipelineUtil.getDbInfo()) >> .withUsername("123") >> .withPassword(PipelineUtil.readCredentials())) >> .withQuery("select * from a where id = ? order by insert_timestamp >> limit 5") >> .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, >> Iterable<String>>>() { >> @Override >> public void setParameters(KV<String, Iterable<String>> element, >> PreparedStatement preparedStatement) >> throws Exception { >> String nfcId = element.getKey(); >> preparedStatement.setString(1, nfcId); >> } >> }) >> .withRowMapper(new JdbcIO.RowMapper<MyObject>() { >> public AnalyticsResult mapRow(ResultSet resultSet) throws >> Exception { >> MyObject obj = new MyObject( >> resultSet.getString("name"), >> ); >> >> return obj; >> } >> }).withCoder(SerializableCoder.of(AnalyticsResult.class))); >> >> >> any ideas? >> >> Thanks a lot >> >> S >> >>