Hello

I am creating a new pipeline which:

   1. receives info from Kafka (mainly the key)

   2. uses this key to select information from a DB

   3. writes the results to Kafka

Flink is running as a standalone cluster.

The pipeline deployment fails when I activate step 2, with
the following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to deserialize JobGraph

Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE
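For what it is worth, the "No enum constant" message has the same format that `Enum.valueOf` produces when it is asked for a constant that the loaded enum class does not define. The sketch below reproduces that with two hypothetical enums (`ClientSideMapper` and `ClusterSideMapper`) standing in for two versions of the same class; these names are illustrative only and are not Flink's classes. If this is what is happening here, the job graph may have been built against a different Flink version than the one the cluster runs, but that is an assumption I have not confirmed.

```java
// Hypothetical stand-ins for two versions of one enum; not Flink's real classes.
class EnumLookupDemo {
    // Version the submitting client was compiled against (assumed).
    enum ClientSideMapper { ARBITRARY, DISCARD_EXTRA_STATE, ROUND_ROBIN }

    // Version loaded on the cluster (assumed): one constant is missing.
    enum ClusterSideMapper { ARBITRARY, ROUND_ROBIN }

    // Resolves a constant by name and returns either a short success
    // string or the message of the exception that Enum.valueOf throws.
    static <E extends Enum<E>> String describeLookup(Class<E> type, String name) {
        try {
            return "resolved " + Enum.valueOf(type, name).name();
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The constant name travels as a plain string between the two sides.
        String sent = ClientSideMapper.DISCARD_EXTRA_STATE.name();
        System.out.println(describeLookup(ClientSideMapper.class, sent));
        System.out.println(describeLookup(ClusterSideMapper.class, sent));
    }
}
```

The first lookup succeeds; the second returns a message that begins with "No enum constant" and ends with the missing constant's name, which is the same shape as the cause in the stack trace above.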

The code:

// Step 1: read from Kafka and group the values by key.
PCollection<KV<String, Iterable<String>>> input = pipeline
        .apply("readFromKafka",
                KafkaTransform.readStrFromKafka(
                        pipelineUtil.getBootstrapServers(), topic))
        // Everything into the global window, firing on every element.
        .apply("window", Window.<KV<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes())
        .apply("S", GroupByKey.create());

// Step 2: look up rows in the database for each key.
PCollection<MyObject> output = input.apply("read from db",
        JdbcIO.<KV<String, Iterable<String>>, MyObject>readAll()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                                "org.postgresql.Driver", PipelineUtil.getDbInfo())
                        .withUsername("123")
                        .withPassword(PipelineUtil.readCredentials()))
                .withQuery("select * from a where id = ? order by insert_timestamp limit 5")
                .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
                    @Override
                    public void setParameters(KV<String, Iterable<String>> element,
                                              PreparedStatement preparedStatement) throws Exception {
                        // Bind the Kafka key to the query's single parameter.
                        String nfcId = element.getKey();
                        preparedStatement.setString(1, nfcId);
                    }
                })
                .withRowMapper(new JdbcIO.RowMapper<MyObject>() {
                    @Override
                    public MyObject mapRow(ResultSet resultSet) throws Exception {
                        MyObject obj = new MyObject(
                                resultSet.getString("name")
                        );

                        return obj;
                    }
                })
                .withCoder(SerializableCoder.of(MyObject.class)));


Any ideas?

Thanks a lot

S
