Hi

I have a Flink 1.7 with Kafka 0.11 and ES 6.5 setup.

I can see the Flink Kafka Consumer consuming messages, but these are not
passed on to the next level, that is the elasticsearch sink. Unable to find
any logs relevant to this.

Logs about my kafka consumers

2019-01-16 17:28:05,860 DEBUG
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer
- Committing offsets:
{legitimatestream-16=OffsetAndMetadata{offset=203252637, metadata=''},
legitimatestream-18=OffsetAndMetadata{offset=203236011, metadata=''},
legitimatestream-20=OffsetAndMetadata{offset=203237190, metadata=''},
legitimatestream-22=OffsetAndMetadata{offset=203273504, metadata=''},
legitimatestream-8=OffsetAndMetadata{offset=203251672, metadata=''},
legitimatestream-10=OffsetAndMetadata{offset=203235871, metadata=''},
legitimatestream-12=OffsetAndMetadata{offset=203242970, metadata=''},
legitimatestream-14=OffsetAndMetadata{offset=203269129, metadata=''},
legitimatestream-0=OffsetAndMetadata{offset=203247420, metadata=''},
legitimatestream--9=OffsetAndMetadata{offset=203226435, metadata=''},
legitimatestream-11=OffsetAndMetadata{offset=203259207, metadata=''},
legitimatestream-13=OffsetAndMetadata{offset=203262566, metadata=''},
legitimatestream-1=OffsetAndMetadata{offset=203230950, metadata=''},
legitimatestream-3=OffsetAndMetadata{offset=203260289, metadata=''},
legitimatestream-5=OffsetAndMetadata{offset=203285827, metadata=''},
legitimatestream-24=OffsetAndMetadata{offset=203240761, metadata=''},
legitimatestream-26=OffsetAndMetadata{offset=203254649, metadata=''},
legitimatestream-28=OffsetAndMetadata{offset=203265863, metadata=''}}

I am unable to see any logs regarding passing this data to the next level,
which is the Table Query, followed by sink.

Can anyone help me out with why this might happen, or am i missing
something ??

Here is the snippet of my code:
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

                tableEnv.connect(new Kafka()
                                .version("0.11")
                                .topic(params.getRequired("read-topic"))
                                .property("auto.offset.reset", "latest")
                .property("group.id", params.getRequired("group.id"))
                                .property("bootstrap.servers",
params.getRequired("bootstrap.servers")))
                                .withSchema(new Schema()
                        .field("sid", Types.STRING())
                        .field("_zpsbd6", Types.STRING())
                        .field("r1", Types.STRING())
                        .field("r2", Types.STRING())
                        .field("r5", Types.STRING())
                        .field("r10", Types.STRING())
                        .field("isBot", Types.BOOLEAN())
                        .field("botcode", Types.STRING())
                        .field("ts", Types.SQL_TIMESTAMP())
                        /*.field("sensor", Types.STRING())
                        .field("temp", Types.LONG())
                        .field("ts", Types.SQL_TIMESTAMP())*/
                        .rowtime(new Rowtime()
                                .timestampsFromField("_zpsbda")
                                .watermarksPeriodicBounded(5000)
                                                )
                                )
                                .withFormat(new Json().deriveSchema())
                                .inAppendMode()
                                .registerTableSource("sourceTopic");
Table query = tableEnv.sqlQuery("SELECT sid, _zpsbd6 as ip, COUNT(*) as
total_hits, " +
                                "TUMBLE_START(ts, INTERVAL '1' MINUTE) as
tumbleStart, " +
                                "TUMBLE_END(ts, INTERVAL '1' MINUTE) as
tumbleEnd FROM sourceTopic " +
                                "WHERE r1='true' or r2='true' or r5='true'
or r10='true' and isBot='true'" +
                                "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),
sid,  _zpsbd6");

                TypeInformation<Row> schema = query.getSchema().toRowType();
                SerializationSchema<Row> serializationSchema = new
JsonRowSerializationSchema(schema);
                DataStream<Row> ds = tableEnv.toAppendStream(query,
Row.class);
                ds.print();

                List<HttpHost> httpHosts = new ArrayList<>();
                httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<Row> esSinkBuilder = new
ElasticsearchSink.Builder<>(
                                httpHosts,
                                new ElasticsearchSinkFunction<Row>() {

                                        public IndexRequest
createIndexRequest(Row row) {
                                                byte[] document =
serializationSchema.serialize(row);

                                                return new
IndexRequest("prod", "logs")

.source(document, XContentType.JSON);

                                        }

                                        @Override
                                        public void process(Row r,
RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                                                System.out.println(r);

requestIndexer.add(createIndexRequest(r));
                                        }
                                }
                );

                esSinkBuilder.setBulkFlushMaxActions(1);

                ds.addSink(esSinkBuilder.build());


Thanks,
~Ramya.

Reply via email to