Re: Performance issue associated with managed RocksDB memory

2020-06-25 Thread Juha Mynttinen
Andrey, a small clarification. The tweaked WordCount I posted earlier doesn't illustrate the issue I originally explained, i.e. the one where there's a bigger operator and the smallest possible window operator. Instead, the modified WordCount illustrates the degraded performance of a very simple Fl

Re: Date Deserialization probleme

2020-06-25 Thread Marco Villalobos
Hello Aissa, SimpleDateFormat and java.util.Date have been obsolete since JDK 1.8. Also, it can be quite dangerous to use a class like SimpleDateFormat in a distributed system because it is not thread-safe. I suggest you use https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.h
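A minimal sketch of the java.time approach suggested above. DateTimeFormatter is immutable and thread-safe, so one shared instance can be reused by parallel tasks, which is not safe with SimpleDateFormat. The pattern "yyyy-MM-dd HH:mm:ss" and the class name are assumptions for illustration, since the actual date format from the original message is not shown.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateParsingSketch {

    // Hypothetical pattern; replace it with the format used in the actual Kafka messages.
    // DateTimeFormatter is immutable and thread-safe, so a single static instance is fine.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parse a date-time string (no time zone) using the shared formatter.
    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, FORMAT);
    }

    // Format a date-time back into the same textual pattern.
    static String format(LocalDateTime dateTime) {
        return dateTime.format(FORMAT);
    }

    public static void main(String[] args) {
        LocalDateTime parsed = parse("2020-06-25 18:30:00");
        System.out.println(parsed);         // prints 2020-06-25T18:30
        System.out.println(format(parsed)); // prints 2020-06-25 18:30:00
    }
}
```

If the POJO's date field is a java.time type and Jackson does the deserialization, Jackson generally needs the jackson-datatype-jsr310 module (JavaTimeModule) registered on the ObjectMapper to handle it.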

Re: Date Deserialization probleme

2020-06-25 Thread Guowei Ma
Hi, maybe I'm missing something, but I don't see where the problem is. Could you clarify which part of your result is not what you expected? Best, Guowei On Fri, Jun 26, 2020 at 6:58 AM, Aissa Elaffani wrote: > Hello guys, > I want to deserialize a kafka message in JSON format to a POJO, using > Jackson mapper, and I a

Date Deserialization probleme

2020-06-25 Thread Aissa Elaffani
Hello guys, I want to deserialize a Kafka message in JSON format to a POJO using the Jackson mapper, and I am having a problem deserializing the date field in the JSON message. I am going to show you my date pattern format and what I got as a result. I hope someone can help me! Please. [image: problemDe

Re: Flink Stream job to parquet sink

2020-06-25 Thread Arvid Heise
Hi Anuj, Yes, broadcast sounds really good. Now you just need to hide the structural invariance (variable number of sinks) by delegating to inner sinks. public class SplittingStreamingFileSink extends RichSinkFunction implements CheckpointedFunction, CheckpointListener { Map si
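A minimal, Flink-free sketch of the delegation idea: one outer sink routes each record to an inner sink chosen by event type, creating inner sinks lazily, so the number of real outputs can vary while the job graph stays fixed. The Sink interface, ListSink, and the "type:payload" record encoding are hypothetical stand-ins. A real implementation would wrap actual Flink sinks and also forward lifecycle calls (for example snapshotState and notifyCheckpointComplete from the interfaces named above) to every inner sink, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SplittingSinkSketch {

    // Hypothetical stand-in for an inner sink.
    interface Sink<T> {
        void invoke(T value);
    }

    // Hypothetical in-memory sink, used only to make the sketch runnable.
    static class ListSink<T> implements Sink<T> {
        final List<T> received = new ArrayList<>();

        public void invoke(T value) {
            received.add(value);
        }
    }

    // Outer sink: a single fixed operator hiding a variable number of inner sinks.
    static class SplittingSink<T> implements Sink<T> {
        private final Map<String, Sink<T>> sinksByType = new HashMap<>();
        private final Function<T, String> typeOf;
        private final Function<String, Sink<T>> sinkFactory;

        SplittingSink(Function<T, String> typeOf, Function<String, Sink<T>> sinkFactory) {
            this.typeOf = typeOf;
            this.sinkFactory = sinkFactory;
        }

        @Override
        public void invoke(T value) {
            // Create the inner sink the first time a new event type shows up, then delegate.
            sinksByType.computeIfAbsent(typeOf.apply(value), sinkFactory).invoke(value);
        }

        Sink<T> sinkFor(String type) {
            return sinksByType.get(type);
        }

        int innerSinkCount() {
            return sinksByType.size();
        }
    }

    public static void main(String[] args) {
        SplittingSink<String> sink = new SplittingSink<>(
                event -> event.split(":", 2)[0], // hypothetical "type:payload" encoding
                type -> new ListSink<>());
        sink.invoke("click:a");
        sink.invoke("view:b");
        sink.invoke("click:c");
        System.out.println(sink.innerSinkCount()); // prints 2
    }
}
```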

Re: Native K8S IAM Role?

2020-06-25 Thread Bohinski, Kevin
(via https://github.com/jtblin/kube2iam) On 2020/06/25 19:08:41, "Bohinski, Kevin" <k...@comcast.com> wrote: > Hi, > How do we attach an IAM role to the native K8S sessions? > Typically for our other pods we use the following in our yamls: > spec: > template:

Native K8S IAM Role?

2020-06-25 Thread Bohinski, Kevin
Hi, how do we attach an IAM role to the native K8S sessions? Typically for our other pods we use the following in our yamls:

    spec:
      template:
        metadata:
          annotations:
            iam.amazonaws.com/role: ROLE_ARN

Best, Kevin

Re: Native K8S not creating TMs

2020-06-25 Thread Bohinski, Kevin
Hi Yang, Thanks for your help, that command worked, so we connected a remote debugger and found the root exception was initially a timeout exception from okhttp. The increases you mentioned worked. Thanks again for all the help! Best, kevin On 2020/06/19 03:46:36, Yang Wang mailto:d...@gmail.

Re: Dynamic partitioner for Flink based on incoming load

2020-06-25 Thread Alexander Filipchik
Won't this mean 2 shuffles, with 1 node possibly becoming a bottleneck if 1 topic has too much data? Is there a way to avoid shuffling entirely (or to do only 1 shuffle) and to avoid a situation where 1 node becomes a hotspot? Alex On Thu, Jun 25, 2020 at 8:05 AM Kostas Kloudas wrote: > Hi Alexander, > > Routing of input dat

Task recovery?

2020-06-25 Thread John Smith
Hi, I'm running 1.10.0 with 3 ZooKeeper nodes, 3 job nodes, and 3 task nodes. Yesterday my task nodes failed with a metaspace error. I increased the metaspace a bit to be sure and restarted the 3 task nodes, but none of the jobs recovered and no jobs are running. Shouldn't they recover from the job and ZooKeeper state?

Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Georg Heiler
Do you also want to answer https://stackoverflow.com/questions/62562153/apache-flink-and-pureconfig-passing-java-properties-on-job-startup ? Your suggestion seems to work well. Best, Georg On Thu, Jun 25, 2020 at 15:32, Arvid Heise wrote: > You are welcome. > > I'm not an expert on the y

CEP use case !

2020-06-25 Thread Aissa Elaffani
Hello guys, I am wondering whether the CEP API can handle my use case. I have a lot of sensors generating data, and I want to apply a rules engine to that sensor data in order to determine a "sensor_status": Normal, Alert, or Warning. For each record I want to apply some conditions
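As described, each status decision looks at a single record, so it can be sketched as an ordered list of condition/status rules evaluated per record; logic like this could run in a plain map function, while Flink CEP is aimed at patterns spanning several events, which this sketch does not cover. The Reading fields, the temperature thresholds, and the class names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class SensorStatusRules {

    enum Status { NORMAL, WARNING, ALERT }

    // Hypothetical reading; the real record schema is not shown in the thread.
    static final class Reading {
        final String sensorId;
        final double temperature;

        Reading(String sensorId, double temperature) {
            this.sensorId = sensorId;
            this.temperature = temperature;
        }
    }

    // A rule pairs a condition with the status it assigns.
    static final class Rule {
        final Predicate<Reading> condition;
        final Status status;

        Rule(Predicate<Reading> condition, Status status) {
            this.condition = condition;
            this.status = status;
        }
    }

    // Hypothetical thresholds, ordered from most to least severe; the first match wins.
    static final List<Rule> RULES = Arrays.asList(
            new Rule(r -> r.temperature >= 90.0, Status.ALERT),
            new Rule(r -> r.temperature >= 70.0, Status.WARNING));

    // Stateless per-record classification: no other events are consulted.
    static Status classify(Reading reading) {
        for (Rule rule : RULES) {
            if (rule.condition.test(reading)) {
                return rule.status;
            }
        }
        return Status.NORMAL;
    }

    public static void main(String[] args) {
        System.out.println(classify(new Reading("sensor-1", 65.0))); // prints NORMAL
        System.out.println(classify(new Reading("sensor-1", 75.0))); // prints WARNING
        System.out.println(classify(new Reading("sensor-1", 95.0))); // prints ALERT
    }
}
```

Keeping the rules as data rather than hard-coded if/else branches makes it easier to load or change thresholds without touching the classification loop.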

Heartbeat of TaskManager timed out.

2020-06-25 Thread Ori Popowski
Hello, I'm running Flink 1.10 on EMR, reading from Kafka with 189 partitions and a parallelism of 189. I'm currently running with RocksDB, with checkpointing disabled. My state size is approx. 500 GB. I'm getting sporadic "Heartbeat of TaskManager timed out" errors for no apparent reason. I c

Re: Dynamic partitioner for Flink based on incoming load

2020-06-25 Thread Kostas Kloudas
Hi Alexander, Routing of input data in Flink can be done through keying and this can guarantee collocation constraints. This means that you can send two records to the same node by giving them the same key, e.g. the topic name. Keep in mind that elements with different keys do not necessarily go t
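A simplified, stdlib-only sketch of how keying yields collocation: each key is hashed to a key group, and contiguous ranges of key groups are assigned to parallel instances, so equal keys always meet on one instance while distinct keys may still share one. This roughly follows Flink's key-group scheme, but it uses plain hashCode() where Flink additionally applies a murmur hash, so the concrete instance numbers will differ; the method names are hypothetical.

```java
import java.util.Objects;

public class KeyRoutingSketch {

    // Assign a key to one of maxParallelism key groups.
    // Simplification: Flink scrambles hashCode() with a murmur hash first.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    // Map a key group to a parallel instance; each instance owns a contiguous range of groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int operatorIndexForKey(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        // Records keyed by the same topic name always land on the same instance...
        int first = operatorIndexForKey("topic-a", maxParallelism, parallelism);
        int second = operatorIndexForKey("topic-a", maxParallelism, parallelism);
        System.out.println(first == second); // prints true
        // ...while a different key may or may not map to a different instance.
        System.out.println(operatorIndexForKey("topic-b", maxParallelism, parallelism));
    }
}
```

The same property is what makes one very large topic a potential hotspot: every record carrying that key is routed to a single instance.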

Re: Flink Stream job to parquet sink

2020-06-25 Thread aj
Thanks, Arvid, for the detailed answers. - Have some kind of submitter that restarts Flink automatically on config change (assumes that restart time is not the issue). Yes, that can be written, but it does not solve the problem completely because I want to avoid restarting the job itself. Every time I restart I al

Re: Performance issue associated with managed RocksDB memory

2020-06-25 Thread Juha Mynttinen
Hey Yu, 1. Memory and other configuration: There's not much configuration going on; it's all in the Java class WordCount. Memory-related, there's this one: rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED); I quickly tried commenting out that line

Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Arvid Heise
You are welcome. I'm not an expert on the YARN executor, but I hope that -yt,--yarnship ("Ship files in the specified directory (t for transfer)") can help [1]. Oddly, this option is not listed on the YARN page, but it should be available

Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Georg Heiler
Thanks a lot! Your point is right: one cluster per job should be assumed in the mental model for the comparison to hold. In particular, for YARN: -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" You mentioned that the path must be accessible. Spark has a --files parameter and then the
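For context on the -yD env.java.opts=... flag above: it adds -Dconfig.file=... to the JVM options, and code running in that JVM sees it as an ordinary system property (Typesafe Config, which PureConfig builds on, reads config.file this way when loading). A minimal sketch, assuming a hypothetical fallback path and class name; the file must still exist at the resolved path on every machine whose JVM reads it, which is the accessibility issue discussed in this thread.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class JobConfigLocator {

    // System property set through env.java.opts, e.g. -Dconfig.file=config/jobs/...
    static final String CONFIG_FILE_PROPERTY = "config.file";

    // Resolve the configured file, falling back to a hypothetical default when the flag is absent.
    static Path resolveConfigFile(String defaultPath) {
        String configured = System.getProperty(CONFIG_FILE_PROPERTY);
        return Paths.get(configured != null ? configured : defaultPath);
    }

    public static void main(String[] args) {
        Path path = resolveConfigFile("config/application.conf");
        System.out.println("Using config file: " + path.toAbsolutePath());
    }
}
```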

Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Arvid Heise
Hi Georg, I think there is a conceptual misunderstanding. If you reuse the cluster for several jobs, they need to share the JVM_ARGS since it's the same process. [1] On Spark, new processes are spawned for each stage afaik. However, the current recommendation is to use only one ad-hoc cluster per

Running Apache Flink on the GraalVM as a Native Image

2020-06-25 Thread ivo.kn...@t-online.de
What's up guys, I'm trying to run an Apache Flink application as a GraalVM native image, but I get the following error (see the attached file). I suppose this happens because Flink uses a lot of low-level code and is highly optimized. When I googled the combination of GraalVM Native Image

Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Georg Heiler
Hi, but how can I change/configure it per submitted job and not for the whole cluster? Best, Georg On Thu, Jun 25, 2020 at 10:07, Arvid Heise wrote: > Hi Georg, > > thank you for your detailed explanation. You want to use env.java.opts[1]. > There are flavors if you only want to make it

Re: Performance issue associated with managed RocksDB memory

2020-06-25 Thread Yu Li
Thanks for the ping, Andrey. Hi Juha, thanks for reporting the issue. I'd like to check the following before digging further into it: 1. Could you let us know your configuration (especially the memory-related settings) when running the tests? 2. Did you watch the memory consumption before / after tu

Re: Flink Stream job to parquet sink

2020-06-25 Thread Rafi Aroch
Hi Arvid, would it be possible to implement a BucketAssigner that, for example, periodically loads the configuration from an external source and decides on a different sub-folder according to the event type? Thanks, Rafi On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise wrote: > Hi Anuj, > > There i
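One way to picture this idea, as a Flink-free sketch: a resolver that caches an event-type-to-sub-folder mapping and reloads it from an external source once a refresh interval has elapsed; a custom BucketAssigner could call something like this from its getBucketId method. The configSource supplier, the injectable clock, the "unknown" fallback folder, and the class name are all hypothetical. In a real job each parallel instance would refresh on its own, and a reload would delay the record that triggers it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class RefreshingFolderResolver {

    private final Supplier<Map<String, String>> configSource; // hypothetical external lookup
    private final LongSupplier clockMillis;                   // injectable for testing
    private final long refreshIntervalMillis;

    private Map<String, String> folderByType = Collections.emptyMap();
    private long lastRefreshMillis;
    private boolean loaded;

    RefreshingFolderResolver(Supplier<Map<String, String>> configSource,
                             LongSupplier clockMillis, long refreshIntervalMillis) {
        this.configSource = configSource;
        this.clockMillis = clockMillis;
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    // Return the sub-folder for an event type, reloading the mapping if it is stale.
    String folderFor(String eventType) {
        long now = clockMillis.getAsLong();
        if (!loaded || now - lastRefreshMillis >= refreshIntervalMillis) {
            // Copy so that later changes in the source are only seen after the next refresh.
            folderByType = new HashMap<>(configSource.get());
            lastRefreshMillis = now;
            loaded = true;
        }
        return folderByType.getOrDefault(eventType, "unknown");
    }

    public static void main(String[] args) {
        Map<String, String> external = new HashMap<>();
        external.put("click", "clicks");
        long[] now = {0L};
        RefreshingFolderResolver resolver =
                new RefreshingFolderResolver(() -> external, () -> now[0], 60_000L);
        System.out.println(resolver.folderFor("click")); // prints clicks
        external.put("click", "click-events");           // external config changes
        System.out.println(resolver.folderFor("click")); // prints clicks (still cached)
        now[0] = 60_000L;                                 // refresh interval has elapsed
        System.out.println(resolver.folderFor("click")); // prints click-events
    }
}
```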

Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Arvid Heise
Hi Georg, thank you for your detailed explanation. You want to use env.java.opts [1]. There are flavors if you only want to make it available on the JobManager or the TaskManager, but I guess the basic form is good enough for you. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Re: Performance issue associated with managed RocksDB memory

2020-06-25 Thread Andrey Zagrebin
Hi Juha, Thanks for sharing the testing program that exposes the problem. This indeed looks suboptimal if X does not leave space for the window operator. I am adding Yu and Yun, who might have a better idea of what could be improved in sharing RocksDB memory among operators. Best, Andrey O