Could you please help me with this?

On Fri, Jan 7, 2022 at 11:48 AM Flink Lover <flinkbyhe...@gmail.com> wrote:

> I tried Flink version 1.14.2 / 1.13.5
>
> On Fri, Jan 7, 2022 at 11:46 AM Flink Lover <flinkbyhe...@gmail.com>
> wrote:
>
>> Also, I am using flink-connector-kafka_2.11
>>
>> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
>> SimpleStringSchema(), properties)
>>
>>
>> val myProducer = new FlinkKafkaProducer[String](
>>       "topic_name", // target topic
>>       new KeyedSerializationSchemaWrapper[String](new
>> SimpleStringSchema()), // serialization schema
>>       getProperties(), // producer config
>>       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>>
>>
>>
>> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover <flinkbyhe...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I checked the Java version using the java -version on the terminal and
>>> it gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
>>> which is by default.
>>>
>>> [image: image.png]
>>>
>>> What do you mean by target jvm? Also, what I am trying to achieve is
>>> correct? about the windows?
>>>
>>> Thanks,
>>> Martin
>>>
>>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> Can you provide the configuration of your Kafka producer and consumer?
>>>> Also it’ll be helpful to have the complete code of your DataStream.
>>>>
>>>> About the error you mentioned, I doubt that the JDK version you
>>>> actually use is probably below 1.8. Can you have a double check on the
>>>> environment that your job is running in?
>>>>
>>>> Cheers,
>>>>
>>>> Qingsheng Ren
>>>>
>>>>
>>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover <flinkbyhe...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hello Folks!
>>>> >
>>>> > I have a DataStream which sends data to the consumer but I got the
>>>> data once the code completed its execution. I didn't receive the records as
>>>> the code was writing it to the topic. I was able to achieve this behavior
>>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>>>> haven't enabled checkpointing as of now. I know checkpointing will also do
>>>> the trick.  My expectation is Producer should batch the records of 2
>>>> seconds and send it to the consumer and consumer should receive a batch of
>>>> 2 seconds. My code goes as below:
>>>> >
>>>> > Producer Side:
>>>> >  dataToKafka.assignTimestampsAndWatermarks(
>>>> >       WatermarkStrategy
>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>> >     dataToKafka.addSink(myProducer).uid("source")
>>>> >
>>>> > Consumer Side:
>>>> > consumer.assignTimestampsAndWatermarks(
>>>> >       WatermarkStrategy
>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>> >
>>>> > Now this gives me an error as below:
>>>> >
>>>> > Static methods in interface require -target:jvm-1.8
>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>> >
>>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>>>> >
>>>> > Thanks,
>>>> > Martin.
>>>> >
>>>> >
>>>> >
>>>>
>>>>

Reply via email to