No space left on device exception

2020-08-19 Thread Vishwas Siravara
Hi guys, I have a deduplication job that runs on Flink 1.7 and has some state which uses the FsStateBackend. My TM heap size is 16 GB. I see the below error while trying to checkpoint a state of size 2 GB. There is enough space available in S3; I tried to upload larger files and they were all successful…

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
…; > Piotrek > On Thu, Aug 20, 2020 at 00:56, Vishwas Siravara wrote: >> Hi guys, I have a deduplication job that runs on Flink 1.7 and has some state which uses the FsStateBackend. My TM heap size is 16 GB. I see the below error while trying t…

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
…change the staging directory for S3 checkpoints? Best, Vishwas. On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara wrote: > Hi Piotr, thank you for your suggestion. I will try that. Are the temporary files created in the directory set in io.tmp.dirs in the flink-conf.yaml?

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
…this to your flink-conf.yaml? > s3.staging-directory: /usr/mware/flink/tmp > On 20/08/2020 20:50, Vishwas Siravara wrote: > Hi Piotr, I did some analysis and realised that the temp files for S3 checkpoints are staged in /tmp although io.tmp.dirs is set to a…
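For reference, a flink-conf.yaml sketch combining the two settings discussed in this thread (paths illustrative, taken from the messages above):

    # Directory for Flink's local working files; checkpoint data is staged here before upload.
    io.tmp.dirs: /usr/mware/flink/tmp
    # Staging directory used by the S3 filesystem, as suggested in the reply above.
    s3.staging-directory: /usr/mware/flink/tmp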

OOM error for heap state backend.

2020-08-21 Thread Vishwas Siravara
Hi guys, I use Flink 1.7.2. I have a stateful streaming job which uses a keyed process function. I use the heap state backend. Although I set the TM heap size to 16 GB, I get an OOM error when the state size is around 2.5 GB (from the dashboard I get the state size). I have set taskmanager.memory.fraction:…
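For context, a flink-conf.yaml sketch of the two knobs this thread revolves around (values illustrative; with the heap state backend, state lives as objects on the JVM heap, so its on-heap footprint can be several times the size the dashboard reports):

    # Total TM heap (Flink 1.7-era key).
    taskmanager.heap.size: 16384m
    # Share of the heap Flink may reserve as managed memory (the setting mentioned above).
    taskmanager.memory.fraction: 0.1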

Re: OOM error for heap state backend.

2020-08-26 Thread Vishwas Siravara
…their state requirements. > All tasks in the same task manager share the JVM heap, as the task manager runs one JVM process on the machine where it is deployed. > Best, Andrey > On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara wrote: >> Hi Andrey,…

Re: Providing hdfs name node IP for streaming file sink

2020-03-03 Thread Vishwas Siravara
Thanks Yang. Going with setting HADOOP_CONF_DIR in the Flink application; it integrates neatly with Flink. Best, Nick. On Mon, Mar 2, 2020 at 7:42 PM Yang Wang wrote: > It may work. However, you need to set your own retry policy (similar to `ConfiguredFailoverProxyProvider` in Hadoop). > A…

Re: Non parallel file sources

2020-06-23 Thread Vishwas Siravara
Thanks, that makes sense. On Tue, Jun 23, 2020 at 2:13 PM Laurent Exsteens <laurent.exste...@euranova.eu> wrote: > Hi Nick, on a project I worked on, we simply made the file accessible on a shared NFS drive. Our source was custom, and we forced it to parallelism 1 inside the job, so the…

Streaming from a file

2019-08-01 Thread Vishwas Siravara
Hi guys, Is it possible for Flink to stream from a Unix file system line by line? When I use readTextFile(path), which reads text files (files that respect the TextInputFormat specification) line by line and returns them as Strings, the entire contents of the file come as a DataStream, over which…
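A minimal sketch of the continuous variant (Flink 1.7-era Scala API); the path and poll interval are illustrative:

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._

    object FileStreamJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val path = "file:///data/input/events.txt" // hypothetical path
        val format = new TextInputFormat(new Path(path))
        // PROCESS_CONTINUOUSLY re-scans the path every second; each line arrives
        // as one String record. Note: if a file is modified, its whole contents
        // are reprocessed.
        val lines: DataStream[String] =
          env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L)
        lines.print()
        env.execute("file-stream")
      }
    }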

Passing jvm options to flink

2019-08-07 Thread Vishwas Siravara
Hi, I am running Flink on a standalone cluster without any resource manager like YARN or K8s. I am submitting my job using the command line "flink run ...". I have a couple of questions: 1. How can I pass JVM parameters to this job? I want to pass a parameter for a dylib, like this: -Djava.libr…

How can I pass multiple java options in standalone mode ?

2019-08-12 Thread Vishwas Siravara
Hi guys, I have this entry in the flink-conf.yaml file for JVM options: env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir }}/kafka-jaas.conf,-Djava.security.krb5.conf={{ flink_installed_dir }}/krb5.conf". Is this supposed to be a comma-separated list? I get a parse exception when th…
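For what it's worth, env.java.opts is handed to the JVM as-is, so multiple options go space-separated inside a single quoted string rather than comma-separated; a sketch with illustrative paths:

    env.java.opts: "-Djava.security.auth.login.config=/opt/flink/conf/kafka-jaas.conf -Djava.security.krb5.conf=/opt/flink/conf/krb5.conf"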

Re: How can I pass multiple java options in standalone mode ?

2019-08-14 Thread Vishwas Siravara
nf"" This is really confusing and I cant find any document on how I should pass this option. Thanks, Vishwas On Wed, Aug 14, 2019 at 12:40 PM Vishwas Siravara wrote: > Is it possible for me to pass these arguments along with the job when I do > flink run and then pass the jvm options

Re: How can I pass multiple java options in standalone mode ?

2019-08-14 Thread Vishwas Siravara
…manager. Thanks, Vishwas. On Wed, Aug 14, 2019 at 2:35 PM Aleksandar Mastilovic <amastilo...@sightmachine.com> wrote: > It's a YAML file, so I think you need to do something like > env.java.opts: -Dconfig.resource=qa.conf > On Aug 14, 2019, at 11:58 AM, Vishwas Siravara…

How can I pass jvm options to flink when started from command line

2019-08-14 Thread Vishwas Siravara
I understand that when I run a Flink job from the command line it forks a JVM and runs the main method, while the Flink-related code runs in the task managers. So when I say "flink run", the main method does not run on the JobManager, hence it does not pick up env.java.opts set in flink-conf.yaml, as this applies to…
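As later replies in this archive hint, Flink also has per-process variants of this setting that affect only the JobManager or TaskManager JVMs (not the CLI client); a flink-conf.yaml sketch with illustrative values:

    env.java.opts: "-Dshared.flag=1"                              # all Flink JVMs
    env.java.opts.jobmanager: "-Dconfig.resource=qa.conf"         # JobManager only
    env.java.opts.taskmanager: "-Djava.library.path=/opt/native"  # TaskManagers only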

External classpath

2019-08-14 Thread Vishwas Siravara
Hi guys, I'm very close to deploying my application in production, so I am trying to externalize some of the config files which have to be available on the classpath when I run my application via the Flink command line interface. From the Flink doc I can add to the classpath with -C,--classpath…

Re: How can I pass multiple java options in standalone mode ?

2019-08-15 Thread Vishwas Siravara
…what do you want to do. > Best, Yang > On Thu, Aug 15, 2019 at 3:39 AM, Vishwas Siravara wrote: >> Thanks a lot, I fixed that, so now this works when I submit my job with the Flink UI, but when I submit it via flink run (command line) it does not take this env.jav…

Flink job parallelism

2019-08-15 Thread Vishwas Siravara
Hi guys, I have a Flink job which I want to run with a parallelism of 2. I run it from the command line like: flink run -p 2 -C file:///home/was/classpathconfig/ -c com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar flink druid. My cluster has two task managers with only 1 task slot each. Ho…

Understanding job flow

2019-08-15 Thread Vishwas Siravara
Hi guys, I have a map job where I want to encrypt certain keys. I initialize the encryptor in the main method and apply it in the map function. How is this encryptor shared when my job runs on multiple task managers with parallelism > 1? Thanks, Vishwas

Re: Understanding job flow

2019-08-16 Thread Vishwas Siravara
…different nodes, it is initialized each time on all three nodes (task managers). I wonder why this happens. Thanks, Vishwas. On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson wrote: > @transient or use a static factory. In Scala we use a @transient lazy val with an initializer to do this…
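A minimal sketch of the pattern Steven describes, with a hypothetical Encryptor class standing in for the actual crypto client from the thread:

    import org.apache.flink.api.common.functions.RichMapFunction

    // Hypothetical stand-in for the non-serializable encryptor.
    class Encryptor { def encrypt(s: String): String = s }

    class EncryptMap extends RichMapFunction[String, String] {
      // @transient: the field is not serialized with the job graph. lazy val:
      // each parallel subtask builds its own instance on first access, which is
      // why it initializes once per task manager rather than once per job.
      @transient private lazy val encryptor: Encryptor = new Encryptor
      override def map(value: String): String = encryptor.encrypt(value)
    }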

Configuring logback for my flink job

2019-08-19 Thread Vishwas Siravara
Hi, I have a logback configuration for my Flink application which is packaged with the application fat jar. However, when I submit my job from the Flink command line tool, I see that logback is set to -Dlogback.configurationFile=file:/data/flink-1.7.2/conf/logback.xml from the client log. As a result my application…

Re: Configuring logback for my flink job

2019-08-20 Thread Vishwas Siravara
…flink-dist/src/main/flink-bin/bin/flink#L52 > On Mon, Aug 19, 2019 at 11:02 PM, Vishwas Siravara wrote: >> Hi, I have a logback configuration for my Flink application which is packaged with the application fat jar. However, when I submit my job from the Flink command line tool, I see that…

Configuring logback

2019-08-20 Thread Vishwas Siravara
Hi guys, I am using logback for my application logs. I have logback.xml as part of my fat jar that I submit to Flink via the command line flink run "...". When I run my application from the IDE, the appenders are what I have set in my logback, but when I run from the command line the appender defaults to the…

Flink logback

2019-08-21 Thread Vishwas Siravara
Hi all, I modified the logback.xml provided by the Flink distribution, so now the logback.xml file looks like this: [logback.xml excerpt: a file appender writing to ${log.file}, append=false, with pattern %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] …]

Externalized checkpoints

2019-08-21 Thread Vishwas Siravara
Hi peeps, I am externalizing checkpoints in S3 for my Flink job and I retain them on cancellation. However, when I look into my S3 bucket where the checkpoints are stored, there is only 1 checkpoint at any point in time. Is this the default behavior of Flink, where older checkpoints are deleted when…

Re: Externalized checkpoints

2019-08-21 Thread Vishwas Siravara
I am also using exactly-once checkpointing mode; I have a Kafka source and sink, so both support transactions, which should allow for exactly-once processing. Is this the reason why there is only one checkpoint retained? Thanks, Vishwas. On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara wrote…

Use logback instead of log4j

2019-08-23 Thread Vishwas Siravara
Hi, From the Flink doc, in order to use logback instead of log4j: "Users willing to use logback instead of log4j can just exclude log4j (or delete it from the lib/ folder)." https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html. However, when I delete it from the lib a…

Re: Externalized checkpoints

2019-08-25 Thread Vishwas Siravara
…html#retained-checkpoints > Best, Congxian > On Thu, Aug 22, 2019 at 10:13 AM, Zhu Zhu wrote: >> Hi Vishwas, you can configure "state.checkpoints.num-retained" to specify the max checkpoints to retain. By default it is 1.
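A flink-conf.yaml sketch of the setting Zhu Zhu names (the count is illustrative):

    # Keep the last 3 completed checkpoints instead of only the most recent one.
    state.checkpoints.num-retained: 3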

Re: Use logback instead of log4j

2019-08-25 Thread Vishwas Siravara
Any idea on how I can use logback instead? On Fri, Aug 23, 2019 at 1:22 PM Vishwas Siravara wrote: > Hi, from the Flink doc, in order to use logback instead of log4j: "Users willing to use logback instead of log4j can just exclude log4j (or delete it from the lib/…

Re: Using shell environment variables

2019-08-25 Thread Vishwas Siravara
You can also link at runtime by providing the path to the dylib, by adding -Djava.library.path= to the JVM options of the task manager. On Sat, Aug 24, 2019 at 9:11 PM Zhu Zhu wrote: > Hi Abhishek, you need to export the environment variables on all the worker machines (not the machine to submit…

Loading dylibs

2019-08-26 Thread Vishwas Siravara
Hi guys, I have a Flink application that loads a dylib like this: System.loadLibrary("vibesimplejava"). The application runs fine, but when I restart the job I get this exception: com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected error java.lang.UnsatisfiedLinkError: Native Libr…

Re: Loading dylibs

2019-08-26 Thread Vishwas Siravara
…s? Thanks, Vishwas. On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke wrote: > I don't know dylibs in detail, but can you call a static method which checks if the library has already been loaded and, if not, loads it (singleton pattern)? > On 27.08.2019 at 06:39, …
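A sketch of the singleton guard Jörn suggests; the library name comes from the thread, the object wrapper is an assumption:

    // Guards the load so it happens at most once per JVM/classloader.
    // Caveat (see FLINK-11402 in the next reply): a job restart can create a fresh
    // user-code classloader, and JNI refuses to load the same native library in two
    // classloaders; the usual workaround is to put the jar that does the loading in
    // Flink's lib/ so the parent classloader owns it.
    object NativeLib {
      @volatile private var loaded = false
      def ensureLoaded(): Unit = synchronized {
        if (!loaded) {
          System.loadLibrary("vibesimplejava")
          loaded = true
        }
      }
    }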

Re: Loading dylibs

2019-08-28 Thread Vishwas Siravara
…'s a known issue in the Flink Jira project [1]. Is it possible that you have encountered the same problem? [1]: https://issues.apache.org/jira/browse/FLINK-11402 > Regards, Aleksey > On Tue, Aug 27, 2019 at 8:03 AM Vishwas Siravara wrote: >…

Flink and kerberos

2019-08-29 Thread Vishwas Siravara
Hi guys, I am using Kerberos for my Kafka source. I pass the JAAS config and krb5.conf in env.java.opts: -Dconfig.resource=qa.conf -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/ -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaa…
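As an alternative to a hand-rolled JAAS file, Flink can generate the JAAS entry itself from its Kerberos settings; a flink-conf.yaml sketch with illustrative principal/keytab values:

    security.kerberos.login.use-ticket-cache: false
    security.kerberos.login.keytab: /home/was/Jaas/user.keytab   # illustrative path
    security.kerberos.login.principal: myuser@EXAMPLE.COM        # illustrative principal
    # Make the credentials available to the KafkaClient JAAS context.
    security.kerberos.login.contexts: Client,KafkaClient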

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
…(log excerpt listing the consumer's assigned KafkaTopicPartitions of topic 'gbl_auth_raw_occ_c': partitions 7, 14, 13, 12, 11, …)

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
…a TGT will be created based on this keytab. However, that seems to be working. Did you check Kafka logs on the broker side? Or did you check consumer offsets with Kafka tools in order to validate consumers are r…

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
…security.plain.PlainLoginModule required username=\"myuser\" password=\"\"; > sasl.mechanism=PLAIN > security.protocol=SASL_SSL > On Thu, Aug 29, 2019 at 18:20, Vishwas Siravara wrote: >> Hey David, my consumers are r…

understanding task manager logs

2019-09-02 Thread Vishwas Siravara
Hi guys, I am using Flink 1.7.2 and my application consumes from a Kafka topic and publishes to another Kafka topic which is in its own Kafka environment running a different Kafka version. I am using FlinkKafkaConsumer010 from this dependency: "org.apache.flink" %% "flink-connector-kafka-0.10" % fli…

Flink Kafka Connector

2019-09-05 Thread Vishwas Siravara
Hi guys, I am using the Flink connector for Kafka from 1.9.0. Here is my sbt dependency: "org.apache.flink" %% "flink-connector-kafka" % "1.9.0". When I check the log file I see that the Kafka version is 0.10.2.0. According to the docs, from 1.9.0 onwards the version should be 2.2.0. Why do I…

Re: understanding task manager logs

2019-09-06 Thread Vishwas Siravara
…main/java/org/apache/kafka/common/utils/AppInfoParser.java#L117 > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-100-connector > On Tue, Sep 3, 2019 at 04:04, Vishwas Siravara <vsirav...@gmail.com> wrote: >…

Using FlinkKafkaConsumer API

2019-09-09 Thread Vishwas Siravara
I am using the flink-kafka-connector and this is my dependency: "org.apache.flink" %% "flink-connector-kafka" % "1.7.0". When I look at my dependency tree the Kafka client version is org.apache.kafka:kafka-clients:2.0.1, which comes from the above package. However, when I run my code in the clust…

externalizing config files for flink class loader

2019-09-12 Thread Vishwas Siravara
I have a standalone cluster. I have added my own library (jar file) to the lib/ folder in Flink. I submit my job from the CLI after I start the cluster. Now I want to externalize a property file which has to be read by this library. Since this library is loaded by Flink's classloader and not the applic…

Flink kafka producer partitioning scheme

2019-09-13 Thread Vishwas Siravara
Hi guys, From the Flink doc: "By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use a FlinkFixedPartitioner that maps each Flink Kafka Producer parallel subtask to a single Kafka partition (i.e., all records received by a sink subtask will end up i…

High availability flink job

2019-09-15 Thread Vishwas Siravara
Hi guys, I have a Flink job running in standalone mode with a parallelism of >1 that produces data to a Kafka sink. My topic is replicated with a replication factor of 2. Now suppose one of the Kafka brokers goes down; will my streaming job fail? Is there a way wherein I can continue proc…

Increasing number of task slots in the task manager

2019-10-01 Thread Vishwas Siravara
Hi guys, I get a Java heap space error when I have 1 GB of TM memory and 4 slots (we have 4 cores in our lower environment) per TM; each slot has 1/4 GB of managed memory. From the Flink doc https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources I s…
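For reference, the two settings involved as a flink-conf.yaml sketch (sizes illustrative); slots split the TM's memory among themselves, so 4 slots in a 1 GB TM leave very little heap per slot:

    taskmanager.heap.size: 4096m
    taskmanager.numberOfTaskSlots: 4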

Flink restoring a job from a checkpoint

2019-10-08 Thread Vishwas Siravara
Hi guys, I have a Flink streaming job which streams from a Kafka source. There is no state in the job, just a simple filter, map and write to a Kafka sink. Suppose I stop my job and then submit it again to the cluster with the same consumer group; will the job restore automatically from the l…

Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Vishwas Siravara
…apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint > Best, Yun Tang > From: Vishwas Siravara; Sent: Wednesday, October 9, 2019 0:54; To: user; Subject: Flink res…

Re: Flink restoring a job from a checkpoint

2019-10-09 Thread Vishwas Siravara
…retained-checkpoint > [2] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints > Best, Congxian > On Wed, Oct 9, 2019 at 5:07 AM, Vishwas Siravara wrote: >> Hi Yun, thanks for your reply. I do start from GROUP…
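For reference, resuming from a retained checkpoint uses the same -s flag as savepoints; a CLI sketch reusing the bucket from the S3 threads below (the job id and checkpoint number are illustrative):

    flink run -s s3://vishwas.test1/checkpoints/<job-id>/chk-42 -c com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar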

Consumer group and flink

2019-10-16 Thread Vishwas Siravara
Hi guys, Is it necessary to specify a consumer group name for a Kafka streaming job when checkpointing is enabled? Since the offsets are not stored in Kafka, how does specifying a consumer group help? Best, Vishwas

Re: Consumer group and flink

2019-10-16 Thread Vishwas Siravara
Please ignore this email. On Wed, Oct 16, 2019 at 1:40 PM Vishwas Siravara wrote: > Hi guys, > Is it necessary to specify a consumer group name for a kafka streaming job > when checkpointing is enabled? Since the offsets are not stored in kafka > how does specifying a consume

Re-ordering events with flink

2019-11-01 Thread Vishwas Siravara
Hi guys, I want to know if it's possible to sort events in a Flink data stream. I know I can't sort a stream, but is there a way in which I can buffer for a very short time and sort those events before sending them to a data sink? In our scenario we consume from a Kafka topic which has multiple parti…
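One common way to get "buffer briefly, then sort" is a short event-time window whose function emits its elements in timestamp order. A sketch, assuming event time and watermark assignment are already configured and using a hypothetical Event type:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    case class Event(key: String, ts: Long, payload: String) // hypothetical record

    // Buffer each key for 2 seconds, then emit the window's events sorted by timestamp.
    def sorted(stream: DataStream[Event]): DataStream[Event] =
      stream
        .keyBy(_.key)
        .timeWindow(Time.seconds(2))
        .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {
          override def process(key: String, context: Context,
                               elements: Iterable[Event], out: Collector[Event]): Unit =
            elements.toSeq.sortBy(_.ts).foreach(out.collect)
        })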

Partitioning based on key flink kafka sink

2019-11-05 Thread Vishwas Siravara
Hi all, I am using Flink 1.7.0 and using this constructor: FlinkKafkaProducer(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig). From the doc it says this constructor uses the fixed partitioner. I want to partition based on key, so I tried to use this public Fl…
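A sketch of one way to do this with the four-argument constructor: passing Optional.empty() as the custom partitioner means the FlinkFixedPartitioner is not installed, so the serialized key is handed to Kafka and its default partitioner places records by key hash. The record type, schema, and broker address are illustrative:

    import java.util.{Optional, Properties}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

    case class MyEvent(key: String, value: String) // hypothetical record

    val schema = new KeyedSerializationSchema[MyEvent] {
      override def serializeKey(e: MyEvent): Array[Byte] = e.key.getBytes("UTF-8")
      override def serializeValue(e: MyEvent): Array[Byte] = e.value.getBytes("UTF-8")
      override def getTargetTopic(e: MyEvent): String = null // use the default topic
    }

    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker:9092") // illustrative

    // No Flink-side partitioner: Kafka partitions by the record key.
    val producer = new FlinkKafkaProducer[MyEvent](
      "output-topic", schema, props,
      Optional.empty[FlinkKafkaPartitioner[MyEvent]]())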

flink's hard dependency on zookeeper for HA

2019-11-06 Thread Vishwas Siravara
Hi all, I am using Flink 1.7.2 as a standalone cluster in high-availability mode with ZooKeeper. I have noticed that all Flink processes go down once ZooKeeper goes down. Is this expected behavior, given that the leader election has already happened and the job has been running for several hours? Best…

Unit testing filter function in flink

2019-12-19 Thread Vishwas Siravara
Hi guys, I want to test a function like:

    private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
      dataStream.filter(new FilterFunction[GenericRecord] {
        override def filter(value: GenericRecord): Boolean = {
          if (value == null || value.get(St…
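One common approach, sketched here under the assumption that the predicate can be factored into a named FilterFunction: instantiate it directly in a unit test and call filter(), with no Flink cluster involved.

    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.functions.FilterFunction

    // Hypothetical factored-out predicate from the snippet above.
    class NonNullFilter extends FilterFunction[GenericRecord] {
      override def filter(value: GenericRecord): Boolean = value != null
    }

    // In a test (framework-agnostic):
    val passes = new NonNullFilter().filter(null) // should be false
    assert(!passes)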

Exactly once semantics for hdfs sink

2020-02-10 Thread Vishwas Siravara
Hi all, I want to use the StreamingFileSink for writing data to HDFS. Can I achieve exactly-once semantics with this sink? Best, HW.

Re: Exactly once semantics for hdfs sink

2020-02-11 Thread Vishwas Siravara
…StreamingFileSink does support exactly-once semantics and can be used with HDFS. > Regards, Roman > On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara wrote: >> Hi all, I want to use the StreamingFileSink for writing data to HDFS. Can I achieve exactly-once semantics with this sink? >> Best, HW.
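A minimal sketch of a row-format StreamingFileSink (the HDFS path is illustrative); the exactly-once guarantee relies on checkpointing being enabled, since part files stay in-progress until the covering checkpoint completes:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("hdfs://namenode:8020/data/out"),
                    new SimpleStringEncoder[String]("UTF-8"))
      .build()
    // someStringStream.addSink(sink)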

Unable to set S3 like object storage for state backend.

2019-06-20 Thread Vishwas Siravara
Hi, I am using Flink 1.7.2 and trying to use S3-like object storage EMC ECS (https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for the S3 filesystem; I have placed it under the lib folder and it is available…

Re: Unable to set S3 like object storage for state backend.

2019-06-24 Thread Vishwas Siravara
…bucket names to contain an underscore. > I'm guessing that the Hadoop S3 code is trying to treat your path as a valid URI, but the bucket name doesn't conform, and thus you get the "null uri host" issue. > Could you try with a compliant bucket name? > Ken…

Error checkpointing to S3 like FS (EMC ECS)

2019-06-24 Thread Vishwas Siravara
Hi, I am using Flink 1.7.2 and trying to use S3-like object storage EMC ECS (https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). Not all S3 APIs are supported by EMC ECS according to this document. Here is my config: s3.endpoint: SU73ECSG1P1d.***.COM s3.access-key: vdna_n…
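For a non-AWS S3 endpoint, path-style access is often needed as well; a flink-conf.yaml sketch assembled from the values in these threads (secret redacted, path-style flag an assumption worth verifying against the ECS setup):

    s3.endpoint: https://SU73ECSG1P1d.***.COM:9021
    s3.access-key: vdna_np_user
    s3.secret-key: ****
    # Non-AWS S3 stores frequently require path-style rather than virtual-host addressing.
    s3.path.style.access: true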

Providing external files to flink classpath

2019-06-28 Thread Vishwas Siravara
Hi, I am trying to add external property files to the Flink classpath for my application. These files are not part of the fat jar. I put them under the lib folder but Flink can't find them. How can I manage external property files that need to be read by Flink? Thanks, Vishwas

Unable to start task manager in debug mode

2019-07-08 Thread Vishwas Siravara
Hi guys, I am not able to start a standalone session with one task manager and one job manager on the same node after adding the debug option in flink-conf.yaml as env.java.opts: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 (https://cwiki.apache.org/confluence/display/FLINK/Remote+D…
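One likely explanation, with a flink-conf.yaml sketch: env.java.opts applies to every Flink JVM, so the JobManager and TaskManager on the same node both try to bind debug port 5005. Scoping the agent to one process avoids the clash (suspend=n keeps startup from blocking):

    env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"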

Questions about user doc.

2019-07-16 Thread Vishwas Siravara
Hey guys, In this document: https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html, there is a line at the beginning of the scheduling section which says: "A pipeline consists of multiple successive tasks, such as the n-th parallel instance of a MapFunction t…

Re: Providing external files to flink classpath

2019-07-17 Thread Vishwas Siravara
…/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120 > Best, Yun Tang > From: Vishwas Siravara; Sent: Saturday, June 29, 2019 0:43; To: user; Subject: Providing external files t…
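A sketch of the approach Yun's link points at: read the properties file explicitly from disk with ParameterTool instead of relying on classpath lookup (the path and keys are illustrative):

    import org.apache.flink.api.java.utils.ParameterTool

    val params = ParameterTool.fromPropertiesFile("/usr/mware/flink/conf/app.properties") // illustrative path
    val endpoint = params.get("s3.endpoint")
    val retries = params.getInt("retries", 3) // with a default value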

Flink s3 wire log

2019-07-18 Thread Vishwas Siravara
Here is my wire log while trying to checkpoint to ECS S3. I see the request got a 404; does this mean that it can't find the folder "checkpoints"? Since S3 does not have folders, what should I put there? Thanks so much for all the help that you guys have provided so far. Really appreciate it. 2…

S3 checkpointing exception

2019-07-18 Thread Vishwas Siravara
I am using an ECS S3 instance to checkpoint, with the following configuration: s3.access-key: vdna_np_user; s3.endpoint: https://SU73ECSG**COM:9021; s3.secret-key: ****. I set the checkpoint in the code like env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints")). I have a bucke…

Re: S3 checkpointing exception

2019-07-20 Thread Vishwas Siravara
I found the solution to this problem; it was a dependency issue. I had to exclude "xml-apis" to get this fixed. Also, the s3-presto jar provides better error messages, which was helpful. Thanks, Vishwas. On Thu, Jul 18, 2019 at 8:14 PM Vishwas Siravara wrote: > I am using an ECS S…