Thanks a lot, it was indeed related to the different versions.
I have one more question about this solution.
The select statement returns a list of results, and I see that when
retrieving the data we activate a row mapper which handles only one row at
a time and returns a PCollection of those rows.
Do I have a way to aggregate those results and return an object with a
list? I need to write to Kafka a single object that holds the list of
results.
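Something like the sketch below is what I mean (rough and untested;
MyObjectBatch is a made-up holder class with a List<MyObject> field):

PCollection<MyObjectBatch> batched = output
        // give every row the same (null) key so the rows of a pane group together
        .apply("single key", WithKeys.of((Void) null))
        .apply("collect rows", GroupByKey.create())
        .apply("wrap in holder", ParDo.of(
                new DoFn<KV<Void, Iterable<MyObject>>, MyObjectBatch>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        List<MyObject> rows = new ArrayList<>();
                        c.element().getValue().forEach(rows::add);
                        // one object holding the whole list, ready for the kafka sink
                        c.output(new MyObjectBatch(rows));
                    }
                }));

With my global window and elementCountAtLeast(1) trigger I guess each fired
pane would become one batch, so maybe the trigger needs tuning - is that the
right direction?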

Thanks again
S

On Wed, Nov 10, 2021 at 3:28 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> It is very likely that the versions of your Flink client and your Flink
> standalone cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE was
> removed in Flink 1.14, so please make sure that your Flink client version
> is also 1.14.
>
> Sigalit Eliazov <e.siga...@gmail.com> wrote on Wed, Nov 10, 2021 at 5:46 AM:
>
>> Hello
>>
>> I am creating a new pipeline which:
>>
>>    1. receives info from kafka (mainly the key)
>>
>>    2. with this key, selects information from a DB
>>
>>    3. writes the results to kafka
>>
>> Flink is running as a standalone cluster.
>>
>> I am failing on pipeline deployment when activating step 2, with the
>> following error:
>>
>> [org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to 
>> deserialize JobGraph
>>
>> Caused by: java.lang.IllegalArgumentException: No enum constant 
>> org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE
>>
>> the code:
>>
>> PCollection<KV<String, Iterable<String>>> input = pipeline.apply("readFromKafka",
>>                 KafkaTransform.readStrFromKafka(
>>                         pipelineUtil.getBootstrapServers(), topic))
>>         .apply("window", Window.<KV<String, String>>into(new GlobalWindows()) // everything into the global window
>>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>                 .discardingFiredPanes())
>>         .apply("S", GroupByKey.create());
>>
>> PCollection<MyObject> output = input.apply("read from db",
>>         JdbcIO.<KV<String, Iterable<String>>, MyObject>readAll()
>>                 .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>>                                 "org.postgresql.Driver", PipelineUtil.getDbInfo())
>>                         .withUsername("123")
>>                         .withPassword(PipelineUtil.readCredentials()))
>>                 .withQuery("select * from a where id = ? order by insert_timestamp limit 5")
>>                 .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
>>                     @Override
>>                     public void setParameters(KV<String, Iterable<String>> element,
>>                                               PreparedStatement preparedStatement) throws Exception {
>>                         String nfcId = element.getKey();
>>                         preparedStatement.setString(1, nfcId);
>>                     }
>>                 })
>>                 .withRowMapper(new JdbcIO.RowMapper<MyObject>() {
>>                     @Override
>>                     public MyObject mapRow(ResultSet resultSet) throws Exception {
>>                         // one MyObject per result row
>>                         return new MyObject(resultSet.getString("name"));
>>                     }
>>                 })
>>                 .withCoder(SerializableCoder.of(MyObject.class)));
>>
>>
>> Any ideas?
>>
>> Thanks a lot
>>
>> S
>>
>>
