Could you please help me with this? On Fri, Jan 7, 2022 at 11:48 AM Flink Lover <flinkbyhe...@gmail.com> wrote:
> I tried Flink version 1.14.2 / 1.13.5 > > On Fri, Jan 7, 2022 at 11:46 AM Flink Lover <flinkbyhe...@gmail.com> > wrote: > >> Also, I am using flink-connector-kafka_2.11 >> >> val consumer = new FlinkKafkaConsumer[String]("topic_name", new >> SimpleStringSchema(), properties) >> >> >> val myProducer = new FlinkKafkaProducer[String]( >> "topic_name", // target topic >> new KeyedSerializationSchemaWrapper[String](new >> SimpleStringSchema()), // serialization schema >> getProperties(), // producer config >> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE) >> >> >> >> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover <flinkbyhe...@gmail.com> >> wrote: >> >>> Hi All, >>> >>> I checked the Java version using the java -version on the terminal and >>> it gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only >>> which is by default. >>> >>> [image: image.png] >>> >>> What do you mean by target jvm? Also, what I am trying to achieve is >>> correct? about the windows? >>> >>> Thanks, >>> Martin >>> >>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <renqs...@gmail.com> wrote: >>> >>>> Hi Martin, >>>> >>>> Can you provide the configuration of your Kafka producer and consumer? >>>> Also it’ll be helpful to have the complete code of your DataStream. >>>> >>>> About the error you mentioned, I doubt that the JDK version you >>>> actually use is probably below 1.8. Can you have a double check on the >>>> environment that your job is running in? >>>> >>>> Cheers, >>>> >>>> Qingsheng Ren >>>> >>>> >>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover <flinkbyhe...@gmail.com> >>>> wrote: >>>> > >>>> > Hello Folks! >>>> > >>>> > I have a DataStream which sends data to the consumer but I got the >>>> data once the code completed its execution. I didn't receive the records as >>>> the code was writing it to the topic. I was able to achieve this behavior >>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I >>>> haven't enabled checkpointing as of now. I know checkpointing will also do >>>> the trick. My expectation is Producer should batch the records of 2 >>>> seconds and send it to the consumer and consumer should receive a batch of >>>> 2 seconds. My code goes as below: >>>> > >>>> > Producer Side: >>>> > dataToKafka.assignTimestampsAndWatermarks( >>>> > WatermarkStrategy >>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2))) >>>> > dataToKafka.addSink(myProducer).uid("source") >>>> > >>>> > Consumer Side: >>>> > consumer.assignTimestampsAndWatermarks( >>>> > WatermarkStrategy >>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2))) >>>> > >>>> > Now this gives me an error as below: >>>> > >>>> > Static methods in interface require -target:jvm-1.8 >>>> > .forBoundedOutOfOrderness[String](Duration.ofSeconds(2))) >>>> > >>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281 >>>> > >>>> > Thanks, >>>> > Martin. >>>> > >>>> > >>>> > >>>> >>>>