Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Aaron Levin
Hi Georg, you can try using the circe library for this, which can automatically generate JSON decoders for Scala case classes. As was mentioned earlier, Flink does not come packaged with JSON-decoding generators for Scala like Spark does. On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler wr
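
To illustrate what such a generated decoder amounts to, here is a minimal, dependency-free Scala sketch of a hand-written typeclass decoder. It does not use circe: the `Tweet` case class, the `Decoder` trait, `decodeAs`, and the flat `Map[String, String]` input (a stand-in for a parsed JSON object) are all hypothetical. circe's generic derivation (e.g. `import io.circe.generic.auto._`) would produce an equivalent instance for a case class without writing it by hand.

```scala
import scala.util.Try

object DecoderSketch {
  // Hypothetical target case class.
  final case class Tweet(id: Long, text: String)

  // Typeclass: knows how to build an A from a flat field map.
  trait Decoder[A] {
    def decode(fields: Map[String, String]): Either[String, A]
  }

  // Reads a required field, failing with a descriptive message if it is absent.
  private def field(fields: Map[String, String], name: String): Either[String, String] =
    fields.get(name).toRight(s"missing field: $name")

  // Hand-written instance; circe's generic derivation would generate the equivalent.
  implicit val tweetDecoder: Decoder[Tweet] = new Decoder[Tweet] {
    def decode(fields: Map[String, String]): Either[String, Tweet] =
      for {
        idStr <- field(fields, "id")
        id    <- Try(idStr.toLong).toOption.toRight(s"id is not a Long: $idStr")
        text  <- field(fields, "text")
      } yield Tweet(id, text)
  }

  // Entry point: picks the Decoder instance for A from implicit scope.
  def decodeAs[A](fields: Map[String, String])(implicit d: Decoder[A]): Either[String, A] =
    d.decode(fields)
}
```

Missing or malformed fields come back as a `Left` with a message rather than as an exception, which mirrors how circe's `decode` returns an `Either`.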

Re: Implications of taskmanager.memory.process.size

2020-07-09 Thread Vishal Santoshi
Got it. That was what I always thought, but I needed to be sure. Thank you for confirming. On Thu, Jul 9, 2020 at 9:39 PM Xintong Song wrote: > Hi Vishal, > > If you have streaming jobs only and do not use RocksDB, you can tune the > fraction (taskmanager.memory.managed.fraction) to 0. In this

Re: Implications of taskmanager.memory.process.size

2020-07-09 Thread Xintong Song
Hi Vishal, If you have streaming jobs only and do not use RocksDB, you can tune the fraction (taskmanager.memory.managed.fraction) to 0. In this way, no more off-heap managed memory will be reserved for the user code execution. Please be aware that this does not mean the JVM heap will get all of t
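
The arithmetic behind this can be sketched as below. It is a simplified model that assumes only `taskmanager.memory.process.size` is configured and assumes the default fractions and bounds noted in the comments (JVM overhead 10% within 192 MiB to 1 GiB, network 10% within 64 MiB to 1 GiB, 128 MiB each of framework heap and framework off-heap, 256 MiB metaspace); check the memory configuration docs of your Flink version before relying on any of these numbers. `MemorySketch`, `Breakdown`, and `derive` are hypothetical names.

```scala
object MemorySketch {
  val MiB: Long = 1024L * 1024L

  // All components in bytes.
  final case class Breakdown(
      jvmOverhead: Long,
      metaspace: Long,
      frameworkHeap: Long,
      frameworkOffHeap: Long,
      taskOffHeap: Long,
      network: Long,
      managed: Long,
      taskHeap: Long) {
    // The JVM heap is framework heap plus task heap.
    def jvmHeap: Long = frameworkHeap + taskHeap
    def total: Long =
      jvmOverhead + metaspace + frameworkHeap + frameworkOffHeap + taskOffHeap + network + managed + taskHeap
  }

  private def clamp(v: Long, lo: Long, hi: Long): Long = math.max(lo, math.min(hi, v))

  def derive(
      processSize: Long,
      managedFraction: Double,      // taskmanager.memory.managed.fraction
      taskOffHeap: Long = 0L,       // taskmanager.memory.task.off-heap.size
      metaspace: Long = 256 * MiB   // assumed metaspace default
  ): Breakdown = {
    // Assumed default: JVM overhead is 10% of process size, clamped to [192 MiB, 1 GiB].
    val overhead = clamp((processSize * 0.1).toLong, 192 * MiB, 1024 * MiB)
    // Total Flink memory is what remains after overhead and metaspace.
    val totalFlink = processSize - overhead - metaspace
    // Assumed default: network memory is 10% of total Flink memory, clamped to [64 MiB, 1 GiB].
    val network = clamp((totalFlink * 0.1).toLong, 64 * MiB, 1024 * MiB)
    // Managed memory is a fraction of total Flink memory; 0 reserves nothing.
    val managed = (totalFlink * managedFraction).toLong
    val frameworkHeap = 128 * MiB    // assumed default
    val frameworkOffHeap = 128 * MiB // assumed default
    // Task heap is derived as the remainder.
    val taskHeap = totalFlink - frameworkHeap - frameworkOffHeap - taskOffHeap - network - managed
    require(taskHeap > 0, "configuration leaves no room for task heap")
    Breakdown(overhead, metaspace, frameworkHeap, frameworkOffHeap, taskOffHeap, network, managed, taskHeap)
  }
}
```

In this model, setting the fraction to 0 moves the former managed bytes into the derived task heap, but JVM overhead, metaspace, network, and framework off-heap memory still take their share, so the JVM heap never receives the whole process size.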

Re: FlinkKinesisProducer blocking ?

2020-07-09 Thread Vijay Balakrishnan
Hi Gordon, the ThreadPoolSize default is 10. I have a parallelism of 80 spread across 32 nodes. Could it be that the 80 threads get bottlenecked on a common ThreadPool of 10, or is it spawning 80 * 10 threads in total? The Flink TaskManagers run in separate slots/vCPUs and can be spread across 32 node

MalformedClassName for scala case class

2020-07-09 Thread Georg Heiler
Hi, why can't I register the stream as a table? I get a MalformedClassName exception. val serializer = new JSONKeyValueDeserializationSchema(false) val stream = senv.addSource( new FlinkKafkaConsumer( "tweets-raw-json", serializer, properties ).setStartFromEarliest() //

Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
Great, thanks. But would it be possible to automate this, i.e. to have this work automatically for the case class / product? On Thu, Jul 9, 2020 at 20:21 Taher Koitawala < taher...@gmail.com> wrote: > The performant way would be to apply a map function over the stream and > then use the Jac

Re: Stateful Functions: Routing to remote functions

2020-07-09 Thread Jan Brusch
Hi Igal, I finally got around to trying out your proposed solutions today. Unfortunately, I didn't get them to work. However, as you asked me to share the code, I prepared an example that provides a minimal reproduction of my use case and the corresponding error. Please find it here: https://g

Re: Asynchronous I/O poor performance

2020-07-09 Thread Arvid Heise
Hi Mark, I already explained that this latency only occurs because of the shuffle step before async IO (i.e., data is sent over the network). If you replace val x : DataStream[String] = someIntegers.map( _ => s"${System.currentTimeMillis()}") with val x : DataStream[String] = someIntegers.shu

Re: Asynchronous I/O poor performance

2020-07-09 Thread Mark Zitnik
Hi Arvid, the HTTP client is not my bottleneck. As I said before, I checked the async operator, and there is a delay of about 80 ms until a record enters asyncInvoke. Can someone explain why we have such a big delay? I attached sample code in my previous email. Thanks On Mon, 6

Re: FlinkKinesisProducer blocking ?

2020-07-09 Thread Vijay Balakrishnan
Thanks, Gordon, for your reply. I do not set a queueLimit, so the default unbounded queue size is 2147483647. So it should just be dropping records produced by the 80 (parallelism) * 10 (ThreadPoolSize) = 800 threads based on RecordTtl. I do not want backpressure since, as you said, it effectively

Re: AsyncFunction retries

2020-07-09 Thread Arvid Heise
Hi Gadi, FutureUtils is not a public API, so there is no guarantee that a method that works now will keep working in any upcoming Flink version. Rather, I'd first check whether you can use httpcomponents client 5.0+; then you could simply use the retry handler [1]. If not, then I'd probably copy t
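
Since `FutureUtils` is internal, the retry logic can instead be written against plain Scala futures. Below is a minimal sketch using only `scala.concurrent`; it is a generic technique, not a copy of Flink's `FutureUtils` or of the HttpClient 5 retry handler, and the names `RetrySketch` and `retry` are hypothetical.

```scala
import scala.concurrent.{ExecutionContext, Future}

object RetrySketch {
  /** Runs `op`, re-invoking it on failure until `attempts` tries have been used.
    * The last failure is propagated unchanged once no attempts remain. */
  def retry[T](attempts: Int)(op: () => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    require(attempts >= 1, "attempts must be at least 1")
    op().recoverWith {
      // Only recover while more attempts remain; otherwise the failure passes through.
      case _ if attempts > 1 => retry(attempts - 1)(op)
    }
  }
}
```

In an `AsyncFunction`, one would pass a function that issues the HTTP post and complete the `ResultFuture` from the callback of the returned future. The async operator's timeout applies to the element as a whole, so it has to be large enough to cover all attempts.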

Validating my understanding of SHARD_DISCOVERY_INTERVAL_MILLIS

2020-07-09 Thread Vijay Balakrishnan
Hi, I see these 2 constants: SHARD_GETRECORDS_INTERVAL_MILLIS & SHARD_DISCOVERY_INTERVAL_MILLIS. My understanding was that SHARD_GETRECORDS_INTERVAL_MILLIS defines how often records are fetched from the Kinesis Data Stream (KDS). The code seems to be doing this in ShardConsumer.run()-->getRecords(). SHARD_DISCO
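
A minimal sketch of setting both intervals on the consumer properties. The raw key strings are an assumption (they are meant to match the values of `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` and `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS`); in real code, reference the constants rather than hard-coding strings. `KinesisConsumerProps` is a hypothetical helper.

```scala
import java.util.Properties

object KinesisConsumerProps {
  // Key strings assumed to equal the ConsumerConfigConstants values; prefer the constants in real code.
  val GetRecordsIntervalKey = "flink.shard.getrecords.intervalmillis"
  val DiscoveryIntervalKey = "flink.shard.discovery.intervalmillis"

  def build(getRecordsIntervalMs: Long, discoveryIntervalMs: Long): Properties = {
    val props = new Properties()
    // Pause between consecutive GetRecords calls made by each shard consumer.
    props.setProperty(GetRecordsIntervalKey, getRecordsIntervalMs.toString)
    // How often the consumer checks the stream for new shards (e.g. after resharding).
    props.setProperty(DiscoveryIntervalKey, discoveryIntervalMs.toString)
    props
  }
}
```

The two settings are independent: the first throttles how often each shard is polled for records, the second only controls how quickly newly created shards are picked up.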

Re: Avro and Kafka Schema Registry Client versions out of date

2020-07-09 Thread Arvid Heise
Hi Lucas, support for Avro 1.9.2 [1] depends on Hive being upgraded as well [2]. You could cast a vote on both tickets to accelerate it. Schema Registry 5.5.0 depends on Avro 1.9.2, and I'm not sure what the implications of a downgrade are. Of course, you could build the module yourself with 5.5.0, test, and

Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Taher Koitawala
The performant way would be to apply a map function over the stream and then use the Jackson ObjectMapper to convert to Scala objects. In Flink, there is no API like Spark's to automatically get all fields. On Thu, Jul 9, 2020, 11:38 PM Georg Heiler wrote: > How can I use it with a scala case class

Fwd: Avro and Kafka Schema Registry Client versions out of date

2020-07-09 Thread Lucas Heimberg
Hello, I noticed that even in Flink 1.11, Avro in flink-avro and the Kafka Schema Registry client in flink-avro-confluent-registry are still at versions 1.8.2 and 4.1.0, respectively. Avro 1.9.2 brings a lot of improvements and bug fixes, in particular with respect to logical types. The Kafka Schema

Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
How can I use it with a Scala case class? If I understand it correctly, for better performance the ObjectMapper is already initialized in each KafkaConsumer, which returns ObjectNodes. So I should probably rephrase my question: how can I then map these to case classes without hand-coding it? https://github.c

Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Taher Koitawala
You can try Jackson's ObjectMapper; that will get you from JSON to objects. Regards, Taher Koitawala On Thu, Jul 9, 2020, 9:54 PM Georg Heiler wrote: > Hi, > > I want to map a stream of JSON documents from Kafka to a scala case-class. > How can this be accomplished using the JSONKey

map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
Hi, I want to map a stream of JSON documents from Kafka to a Scala case class. How can this be accomplished using the JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes required? I have a Spark background. There, such manual mappings are usually discouraged. Instead, they offer

flink take single element from stream

2020-07-09 Thread Georg Heiler
How can I explore a stream in Flink interactively? Spark has the concept of take/head to extract the first n elements of a dataframe / table. Is something similar available in Flink for a stream like: val serializer = new JSONKeyValueDeserializationSchema(false) val stream = senv.addSource(

Re: Implications of taskmanager.memory.process.size

2020-07-09 Thread Vishal Santoshi
ager.memory.process.size (default: none; type: MemorySize): Total Process Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On containerized setups, this should be set to the container memory. See also 'taskm

AsyncFunction retries

2020-07-09 Thread Gadi Katsovich
Hi all, I have a job with the following diagram: source -> Flat Map -> Filter -> Filter -> Filter -> async wait operator -> Process -> sink The async operation sends an HTTP post (using Apache HttpAsyncClient). In case the HTTP post times out or fails, I want to retry a few times. Is using Future

Re: Task recovery?

2020-07-09 Thread John Smith
Hi Robert, is my assumption correct? On Fri, Jul 3, 2020, 12:42 PM John Smith wrote: > Here is one log > > https://www.dropbox.com/s/s8uom5uto708izf/flink-job-001.log?dl=0 > > If I understand correctly on June 23rd it suspended the jobs? So at that > point they would no longer show in th

Implications of taskmanager.memory.process.size

2020-07-09 Thread Vishal Santoshi
Hello folks, as documented in https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#memory-configuration, I set taskmanager.memory.process.size and taskmanager.memory.task.off-heap.size in my flink-conf.yaml, and I see the 2 properties being picked up. * - Load

Re: A query on Flink metrics in kubernetes

2020-07-09 Thread Chesnay Schepler
From Flink's perspective, no metrics are aggregated, nor are metric requests forwarded to some other process. Each TaskExecutor has its own reporter, and each of them must be scraped to get the full set of metrics. On 09/07/2020 11:39, Manish G wrote: Hi, I have a query regarding prometheus scrapin
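
Because nothing is aggregated on the Flink side, combining values across TaskExecutors is the scraper's job (in Prometheus, typically an aggregation such as `sum` in the query). The toy Scala function below only illustrates that summation for counters; `MetricsSketch`, `sumAcross`, and the metric names and values are hypothetical and not part of any Flink or Prometheus API.

```scala
object MetricsSketch {
  // Sums same-named counter values reported by several TaskExecutors,
  // i.e. what the scraping side has to do because Flink does not aggregate.
  def sumAcross(perTaskExecutor: Seq[Map[String, Double]]): Map[String, Double] =
    perTaskExecutor.flatten
      .groupBy { case (name, _) => name }
      .map { case (name, samples) => name -> samples.map(_._2).sum }
}
```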

Re: Check pointing for simple pipeline

2020-07-09 Thread Dawid Wysakowicz
Hi Prasanna, I'd like to add my two cents here. I would not say using the incremental checkpoint is always the best choice. It might have its downsides when restoring from the checkpoint as it will have to apply all the deltas. Therefore restoring from a non-incremental checkpoint might be faster.

A query on Flink metrics in kubernetes

2020-07-09 Thread Manish G
Hi, I have a query regarding Prometheus scraping Flink metrics data with the application running in a Kubernetes cluster. If the taskmanager is running on multiple nodes, and Prometheus requests the metrics data, is that request directed to one of the nodes (based on some strategy, like round-robin

Re: Heterogeneous or Dynamic Stream Processing

2020-07-09 Thread Dawid Wysakowicz
Hi Rob, maybe I am quite late to the party, but I think it might be worth having a look at the Stateful Functions API [1] as well. Your latest approach in particular reminds me a bit of the architecture of Stateful Functions. There you can have arbitrary routing to functions. You can also deleg

Re: Heterogeneous or Dynamic Stream Processing

2020-07-09 Thread Arvid Heise
Hi Rob, your approach looks good, but I haven't used iterations in streams yet. If it works for you, it can't be bad anyway. If it is indeed working as expected, I'd recommend checking out broadcast state to distribute the rules [1]. This pattern will allow you to dynamically add new rules via a spec
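
A plain-Scala model of the broadcast pattern, meant to show the shape of the idea rather than Flink's API: rule updates arrive on one input and replace entries in shared state (the role of `processBroadcastElement` in a `BroadcastProcessFunction`), and each event is evaluated against whatever rules are stored at that moment (the role of `processElement`). `Rule`, `Event`, and `RuleEvaluator` are hypothetical, and the immutable map held in a `var` stands in for Flink's broadcast state.

```scala
object BroadcastRulesSketch {
  final case class Rule(id: String, threshold: Int)
  final case class Event(key: String, value: Int)

  final class RuleEvaluator {
    // Stand-in for Flink's broadcast state (rule id -> rule).
    private var rules = Map.empty[String, Rule]

    // Analogue of processBroadcastElement: add or replace a rule at runtime.
    def onRule(rule: Rule): Unit = rules = rules.updated(rule.id, rule)

    // Analogue of processElement: read-only use of the rules; returns triggered rule ids.
    def onEvent(event: Event): List[String] =
      rules.values.filter(r => event.value > r.threshold).map(_.id).toList.sorted
  }
}
```

Because the event side only reads the rules, adding or changing a rule at runtime needs no job restart, which is what makes the pattern a fit for dynamically added rules.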