Re: Best approach to aggregate state with idle timeout and periodic output

2025-05-12 Thread Sachin Mittal
Second approach is good to try out. I am also solving for a similar problem using this approach only. Thanks Sachin On Mon, 12 May 2025 at 3:52 PM, Ehud Lev wrote: > Hi Flink community, > > We're building a Flink topology that aggregates events by key. When a key > is seen for the first time,

Re: Inquiry Regarding Flink and Amazon S3 Integration for Externalized Checkpoints

2025-04-24 Thread Sachin Mittal
I am using s3 as checkpoint storage for Flink running as part of EMR (EC2) + YARN setup and also running on EKS. There should not be any problem with it. Thanks Sachin On Thu, Apr 24, 2025 at 12:09 PM Anuj Jain wrote: > Dear Apache Flink Community, > > > > I hope this message finds you well. W

Flink CDC is failing for MongoDB with Resume of change stream was not possible, as the resume point may no longer be in the oplog error

2025-03-25 Thread Sachin Mittal
Hi, We are using Flink CDC as our datastream application deployed on AWS KDA. Our MongoDB is deployed on Mongo Atlas. The versions are: Flink: 1.20.0, MongoCDC (flink-connector-mongodb-cdc): 3.1.1. After the application has been running for a few days, I get the following error: java.lang.RuntimeExcept

Re: How can we read checkpoint data for debugging state

2025-02-26 Thread Sachin Mittal
] > https://github.com/apache/flink/blob/69559fb5d231d704633fed807773cd1853601862/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java#L127 > > G > > > On Wed, Feb 26, 2025 at 11:26 AM Sachin Mittal wrote: > >>

Re: How can we read checkpoint data for debugging state

2025-02-26 Thread Sachin Mittal
a good practice to set uid in the Flink app for each and > every operator, otherwise Flink generates an almost random number for it. > > When you don't know the generated uid then no worry there are nifty tricks > to find it out. > > BR, > G > > > On Fri, Feb 21, 2

Re: How can we read checkpoint data for debugging state

2025-02-21 Thread Sachin Mittal
, otherwise Flink generates an almost random number for it. > > When you don't know the generated uid then no worry there are nifty tricks > to find it out. > > BR, > G > > > On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal wrote: > >> Hi, >> I am w

Re: How can we read checkpoint data for debugging state

2025-02-20 Thread Sachin Mittal
vepoint+data > > > -- > Best! > Xuyang > > > At 2025-02-21 12:11:02, "Sachin Mittal" wrote: > > Hi, > So I have a flink application which stores state (RocksDB state backend) > in S3 with the following directory structure: > > s3://{bucket}/f

How can we read checkpoint data for debugging state

2025-02-20 Thread Sachin Mittal
Hi, So I have a flink application which stores state (RocksDB state backend) in S3 with the following directory structure: s3://{bucket}/flink-checkpoints//{job-id}|+ --shared/+ --taskowned/+ --chk-/ I have my job pipeline defined like: final DataStream e

Flink future release and upgrade plan

2025-02-13 Thread Sachin Mittal
Hi, We are running Flink 1.18.1 in production and will soon plan to upgrade. So far we have tested our product workloads using Flink 1.19.1 and all looks OK. Looks like we have also released Flink 1.20.0 and now Flink 2.0 is under active development. I had a question, would there be another Flink

Re: How to register pojo type information for third party pojo classes

2025-02-13 Thread Sachin Mittal
mation on > the state side. > > Best, > Zhanghao Chen > -- > *From:* Sachin Mittal > *Sent:* Thursday, February 13, 2025 12:23 > *To:* Zhanghao Chen > *Cc:* user > *Subject:* Re: How to register pojo type informatio

Re: How to register pojo type information for third party pojo classes

2025-02-13 Thread Sachin Mittal
ric types in this case. > > Best, > Zhanghao Chen > -- > *From:* Sachin Mittal > *Sent:* Thursday, February 13, 2025 12:23 > *To:* Zhanghao Chen > *Cc:* user > *Subject:* Re: How to register pojo type information for third

Re: How to register pojo type information for third party pojo classes

2025-02-12 Thread Sachin Mittal
htlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config > > Best, > Zhanghao Chen > -- > *From:* Sachin Mittal > *Sent:* Wednesday, February 12, 2025 20:20 > *To:* user > *Subject:* How to register pojo type inf

How to register pojo type information for third party pojo classes

2025-02-12 Thread Sachin Mittal
Hi, I have a Pojo class provided by some library. Say A.class I can create a type info factory of the same like: public class ATypeInfoFactory extends TypeInfoFactory { @Override public TypeInformation createTypeInfo( Type t, Map> genericParameters) { Map> fields = new HashM

Flink autoscaler is throwing exception and shutting down the entire job

2025-01-06 Thread Sachin Mittal
Hi, We are running our Flink streaming pipeline with following configs: (It runs on Flink 1.19.1 on AWS EMR (Yarn) taskmanager.numberOfTaskSlots: 4 job.autoscaler.enabled: 'true' jobmanager.scheduler: adaptive jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-schedul

For the FileSystem-based Changelog storage, for how long these changelogs are retained

2025-01-06 Thread Sachin Mittal
Hi, I have enabled state.backend.changelog, and state.backend.changelog.storage is set to filesystem. I wanted to know how long the changelog files are retained, as I was storing the data in HDFS and was quickly running out of space. What should be the ideal size of disk storage I should alloc

Re: [External] Is there a way to get consolidated metrics at task level

2025-01-06 Thread Sachin Mittal
ne machine and the have al > task managers export their monitoring to this push gateway. > > Then Prometheus would collect all metrics from push gateway. > > On yarn, the main problem is missing isolation of workloads and thus you > easily run into port conflicts … > > > > H

Generic log-based incremental checkpoint seems to be not working

2025-01-06 Thread Sachin Mittal
Hello, I am running a job on Apache Flink 1.19 on an AWS EMR (EC2) cluster as a YARN application. I have implemented generic log-based incremental checkpointing for faster checkpoints. It is described in more detail here: https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing

Is there a way to get consolidated metrics at task level

2025-01-06 Thread Sachin Mittal
Hello, I am running a Flink streaming pipeline with the autoscaler on AWS EMR (as a YARN application). My tasks are scaling independently based on load. My question is how can I get the metric, say numRecordsInPerSecond, at the task level. Basically it should sum all the values for this metric at sub t

Late data stream is not progressing after re-watermarking

2024-11-28 Thread Sachin Mittal
Hello, I am stuck with a weird problem and not able to wrap my head around it. Here is my pipeline: SingleOutputStreamOperator data = flattenedPlayerStatsData .keyBy(new KeySelector()) .window(TumblingEventTimeWindows.of(Time.seconds(300))) .sideOutputLateData(lateOutp

What could be the reason for failure - Key group is not in KeyGroupRange.

2024-11-24 Thread Sachin Mittal
Hi, I am using Apache Flink 1.18 and running this locally with RocksDB as the state backend. So far my pipeline was working fine; I was making a few adjustments and then it started failing with some weird exception: 2024-11-24 21:54:27,440 WARN org.apache.flink.runtime.taskmanager.Task [] - Task (1/4)#

What would be the best serialization mechanism in flink

2024-11-07 Thread Sachin Mittal
Hi, We are using Flink 1.18 and we do a lot of stateful processing in our jobs and persist large states. We are using RocksDB as the state backend and write state to a filesystem or HDFS. For now we are using POJO serialization. I find this simple to set up and easy to use as we would have lots of po

Kafka sink producing record at event timestamp

2024-10-25 Thread Sachin Mittal
Hi, I am having a pipeline where source and sink are two Kafka topics. The pipeline uses event time semantics, where event time is extracted from the record. What I notice is that when producing records at the sink side, it produces them such that the record's time in the kafka topic is the same a

Stopping the flink 1.18 program with savepoint seems to fail with timeout

2024-10-10 Thread Sachin Mittal
Hello, I am running a Flink job which I stop with a savepoint: ./bin/flink stop --savepointPath /tmp/flink-savepoints 0b3b584a298afa372491eff5e3d2160b Suspending job "0b3b584a298afa372491eff5e3d2160b" with a CANONICAL savepoint. However this is what I get in the cli --

Re: Status of ClickHouseSink

2024-10-03 Thread Sachin Mittal
-ru/flink-clickhouse-sink/blob/master/pom.xml#L31C24-L31C29> > version of flink. Did you make the version up yourself? > > чт, 3 окт. 2024 г. в 15:17, Sachin Mittal : > >> I have been using: >> https://github.com/ivi-ru/flink-clickhouse-sink >> >> >> >>

Re: Status of ClickHouseSink

2024-10-03 Thread Sachin Mittal
I have been using: https://github.com/ivi-ru/flink-clickhouse-sink On Thu, Oct 3, 2024 at 4:54 PM Ilya Karpov wrote: > Hi, > I've been searching for an implementation of kafka to clickhouse sink and > found FLIP >

Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Sachin Mittal
> 2.4.* 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.* > 3.0.* 1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* > > Best, > Jiabao > > [1] > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/#supported-flink-versions

Re: How to set the number formatter for json convertor for mongo cdc connector

2024-08-19 Thread Sachin Mittal
Override > public TypeInformation > getProducedType() { > return Types.GENERIC(Document.class); > } > }) > > --------

Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Sachin Mittal
ove snapshot speed > - Source can perform checkpoints in the chunk granularity during snapshot > reading > > Limitation: > - MongoDB version needs to be greater than 4.0 > > Best, > Jiabao > > On 2024/08/19 06:48:39 Sachin Mittal wrote: > > Hi, > > I am usin

How to set the number formatter for json convertor for mongo cdc connector

2024-08-19 Thread Sachin Mittal
Hi, I have configured my connector in following way: MongoDBSource.builder() ... .deserializer(new MongoDeserializationSchema(clazz)) .build(); My class MongoDeserializationSchema is defined like: public class MongoDeserializationSchema implements DebeziumDeserializationSchema { .

When to use scan.incremental.snapshot.enabled

2024-08-18 Thread Sachin Mittal
Hi, I am using mongodb cdc connector version 3.1.1 I am connecting to mongodb atlas, which uses mongodb version 7.0. In the cdc connector I find a property: scan.incremental.snapshot.enabled with default as false. I wanted to know in what cases we should set this as true and what does this prope

Re: Mongo flink CDC connector not reading from the source

2024-08-17 Thread Sachin Mittal
> .hosts(HOSTNAME) > .scheme(SCHEME) > .databaseList("test_db") > .collectionList("test_db.test_collection") > ... > > Best, > Jiabao > > On 2024/08/17 12:16:39 Sachin Mittal wrote: > > Hi, > > I have configured a Mong

Mongo flink CDC connector not reading from the source

2024-08-17 Thread Sachin Mittal
Hi, I have configured a MongoDB CDC source as : MongoDBSource.builder() .hosts(HOSTNAME) .scheme(SCHEME) .databaseList(MONGO_DB) .collectionList(collectionName) .username(USERNAME) .password(PASSWORD) .startupOptions(StartupOptions.initial()) .batchSize(2048) .d

Re: Integrating flink CDC with flink

2024-08-16 Thread Sachin Mittal
Sun wrote: > Yes, you can use flink-connector-mongodb-cdc to process both existing and > new data. > > See > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mongodb-cdc/#startup-reading-position > > Best, > Jiabao > > On 202

Re: Integrating flink CDC with flink

2024-08-16 Thread Sachin Mittal
se-3.1/docs/connectors/flink-sources/mongodb-cdc/ > > Best, > Jiabao > > On 2024/08/16 09:46:47 Sachin Mittal wrote: > > Hi, > > I have a scenario where I load a collection from MongoDB inside Flink > using > > flink-connector-mongodb. > > What I additiona

Integrating flink CDC with flink

2024-08-16 Thread Sachin Mittal
Hi, I have a scenario where I load a collection from MongoDB inside Flink using flink-connector-mongodb. What I additionally want is any future changes (insert/updates) to that collection is also streamed inside my Flink Job. What I was thinking of is to use a CDC connector to stream data to my Fl

Re: Can we share states across tasks/operators

2024-08-07 Thread Sachin Mittal
means of multiple side outputs (see here e.g. > > org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction.Context#output) > > > > I do that all the time 😊 > > > > WDYT? > > > > Sincere Flink greetings > > > > Thias > > >

Can we share states across tasks/operators

2024-08-07 Thread Sachin Mittal
Hi, I have a stream which starts from a source and is keyed by a field f. With the stream process function, I can emit the processed record downstream and also update state based on the records it received for the same key. Now I have another stream which starts from another source and is of the s

Re: Is there any way to perform left outer join using flink's data stream APIs

2024-08-06 Thread Sachin Mittal
l look something like: > > ``` > > @Override > > public void coGroup(Iterable arecs, > Iterable brecs, Collector out) { > > // if `brecs` is empty, that means nothing from Stream B matched > Stream A in the window. > > } > > ``` > > > Best, >
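The emptiness check described in the reply above can be illustrated without any Flink dependency. What follows is a minimal sketch only: it uses plain `List<String>` inputs in place of the `Iterable` arguments Flink passes to `coGroup`, and the class name `LeftOuterCoGroupSketch` and method `leftOuter` are hypothetical names, not part of any Flink API. It assumes both lists already hold the records for one key and one window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the body of a coGroup function; not Flink API.
final class LeftOuterCoGroupSketch {

    // Emits one {a, b} pair per match, or {a, null} when nothing from B matched.
    static List<String[]> leftOuter(List<String> aRecs, List<String> bRecs) {
        List<String[]> out = new ArrayList<>();
        for (String a : aRecs) {
            if (bRecs.isEmpty()) {
                // Left outer case: keep the A record with an empty right side.
                out.add(new String[] {a, null});
            } else {
                for (String b : bRecs) {
                    out.add(new String[] {a, b});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = new ArrayList<>();
        a.add("a1");
        List<String[]> joined = leftOuter(a, new ArrayList<>());
        System.out.println(joined.get(0)[0] + " -> " + joined.get(0)[1]);
    }
}
```

In a real job the same branching would sit inside the `coGroup` method quoted above, with results written to the `Collector` instead of a list.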

Is there any way to perform left outer join using flink's data stream APIs

2024-08-06 Thread Sachin Mittal
Hi, I have two streams A and B. Which can be joined or connected using a field f. However, for a given record in A for f = f1, there does not exist any record in B matching this field f = f1. In such cases I want to do a left outer join where the combined record pushed downstream would only have

Re: How can I debug Assigned key must not be null error when reading from Mongodb source

2024-08-05 Thread Sachin Mittal
Yes, this fixed the issue. Thanks Sachin On Mon, Aug 5, 2024 at 6:38 PM Jiabao Sun wrote: > Hi Sachin, > > Could you please check if you have used the keyBy operator and ensure that > the keyBy field is not null? > > Best, > Jiabao > > On 2024/08/05 12:33:27 Sachin

Re: How can I debug Assigned key must not be null error when reading from Mongodb source

2024-08-05 Thread Sachin Mittal
value will be emitted > } > } > > ``` > > > > Could you please clarify what methods does the > MongoDeserializationSchema > class overrides, like `deserialize(BsonDocument)` method, or > `deserialize(BsonDocument, Collector)`, too? > > > > Regard

How can I debug Assigned key must not be null error when reading from Mongodb source

2024-08-05 Thread Sachin Mittal
Hi, I am using mongodb connector provided in here: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/ I am instantiating it pretty much in the recommended way: MongoSource source = MongoSource.builder() .setUri("...") .setDatabase("...") .setCollection(

How to start a flink job on a long running yarn cluster from a checkpoint (with arguments)

2024-05-25 Thread Sachin Mittal
Hi, I have a long running yarn cluster and I submit my streaming job using the following command: flink run -m yarn-cluster -yid application_1473169569237_0001 /usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt --output file:///output/ Let's say I want to stop this job, mak

Re: Task Manager memory usage

2024-05-23 Thread Sachin Mittal
Hi, where are you storing the state? Try RocksDB. Thanks Sachin On Thu, 23 May 2024 at 6:19 PM, Sigalit Eliazov wrote: > Hi, > > I am trying to understand the following behavior in our Flink application > cluster. Any assistance would be appreciated. > > We are running a Flink application clust

Re: What is the best way to aggregate data over a long window

2024-05-17 Thread Sachin Mittal
multi-level time window granularity for pre-aggregation can significantly > improve performance and reduce computation latency > > Best, > Zhongqiang Gong > > Sachin Mittal 于2024年5月17日周五 03:48写道: > >> Hi, >> My pipeline step is something like this: >>

What is the best way to aggregate data over a long window

2024-05-16 Thread Sachin Mittal
Hi, My pipeline step is something like this: SingleOutputStreamOperator reducedData = data .keyBy(new KeySelector()) .window( TumblingEventTimeWindows.of(Time.seconds(secs))) .reduce(new DataReducer()) .name("reduce"); This works fine for secs = 30

Re: how to reduce read times when many jobs read the same kafka topic?

2024-05-14 Thread Sachin Mittal
manager, reducing the network load. Is my > understanding correct? > > Or are there any other advantages to it? Please advise. Thank you. > > Sachin Mittal 于2024年5月15日周三 09:24写道: > >> We have the same scenario. >> We thought of having one big job with multiple branc

How can we exclude operator level metrics from getting reported

2024-05-11 Thread Sachin Mittal
Hi, I have the following metrics configuration: metrics.reporters: stsd metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory metrics.reporter.stsd.host: localhost metrics.reporter.stsd.port: '8125' metrics.reporter.stsd.filter.excludes: *.operator:*:* metrics.sc

Re: Understanding default firings in case of allowed lateness

2024-04-17 Thread Sachin Mittal
f this is handled as default case ? > > Maybe side output[1] can help you to collect the late data and re-compute > them. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/ > > -- > Best! > Xuyang > > > At 2024-0

Understanding default firings in case of allowed lateness

2024-04-17 Thread Sachin Mittal
Hi, Suppose my pipeline is: data .keyBy(new MyKeySelector()) .window(TumblingEventTimeWindows.of(Time.seconds(60))) .allowedLateness(Time.seconds(180)) .reduce(new MyDataReducer()) So I wanted to know if the final output stream would contain reduced data at the end of the window

Re: Understanding event time wrt watermarking strategy in flink

2024-04-15 Thread Sachin Mittal
if you configure allowed lateness to A, > records with timestamps less than T - A - B will be dropped or > gathered as side outputs. > > Best, > Yunfeng > > On Fri, Apr 12, 2024 at 6:34 PM Sachin Mittal wrote: > > > > Hi Yunfeng, > > I have a question around th
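The interaction between the out-of-orderness bound and allowed lateness mentioned in this reply can be checked with plain arithmetic. This is a sketch under stated assumptions, not a copy of Flink code: it assumes the watermark for a bounded-out-of-orderness strategy is the maximum timestamp seen minus the bound minus 1 ms, that a window `[start, end)` fires once the watermark reaches `end - 1`, and that the window is discarded once the watermark reaches `end - 1 + allowedLateness`. The class and method names are hypothetical.

```java
// Hypothetical helper illustrating watermark and lateness arithmetic; not Flink API.
final class LatenessSketch {

    // Assumed watermark for a bounded-out-of-orderness strategy.
    static long watermark(long maxSeenTimestampMs, long outOfOrdernessMs) {
        return maxSeenTimestampMs - outOfOrdernessMs - 1;
    }

    // Assumed firing rule: the window [start, end) fires when the watermark reaches end - 1.
    static boolean windowFires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    // Assumed expiry rule: records for the window count as late once this returns true.
    static boolean windowExpired(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        return windowEndMs - 1 + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long bound = 60_000L;     // illustrative out-of-orderness bound
        long lateness = 180_000L; // illustrative allowed lateness
        long wm = watermark(120_000L, bound);
        System.out.println("fires=" + windowFires(60_000L, wm)
                + " expired=" + windowExpired(60_000L, lateness, wm));
    }
}
```

Under these assumptions, a window ending at 60 s first fires when an event with timestamp 120 s arrives, and stays open for late records until an event with timestamp 300 s arrives.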

Re: Understanding event time wrt watermarking strategy in flink

2024-04-12 Thread Sachin Mittal
, T]. > > Best, > Yunfeng > > On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal wrote: > > > > Hello folks, > > I have few questions: > > > > Say I have a source like this: > > > > final DataStream data = > >

Understanding event time wrt watermarking strategy in flink

2024-04-11 Thread Sachin Mittal
Hello folks, I have a few questions: Say I have a source like this: final DataStream data = env.fromSource( source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60)) .withTimestampAssigner((event, timestamp) -> event.timestamp)); My pipeline after

How are window's boundaries decided in flink

2024-04-10 Thread Sachin Mittal
Hi, Let's say I have defined 1-minute TumblingEventTimeWindows, so it will create windows as: (0, 60), (60, 120), ... Now let's say I have an event at time t = 60. In which window would this get aggregated: the first, the second, or both? Say I want this to get aggregated only in the second window, how ca
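The boundary question can be worked through with the window-start arithmetic. The sketch below assumes tumbling windows are half-open intervals `[start, end)` aligned to the epoch plus an optional offset, which is how Flink's time windows are generally described; the class and method names are hypothetical and the code does not depend on Flink.

```java
// Hypothetical helper for tumbling-window boundaries; not Flink API.
final class WindowBoundsSketch {

    // Start of the window containing the timestamp, for a given size and offset.
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        long remainder = (timestampMs - offsetMs) % sizeMs;
        // Java's % can be negative for negative operands, so shift back one window.
        return remainder < 0 ? timestampMs - (remainder + sizeMs) : timestampMs - remainder;
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMs, long offsetMs, long sizeMs) {
        return windowStart(timestampMs, offsetMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long t = 60_000L; // the event at t = 60 s from the question
        System.out.println("[" + windowStart(t, 0L, 60_000L) + ", " + windowEnd(t, 0L, 60_000L) + ")");
    }
}
```

With a window size of 60 s and no offset, an event at exactly t = 60 s lands only in the second window, [60 s, 120 s), because the end of the first window is exclusive.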

Re: How to debug window step in flink

2024-04-08 Thread Sachin Mittal
link to know when to emit > a window and therefore you won't see any outgoing events. > > Kind Regards > > Dominik > > > > *From: *Sachin Mittal > *Date: *Monday, 8 April 2024 at 08:17 > *To: *user@flink.apache.org > *Subject: *How to debug window step in flink

How to debug window step in flink

2024-04-07 Thread Sachin Mittal
Hi, I have the following windowing step in my pipeline: inputData .keyBy(new MyKeySelector()) .window( TumblingEventTimeWindows.of(Time.seconds(60))) .reduce(new MyReducer()) .name("MyReducer"); The same step shows up in the Flink UI as: Window(TumblingEventTimeWindows(60

How to handle tuple keys with null values

2024-04-02 Thread Sachin Mittal
Hello folks, I am keying my stream using a Tuple: example: public class MyKeySelector implements KeySelector> { @Override public Tuple2 getKey(Data data) { return Tuple2.of(data.id, data.id1); } } Now id1 can have null values. In this case how should I handle this? Right now I am getting th
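One common workaround for null fields in a tuple key is to substitute a sentinel before building the tuple, since Flink's tuple serialization is generally understood not to accept null fields. The following is only a sketch of that idea: the sentinel value `__NULL__`, the class `NullSafeKeySketch`, and the method `orSentinel` are all hypothetical, and the sentinel must be a value that can never occur as a real `id1`.

```java
// Hypothetical helper for null-safe key fields; not Flink API.
final class NullSafeKeySketch {

    // Assumed placeholder; pick a value that cannot collide with real ids.
    static final String NULL_SENTINEL = "__NULL__";

    // Returns the value unchanged, or the sentinel when the value is null.
    static String orSentinel(String value) {
        return value == null ? NULL_SENTINEL : value;
    }

    public static void main(String[] args) {
        // In the key selector this would be used as:
        //   return Tuple2.of(data.id, NullSafeKeySketch.orSentinel(data.id1));
        System.out.println(orSentinel(null));
        System.out.println(orSentinel("abc"));
    }
}
```

All records with a null `id1` then share one key per `id`; if they should not be grouped together, filtering or routing them to a side output before `keyBy` is an alternative.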

Re: Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-22 Thread Sachin Mittal
Hadoop NN runs on 8020 port. You may find the NN IP > details from EMR service. > > Hope this helps. > > -A > > > On Thu, Mar 21, 2024 at 10:54 PM Sachin Mittal wrote: > >> Hi, >> We are using AWS EMR where we can submit our flink jobs to a long running >>

Re: Flink unable to read from kafka source due to starting offset strategy

2024-03-22 Thread Sachin Mittal
. Any idea why the consumer network client is getting disconnected? Is this because this thread is not getting enough resources or something? Thanks Sachin On Fri, Mar 22, 2024 at 12:48 PM Sachin Mittal wrote: > Hi, > I was experimenting with different starting offset strategies for my

Re: Flink unable to read from kafka source due to starting offset strategy

2024-03-22 Thread Sachin Mittal
Hi, I was experimenting with different starting offset strategies for my Flink job, especially in cases where jobs are canceled and scheduled again and I would like to start with the last committed offset and if the same is not available then start from the latest. So I decided to use this: .setS

Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-21 Thread Sachin Mittal
Hi, We are using AWS EMR where we can submit our flink jobs to a long running flink cluster on Yarn. We wanted to configure RocksDBStateBackend as our state backend to store our checkpoints. So we have configured following properties in our flink-conf.yaml - state.backend.type: rocksdb - s

Re: Need help in understanding PojoSerializer

2024-03-20 Thread Sachin Mittal
done for a list, but it’s not trivial. > > Or as a hack, use a Map and the existing support for map > serialization via > https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/MapSerializer.html > > — Ken > > > On Mar 20,

Need help in understanding PojoSerializer

2024-03-20 Thread Sachin Mittal
Hi, I have a Pojo class like this public class A { public String str; public Set aSet; public Map dMap; } However when I start the flink program I get this message: org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A# dMap will be processed as GenericType. Please read t

Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
utTag` to make > sure that this class is packaged correctly. > I think you should check your own jar to make sure this class is not > packaged in your jar. > > Best, > Hang > > Sachin Mittal 于2024年3月12日周二 20:29写道: > >> I miss wrote. It’s version 1.18. >> >

Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525) ... 4 more Thanks Sachin On Tue, Mar 12, 2024 at 2:11 PM Sachin Mittal wrote: >

Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
n flink-dist. > And Flink version 1.8 is too old. It is better to update your flink > version. > > Best, > Hang > > > > Sachin Mittal 于2024年3月12日周二 16:04写道: > >> Hi, >> We have installed a flink cluster version 1.8.0 on AWS EMR. >> However when we submi

Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Sachin Mittal
ion of Flink? > -- > *From:* Sachin Mittal > *Sent:* Tuesday, March 12, 2024 14:48 > *To:* user@flink.apache.org > *Subject:* Facing ClassNotFoundException: > org.apache.flink.api.common.ExecutionConfig on EMR > > Hi, > We have installed a

Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-11 Thread Sachin Mittal
Hi, We have installed a flink cluster version 1.8.0 on AWS EMR. However when we submit a job we get the following error: (Do note that when we submit the same job on a local instance of Flink 1.8.1 it is working fine. The fat jar we submit has all the flink dependencies from 1.8.0 including the cl