Hey,
Could you please format your snippet? It is very hard to understand what is
going on in there.
Best Regards,
Dom.

On Wed, 16 Jan 2019 at 13:05, Ramya Ramamurthy <hair...@gmail.com> wrote:

> Hi
>
> I have a setup with Flink 1.7, Kafka 0.11, and Elasticsearch 6.5.
>
> I can see the Flink Kafka consumer consuming messages, but these are not
> being passed on to the next stage, i.e. the Elasticsearch sink. I am unable
> to find any relevant logs.
>
> Here are the logs from my Kafka consumer:
>
> 2019-01-16 17:28:05,860 DEBUG
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer
> - Committing offsets:
> {legitimatestream-16=OffsetAndMetadata{offset=203252637, metadata=''},
> legitimatestream-18=OffsetAndMetadata{offset=203236011, metadata=''},
> legitimatestream-20=OffsetAndMetadata{offset=203237190, metadata=''},
> legitimatestream-22=OffsetAndMetadata{offset=203273504, metadata=''},
> legitimatestream-8=OffsetAndMetadata{offset=203251672, metadata=''},
> legitimatestream-10=OffsetAndMetadata{offset=203235871, metadata=''},
> legitimatestream-12=OffsetAndMetadata{offset=203242970, metadata=''},
> legitimatestream-14=OffsetAndMetadata{offset=203269129, metadata=''},
> legitimatestream-0=OffsetAndMetadata{offset=203247420, metadata=''},
> legitimatestream-9=OffsetAndMetadata{offset=203226435, metadata=''},
> legitimatestream-11=OffsetAndMetadata{offset=203259207, metadata=''},
> legitimatestream-13=OffsetAndMetadata{offset=203262566, metadata=''},
> legitimatestream-1=OffsetAndMetadata{offset=203230950, metadata=''},
> legitimatestream-3=OffsetAndMetadata{offset=203260289, metadata=''},
> legitimatestream-5=OffsetAndMetadata{offset=203285827, metadata=''},
> legitimatestream-24=OffsetAndMetadata{offset=203240761, metadata=''},
> legitimatestream-26=OffsetAndMetadata{offset=203254649, metadata=''},
> legitimatestream-28=OffsetAndMetadata{offset=203265863, metadata=''}}
>
> I am unable to see any logs showing this data being passed on to the next
> stage, i.e. the table query followed by the sink.
>
> Can anyone help me out with why this might happen, or am I missing
> something?
>
> Here is the snippet of my code:
> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>
> tableEnv.connect(new Kafka()
>                 .version("0.11")
>                 .topic(params.getRequired("read-topic"))
>                 .property("auto.offset.reset", "latest")
>                 .property("group.id", params.getRequired("group.id"))
>                 .property("bootstrap.servers", params.getRequired("bootstrap.servers")))
>         .withSchema(new Schema()
>                 .field("sid", Types.STRING())
>                 .field("_zpsbd6", Types.STRING())
>                 .field("r1", Types.STRING())
>                 .field("r2", Types.STRING())
>                 .field("r5", Types.STRING())
>                 .field("r10", Types.STRING())
>                 .field("isBot", Types.BOOLEAN())
>                 .field("botcode", Types.STRING())
>                 .field("ts", Types.SQL_TIMESTAMP())
>                 .rowtime(new Rowtime()
>                         .timestampsFromField("_zpsbda")
>                         .watermarksPeriodicBounded(5000)))
>         .withFormat(new Json().deriveSchema())
>         .inAppendMode()
>         .registerTableSource("sourceTopic");
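>
> As a sanity check of what the JSON format actually derives, the registered
> table's schema can be printed before querying it, e.g.:
>
> tableEnv.scan("sourceTopic").printSchema();
>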
> Table query = tableEnv.sqlQuery(
>         "SELECT sid, _zpsbd6 AS ip, COUNT(*) AS total_hits, " +
>         "TUMBLE_START(ts, INTERVAL '1' MINUTE) AS tumbleStart, " +
>         "TUMBLE_END(ts, INTERVAL '1' MINUTE) AS tumbleEnd " +
>         "FROM sourceTopic " +
>         "WHERE r1='true' OR r2='true' OR r5='true' OR r10='true' AND isBot='true' " +
>         "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6");
>
> TypeInformation<Row> schema = query.getSchema().toRowType();
> SerializationSchema<Row> serializationSchema =
>         new JsonRowSerializationSchema(schema);
>
> DataStream<Row> ds = tableEnv.toAppendStream(query, Row.class);
> ds.print();
>
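> One thing I am not sure my WHERE clause expresses correctly: AND binds
> tighter than OR in SQL, so the filter above is evaluated as
> r1='true' OR r2='true' OR r5='true' OR (r10='true' AND isBot='true').
> If isBot='true' is meant to apply to every branch, the condition would
> need parentheses, e.g.:
>
> WHERE (r1='true' OR r2='true' OR r5='true' OR r10='true')
>   AND isBot='true'
>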
> List<HttpHost> httpHosts = new ArrayList<>();
> httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
>
> ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
>         httpHosts,
>         new ElasticsearchSinkFunction<Row>() {
>
>             public IndexRequest createIndexRequest(Row row) {
>                 byte[] document = serializationSchema.serialize(row);
>                 return new IndexRequest("prod", "logs")
>                         .source(document, XContentType.JSON);
>             }
>
>             @Override
>             public void process(Row r, RuntimeContext runtimeContext,
>                     RequestIndexer requestIndexer) {
>                 System.out.println(r);
>                 requestIndexer.add(createIndexRequest(r));
>             }
>         });
>
> esSinkBuilder.setBulkFlushMaxActions(1);
>
> ds.addSink(esSinkBuilder.build());
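>
> Since the query groups by a one-minute tumbling window on the rowtime
> attribute ts (with watermarksPeriodicBounded(5000)), my understanding is
> that a window only emits once the watermark passes its end, so seeing no
> output would also be consistent with the watermark never advancing that
> far. As a debugging tap (not part of the job above), I could print the
> watermark next to each element straight off the source table, something
> like:
>
> Table raw = tableEnv.sqlQuery("SELECT * FROM sourceTopic");
> tableEnv.toAppendStream(raw, Row.class)
>         .process(new ProcessFunction<Row, Row>() {
>             @Override
>             public void processElement(Row value, Context ctx, Collector<Row> out) {
>                 // Element timestamp vs. current watermark: event time must
>                 // advance past a window's end before TUMBLE results appear.
>                 System.out.println("ts=" + ctx.timestamp()
>                         + " watermark=" + ctx.timerService().currentWatermark());
>                 out.collect(value);
>             }
>         })
>         .print();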
>
>
> Thanks,
> ~Ramya.
>
