Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread Teoh, Hong
Hi, I’d say it seems you are trying to identify bottlenecks in your job, and are currently looking at RocksDB Disk I/O as one of the bottlenecks. However, there are also other bottlenecks (e.g. CPU/memory/network/sink throttling), and from what you described, it’s possible that the HDFS sink is

Re: Decompressing RMQ streaming messages

2022-07-21 Thread Francis Conroy
Hi Venkat, there's nothing that I know of, but I've written a zlib decompressor for our payloads which was pretty straightforward. public class ZlibDeserializationSchema extends AbstractDeserializationSchema<byte[]> { @Override public byte[] deserialize(byte[] message) throws IOException {
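The preview cuts the class off; a minimal sketch of what such a decompressing schema could look like, assuming java.util.zip's Inflater and plain zlib-framed payloads (the buffer handling here is illustrative, not Francis's actual code):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.DataFormatException;
    import java.util.zip.Inflater;

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

    // Inflates a zlib-compressed payload and hands the raw bytes downstream.
    public class ZlibDeserializationSchema extends AbstractDeserializationSchema<byte[]> {

        @Override
        public byte[] deserialize(byte[] message) throws IOException {
            Inflater inflater = new Inflater();
            inflater.setInput(message);
            ByteArrayOutputStream out = new ByteArrayOutputStream(message.length * 4);
            byte[] buffer = new byte[4096];
            try {
                while (!inflater.finished()) {
                    int n = inflater.inflate(buffer);
                    if (n == 0 && inflater.needsInput()) {
                        break; // malformed/truncated input; stop rather than spin
                    }
                    out.write(buffer, 0, n);
                }
            } catch (DataFormatException e) {
                throw new IOException("Payload is not valid zlib data", e);
            } finally {
                inflater.end();
            }
            return out.toByteArray();
        }
    }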

Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread Jing Ge
Hi, using FLASH_SSD_OPTIMIZED already sets the number of threads to 4. This optimization can improve the source throughput and reduce the delayed write rate. If this optimization didn't fix the back pressure, could you share more information about your job? Could you check the metric of the back

Re: Flink nested complex json parsing with multiple schema file

2022-07-21 Thread Soumen Choudhury
Hi Yaroslav, Thanks for your reply. How is the performance? Can I have sample code for the same? On Thu, Jul 21, 2022, 9:31 PM Yaroslav Tkachenko wrote: > Hi Soumen, > > I'd try parsing the input using the DataStream API (with a fast JSON > library) and then converting it to a Table. > > On Thu

Re: Flink nested complex json parsing with multiple schema file

2022-07-21 Thread Yaroslav Tkachenko
Hi Soumen, I'd try parsing the input using the DataStream API (with a fast JSON library) and then converting it to a Table. On Thu, Jul 21, 2022 at 6:22 AM Soumen Choudhury wrote: > We have a requirement of parsing a very complex json (size around 25 kb > per event) event with a predefined sche
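A rough sketch of that approach, assuming Jackson as the "fast JSON library" and made-up field names (status, durationMs) standing in for the real 25 kB nested schema from the original post:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class JsonToTableJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Stand-in for the real source (Kafka, RMQ, ...).
            DataStream<String> rawEvents = env.fromElements(
                    "{\"status\":\"success\",\"durationMs\":42}");

            // Parse with Jackson in the DataStream API, keeping only the
            // fields the downstream SQL actually needs.
            DataStream<Tuple2<String, Long>> parsed = rawEvents.map(
                    new RichMapFunction<String, Tuple2<String, Long>>() {
                        private transient ObjectMapper mapper;

                        @Override
                        public void open(Configuration parameters) {
                            mapper = new ObjectMapper();
                        }

                        @Override
                        public Tuple2<String, Long> map(String json) throws Exception {
                            JsonNode node = mapper.readTree(json);
                            return Tuple2.of(
                                    node.get("status").asText(),
                                    node.get("durationMs").asLong());
                        }
                    });

            // Register as a temporary table and run the CASE/aggregation logic in SQL.
            Table events = tEnv.fromDataStream(parsed).as("status", "durationMs");
            tEnv.createTemporaryView("events", events);
            tEnv.sqlQuery("SELECT status, COUNT(*) AS cnt FROM events GROUP BY status")
                .execute().print();
        }
    }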

Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread Yaroslav Tkachenko
Hi! I'd try re-running the SSD test with the following config options: state.backend.rocksdb.thread.num: 4 state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED On Thu, Jul 21, 2022 at 4:11 AM vtygoss wrote: > Hi, community! > > > I am doing some performance tests based on my scene. >
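For reference, the same tuning via the Flink 1.13 Java API (paths are placeholders; as noted elsewhere in this thread, FLASH_SSD_OPTIMIZED itself already raises the background thread count to 4):

    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SsdTunedJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // "true" = incremental checkpoints; the checkpoint URI is a placeholder.
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

            // Equivalent of state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
            backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

            // Keep RocksDB's working files on the SSD mount (placeholder path).
            backend.setDbStoragePath("/mnt/ssd/rocksdb");

            env.setStateBackend(backend);
            // ... define sources/operators/sinks, then env.execute(...) ...
        }
    }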

Flink nested complex json parsing with multiple schema file

2022-07-21 Thread Soumen Choudhury
We have a requirement of parsing a very complex json (size around 25 kb per event) event with a predefined schema (nested schema, with multiple schema files) and create a temporary table, and from the temp table we have to apply some case statements based on some fields (e.g. to find out success, failure co

Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Chesnay Schepler
This is somewhat implied in https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#consumer-offset-committing. > Note that Kafka source does *NOT* rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of con
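To make the quoted behavior concrete, a minimal KafkaSource setup (broker, topic, and group id are placeholders); the OffsetsInitializer below is only consulted on a fresh start, since restores take offsets from Flink's own state:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    public class CommittedOffsetsExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setTopics("events")
                    .setGroupId("my-group")
                    // Used only when starting from a clean slate; after the first
                    // checkpoint, offsets are restored from state, not from Kafka.
                    .setStartingOffsets(
                            OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
            env.execute();
        }
    }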

Decompressing RMQ streaming messages

2022-07-21 Thread Ramana
Hi - We have a requirement to read compressed messages coming out of RabbitMQ and have them processed using PyFlink. However, I am not finding any out-of-the-box functionality in PyFlink which can help decompress the messages. Could anybody help me with an example of how to go about this?

Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread vtygoss
Hi, community! I am doing some performance tests based on my scenario. 1. Environment - Flink: 1.13.5 - StateBackend: RocksDB, incremental - use case: complex SQL containing 7 joins and 2 aggregations, input 30,000,000 records, output 60,000,000 records (about 80GB) - resource: flink on ya
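A flink-conf.yaml sketch of the backend setup described above (directories are placeholders; the two RocksDB keys suggested in the replies use the same mechanism):

    state.backend: rocksdb
    state.backend.incremental: true
    # Put RocksDB's local working directory on the disk under test (HDD vs. SSD):
    state.backend.rocksdb.localdir: /mnt/ssd/rocksdb
    state.checkpoints.dir: hdfs:///flink/checkpoints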

Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Alexis Sarda-Espinosa
I would suggest updating the documentation to include that statement. I imagine dynamic partition discovery has no effect on this? Regards, Alexis. On Thu, 21 Jul 2022 at 10:03, Chesnay Schepler <ches...@apache.org> wrote: > Flink only reads the offsets from Kafka when the job is initia

Re: Job id in logs

2022-07-21 Thread Chesnay Schepler
No, that is not possible. There are too many shared components (many of which are not aware of jobs at all) for this to be feasible. On 21/07/2022 10:49, Lior Liviev wrote: Hello, is there a way to add job Id to logs to distinguish between different jobs?

Job id in logs

2022-07-21 Thread Lior Liviev
Hello, is there a way to add job Id to logs to distinguish between different jobs?

Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Chesnay Schepler
Flink only reads the offsets from Kafka when the job is initially started from a clean slate. Once checkpoints are involved it only relies on offsets stored in the state. On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote: Hello again, I just performed a test using OffsetsInitializer.committedO