Hey, could you please format your snippet? It is very hard to understand what is going on in there.

Best regards,
Dom.
On Wed, 16 Jan 2019 at 13:05, Ramya Ramamurthy <hair...@gmail.com> wrote:

> Hi
>
> I have a Flink 1.7 setup with Kafka 0.11 and Elasticsearch 6.5.
>
> I can see the Flink Kafka consumer consuming messages, but these are not
> passed on to the next level, that is the Elasticsearch sink. I am unable
> to find any logs relevant to this.
>
> Logs from my Kafka consumer:
>
> 2019-01-16 17:28:05,860 DEBUG org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - Committing offsets:
> {legitimatestream-16=OffsetAndMetadata{offset=203252637, metadata=''},
>  legitimatestream-18=OffsetAndMetadata{offset=203236011, metadata=''},
>  legitimatestream-20=OffsetAndMetadata{offset=203237190, metadata=''},
>  legitimatestream-22=OffsetAndMetadata{offset=203273504, metadata=''},
>  legitimatestream-8=OffsetAndMetadata{offset=203251672, metadata=''},
>  legitimatestream-10=OffsetAndMetadata{offset=203235871, metadata=''},
>  legitimatestream-12=OffsetAndMetadata{offset=203242970, metadata=''},
>  legitimatestream-14=OffsetAndMetadata{offset=203269129, metadata=''},
>  legitimatestream-0=OffsetAndMetadata{offset=203247420, metadata=''},
>  legitimatestream--9=OffsetAndMetadata{offset=203226435, metadata=''},
>  legitimatestream-11=OffsetAndMetadata{offset=203259207, metadata=''},
>  legitimatestream-13=OffsetAndMetadata{offset=203262566, metadata=''},
>  legitimatestream-1=OffsetAndMetadata{offset=203230950, metadata=''},
>  legitimatestream-3=OffsetAndMetadata{offset=203260289, metadata=''},
>  legitimatestream-5=OffsetAndMetadata{offset=203285827, metadata=''},
>  legitimatestream-24=OffsetAndMetadata{offset=203240761, metadata=''},
>  legitimatestream-26=OffsetAndMetadata{offset=203254649, metadata=''},
>  legitimatestream-28=OffsetAndMetadata{offset=203265863, metadata=''}}
>
> I cannot see any logs about this data being passed to the next level,
> which is the table query, followed by the sink.
>
> Can anyone help me out with why this might happen, or am I missing
> something?
> Here is the snippet of my code:
>
>     // Register a Kafka 0.11 JSON source with an event-time (rowtime)
>     // attribute derived from the "_zpsbda" field, allowing 5s of
>     // out-of-orderness.
>     StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>
>     tableEnv.connect(new Kafka()
>                 .version("0.11")
>                 .topic(params.getRequired("read-topic"))
>                 .property("auto.offset.reset", "latest")
>                 .property("group.id", params.getRequired("group.id"))
>                 .property("bootstrap.servers", params.getRequired("bootstrap.servers")))
>             .withSchema(new Schema()
>                 .field("sid", Types.STRING())
>                 .field("_zpsbd6", Types.STRING())
>                 .field("r1", Types.STRING())
>                 .field("r2", Types.STRING())
>                 .field("r5", Types.STRING())
>                 .field("r10", Types.STRING())
>                 .field("isBot", Types.BOOLEAN())
>                 .field("botcode", Types.STRING())
>                 .field("ts", Types.SQL_TIMESTAMP())
>                 /*.field("sensor", Types.STRING())
>                 .field("temp", Types.LONG())
>                 .field("ts", Types.SQL_TIMESTAMP())*/
>                     .rowtime(new Rowtime()
>                         .timestampsFromField("_zpsbda")
>                         .watermarksPeriodicBounded(5000)))
>             .withFormat(new Json().deriveSchema())
>             .inAppendMode()
>             .registerTableSource("sourceTopic");
>
>     // One-minute tumbling-window aggregation on the rowtime attribute.
>     Table query = tableEnv.sqlQuery(
>             "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, " +
>             "TUMBLE_START(ts, INTERVAL '1' MINUTE) as tumbleStart, " +
>             "TUMBLE_END(ts, INTERVAL '1' MINUTE) as tumbleEnd " +
>             "FROM sourceTopic " +
>             "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' " +
>             "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6");
>
>     TypeInformation<Row> schema = query.getSchema().toRowType();
>     SerializationSchema<Row> serializationSchema = new JsonRowSerializationSchema(schema);
>     DataStream<Row> ds = tableEnv.toAppendStream(query, Row.class);
>     ds.print();
>
>     // Elasticsearch 6 sink: each row is serialized back to JSON and
>     // indexed into "prod"/"logs", flushing after every single action.
>     List<HttpHost> httpHosts = new ArrayList<>();
>     httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
>
>     ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
>             httpHosts,
>             new ElasticsearchSinkFunction<Row>() {
>
>                 public IndexRequest createIndexRequest(Row row) {
>                     byte[] document = serializationSchema.serialize(row);
>                     return new IndexRequest("prod", "logs")
>                             .source(document, XContentType.JSON);
>                 }
>
>                 @Override
>                 public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
>                     System.out.println(r);
>                     requestIndexer.add(createIndexRequest(r));
>                 }
>             });
>
>     esSinkBuilder.setBulkFlushMaxActions(1);
>
>     ds.addSink(esSinkBuilder.build());
>
> Thanks,
> ~Ramya.
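One thing worth checking first, assuming the job itself starts and runs cleanly: a TUMBLE window on a rowtime attribute only emits a result once the watermark passes the end of the window. If the "_zpsbda" timestamps stop advancing, or if some of the topic's partitions are idle, the watermark stalls and the window never fires, so nothing would reach the print or Elasticsearch sinks even though offsets keep being committed, which matches the symptom above. A minimal diagnostic sketch; the probe query and the ProcessFunction are illustrative additions of mine, not part of the original job:

    // Probe sketch: select only the rowtime field and print each record's
    // timestamp next to the operator's current watermark. If the watermark
    // stays at Long.MIN_VALUE or lags far behind the timestamps, the
    // one-minute tumbling window can never fire.
    // Requires: org.apache.flink.streaming.api.functions.ProcessFunction
    //           org.apache.flink.util.Collector
    Table probe = tableEnv.sqlQuery("SELECT ts FROM sourceTopic");
    tableEnv.toAppendStream(probe, Row.class)
            .process(new ProcessFunction<Row, Row>() {
                @Override
                public void processElement(Row value, Context ctx, Collector<Row> out) {
                    System.out.println("rowtime=" + ctx.timestamp()
                            + ", watermark=" + ctx.timerService().currentWatermark());
                    out.collect(value);
                }
            })
            .print();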
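A second, unrelated thing I noticed in the query: SQL gives AND higher precedence than OR, so the WHERE clause above is evaluated as r1 OR r2 OR r5 OR (r10 AND isBot), not as (r1 OR r2 OR r5 OR r10) AND isBot. That alone would not make the result empty, but it silently changes which rows are counted. Also, isBot is declared BOOLEAN yet compared to the string literal 'true', which the planner may reject or implicitly cast depending on the version. A hedged rewrite, assuming the intent is "any of the rN flags set, and the request is a bot":

    // Assumed intent: (r1 OR r2 OR r5 OR r10) AND isBot. Parentheses make
    // the precedence explicit, and the BOOLEAN column is used directly
    // instead of being compared to a string literal.
    Table query = tableEnv.sqlQuery(
            "SELECT sid, _zpsbd6 AS ip, COUNT(*) AS total_hits, " +
            "TUMBLE_START(ts, INTERVAL '1' MINUTE) AS tumbleStart, " +
            "TUMBLE_END(ts, INTERVAL '1' MINUTE) AS tumbleEnd " +
            "FROM sourceTopic " +
            "WHERE (r1 = 'true' OR r2 = 'true' OR r5 = 'true' OR r10 = 'true') AND isBot " +
            "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6");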