Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Theo Diefenthal
Awesome, thanks for the release. - Original Mail - From: "Dawid Wysakowicz" To: "dev", "user", annou...@apache.org Sent: Wednesday, 29 September 2021 15:59:47 Subject: [ANNOUNCE] Apache Flink 1.14.0 released The Apache Flink community is very happy to announce the release of

Re: Apache Flink - A question about Tables API and SQL interfaces

2021-05-30 Thread Theo Diefenthal
Hi Mans, Regarding your first question: I bookmarked the following mailing list discussion a while ago [1]. Fabian Hueske, one of the major contributors to Flink, answered that there aren't yet any trigger semantics in Flink SQL, but linked a great idea with a SQL extension of "EMIT". I r

Fastest way for decent lookup JOIN?

2021-05-18 Thread Theo Diefenthal
Hi there, I have the following (probably very common) use case: I have some lookup data (100 million records) which changes only slowly (in the range of some thousands per day). My event stream is in the order of tens of billions of events per day and each event needs to be enriched from the 100
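Not from the thread, but a minimal sketch of one common approach to such enrichment, the broadcast state pattern, which is only viable if the lookup data fits into memory on every task; Event, LookupRecord and EnrichedEvent are illustrative types:

    MapStateDescriptor<String, LookupRecord> descriptor = new MapStateDescriptor<>(
            "lookup", Types.STRING, TypeInformation.of(LookupRecord.class));

    BroadcastStream<LookupRecord> lookupStream = lookupUpdates.broadcast(descriptor);

    DataStream<EnrichedEvent> enriched = events
        .connect(lookupStream)
        .process(new BroadcastProcessFunction<Event, LookupRecord, EnrichedEvent>() {
            @Override
            public void processElement(Event event, ReadOnlyContext ctx,
                                       Collector<EnrichedEvent> out) throws Exception {
                // read side: consult the broadcast state for this event's key
                LookupRecord r = ctx.getBroadcastState(descriptor).get(event.getKey());
                out.collect(new EnrichedEvent(event, r)); // r may be null if not yet broadcast
            }

            @Override
            public void processBroadcastElement(LookupRecord record, Context ctx,
                                                Collector<EnrichedEvent> out) throws Exception {
                // write side: every parallel instance stores the full lookup table
                ctx.getBroadcastState(descriptor).put(record.getKey(), record);
            }
        });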

Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-05 Thread Theo Diefenthal
Thanks for managing the release. +1. I like the focus on improving operations with this version. From: "Matthias Pohl" To: "Etienne Chauchot" CC: "dev", "Dawid Wysakowicz", "user", annou...@apache.org Sent: Tuesday, 4 May 2021 21:53:31 Subject: Re: [ANNOUNCE] Apache Flink 1.13.0

Flink + Hive + Compaction + Parquet?

2021-03-02 Thread Theo Diefenthal
Hi there, Currently, I have a Flink 1.11 job which writes Parquet files via the StreamingFileSink to HDFS (simply using the DataStream API). I commit about every 3 minutes and thus have many small files in HDFS. Downstream, the generated table is consumed by Spark jobs and Impala queries. HDFS do

Bugs in Streaming job stopping? Weird graceful stop/restart for disjoint job graph

2021-01-13 Thread Theo Diefenthal
Hi there, I'm currently analyzing a weird behavior of one of our jobs running on YARN with Flink 1.11.2. My situation is somewhat special in that I submit a single streaming job with a disjoint job graph, i.e. the job contains two graphs of the same kind but totally indepen

Re: Logs of JobExecutionListener

2020-12-10 Thread Theo Diefenthal
I would vote for ClusterClient not being internal. I use it a lot in my end-to-end tests to e.g. trigger savepoints and shut down the streaming jobs which I think is not possible via ExecutionEnvironments. So in my opinion, having a more powerful ClusterClient adds a lot of powerful features f

Long blocking call in UserFunction triggers HA leader lost?

2020-11-05 Thread Theo Diefenthal
Hi there, I have a stream where I reload a huge list from time to time. I know there are various Side-Input patterns, but none of them seem to be perfect so I stuck with an easy approach: I use a Guava Cache and if it expires and a new element comes in, processing of the element is blocked up
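A minimal sketch of the described approach, assuming a Guava LoadingCache whose loader performs the expensive reload (names are illustrative; the blocking load is exactly what can stall the task thread):

    private transient LoadingCache<String, List<String>> lookup;

    @Override
    public void open(Configuration parameters) {
        lookup = CacheBuilder.newBuilder()
            .expireAfterWrite(1, TimeUnit.HOURS)
            .build(new CacheLoader<String, List<String>>() {
                @Override
                public List<String> load(String key) throws Exception {
                    // blocks the processing thread for the duration of the reload,
                    // which can delay checkpoints and, if long enough, heartbeats
                    return reloadHugeList(key);
                }
            });
    }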

Configurable Parser

2020-10-21 Thread Theo Diefenthal
Hi there, In my use case, I read data from Kafka where in each Kafka partition, I have ascending timestamps. Currently, I parse the data from Kafka with a custom deserialization schema so that after parsing, the FlinkKafkaConsumerBase can extract the ascending event-time timestamps and create p

Re: How can I drop events which are late by more than X hours/days?

2020-09-25 Thread Theo Diefenthal
Hi Arvid, be aware that allowedLateness will only be applied when your job has some windowing in use. If you have late events and you only apply map functions like enrichment, as far as I know, the events won't be filtered out automatically. Best regards Theo From: "Arvid Heise" To: "Ma
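Not from the thread, but a sketch of how events later than X could be dropped explicitly without any windowing, by comparing against the current watermark (the threshold is an illustrative value):

    stream.process(new ProcessFunction<Event, Event>() {
        private static final long MAX_LATENESS_MS = Time.days(1).toMilliseconds();

        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            Long ts = ctx.timestamp();
            long watermark = ctx.timerService().currentWatermark();
            if (ts == null || ts >= watermark - MAX_LATENESS_MS) {
                out.collect(event); // keep on-time and "slightly late" events
            }
            // anything older than watermark - MAX_LATENESS_MS is silently dropped
        }
    });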

[DISCUSS] ReplayableSourceStateBackend

2020-09-23 Thread Theo Diefenthal
Hi there, I just had the idea of a "ReplayableSourceStateBackend". I opened up a JIRA issue where I described the idea about it [1]. I would love to hear your feedback: Do you think it is possible to implement (I am not sure if a pipeline can be fully reconstructed from the source elements w

Re: [DISCUSS] Drop Scala 2.11

2020-09-21 Thread Theo Diefenthal
We use a Cloudera 6.3 cluster in prod. I'd guess that it's still widely used in prod, as those Cloudera upgrades for major versions are planned a long time ahead and take a significant amount of resources in big data lakes. On that 6.3 cluster, if I open spark-shell, I still see Scala 2.11 in use

Re: How to access state in TimestampAssigner in Flink 1.11?

2020-09-07 Thread Theo Diefenthal
rkGenerator abstraction > and might be able to help you with your problem. At the moment, it looks to > me that there is no way to combine state with the new WatermarkGenerator > abstraction. > > Cheers, > Till > > On Wed, Aug 19, 2020 at 3:37 PM Theo Diefenthal < > theo.

How to access state in TimestampAssigner in Flink 1.11?

2020-08-19 Thread Theo Diefenthal
Hi there, Right now I'm in the process of upgrading our Flink 1.9 jobs to Flink 1.11. In Flink 1.9, I was able to write an AssignerWithPeriodicWatermarks which also extended AbstractRichFunction and could thus utilize state and getRuntimeContext() in there. This worked as the TimestampsAndWat

Re: Re: How to get the evaluation result of a time-based window aggregation in time after a new event falls into the window?

2020-08-17 Thread Theo Diefenthal
Hi Chengcheng Zhang, I think your request is related to this feature request from two years ago [1], with me asking about the status one year ago [2]. You might want to upvote this so we can hope that it gets some more attention in the future. Today, it is possible to write your own DataStr

Re: Two Queries and a Kafka Topic

2020-08-10 Thread Theo Diefenthal
lready used the API might shed some more light for us here. Best regards Theo [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html From: "Marco Vill

Re: Two Queries and a Kafka Topic

2020-08-05 Thread Theo Diefenthal
Hi Marco, In general, I see three solutions you could approach here: 1. Use the StateProcessorAPI: You can run a program with the StateProcessorAPI that loads the data from JDBC and stores it into a Flink savepoint (see the sketch below). Afterwards, you start your streaming job from that savepoint, which will load
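A rough sketch of option 1, following the documented State Processor API bootstrap pattern; Account, AccountBootstrapper, the uid and the paths are illustrative, not from the thread:

    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Account> accounts = batchEnv.createInput(jdbcInputFormat); // reads the lookup table via JDBC

    BootstrapTransformation<Account> transformation = OperatorTransformation
        .bootstrapWith(accounts)
        .keyBy(account -> account.id)
        .transform(new AccountBootstrapper()); // a KeyedStateBootstrapFunction filling keyed state

    Savepoint.create(new MemoryStateBackend(), 128) // 128 = max parallelism of the new savepoint
        .withOperator("enrichment-operator-uid", transformation)
        .write("hdfs:///savepoints/bootstrap");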

Re: Performance test Flink vs Storm

2020-07-17 Thread Theo Diefenthal
Hi Prasanna, From my experience, there is a ton of stuff which can slow down even a simple pipeline heavily. One thing directly coming to my mind: "object reuse" is not enabled. Even if you have a very simple pipeline with just 2 map steps or so, this can lead to a ton of unnecessary deep
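For reference, enabling object reuse is a one-liner; it is only safe if no operator mutates or holds on to input records after emitting them:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // skip defensive copies between chained operators; records are passed by reference
    env.getConfig().enableObjectReuse();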

Re: Flink Pojo Serialization for Map Values

2020-07-16 Thread Theo Diefenthal
Hi Krzysztof, That's why my goal is to always set env.getConfig().disableGenericTypes(); in my streaming jobs. This way, you will receive an early crash if GenericTypes are used somewhere. (They are bad for performance, so I try to avoid them everywhere.) Sadly, if you build up streaming
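For reference, the mentioned setting; with it, the job fails at job-graph construction time instead of silently falling back to Kryo:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // any type that would be handled as GenericTypeInfo (Kryo) now causes an early failure
    env.getConfig().disableGenericTypes();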

Re: Flink Pojo Serialization for Map Values

2020-07-15 Thread Theo Diefenthal
Hi Krzysztof, Your problems arise due to Java type erasure. If you have a DataPoint with a Map member, all Flink's type system will see is a raw Map. So in the first case, with DataPoint having an explicit member of type "BadPojo", Flink will deduce "DataPoint" to be a PojoType with two fields,
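Not from the thread, but one common way around the erasure problem is to state the full generic type explicitly via a TypeHint; DataPoint/BadPojo are the thread's types, the wiring and parse() are illustrative:

    DataStream<DataPoint<BadPojo>> points = input
        .map(line -> parse(line)) // hypothetical parser
        .returns(new TypeHint<DataPoint<BadPojo>>() {}); // pins the generic type lost by erasure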

Re: [External] Measuring Kafka consumer lag

2020-06-16 Thread Theo Diefenthal
Hi Padarn, We configure our Flink KafkaConsumer with setCommitOffsetsOnCheckpoints(true). In this case, the offsets are committed on each checkpoint for the consumer group of the application. We have an external monitoring on our Kafka consumer groups (just a small script) which writes Kafka in
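A minimal sketch of that setup (broker, topic and group names are illustrative):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "my-monitored-group");

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
    // offsets are then committed on each checkpoint and become visible
    // to external consumer-group lag monitoring
    consumer.setCommitOffsetsOnCheckpoints(true);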

Re: Restore from savepoint through Java API

2020-06-13 Thread Theo Diefenthal
Hi Abhishek, I did the same as you and tested my job with a Parquet StreamingFileSink via a snapshot (and afterwards ran a small Spark job on the Parquet output asserting that my Flink output is correct). Good news for you: it is easily possible to stop the job with a savepoint. You are alr

Monitor job execution status per stage and aggregated time spent in stages

2020-06-13 Thread Theo Diefenthal
Hi there, I just watched the Flink Forward talk from Amazon regarding measuring uptime from [1] with slides here [2], referencing the developer mailing list here [3]. It seems Amazon is already running with those metrics enabled in their production cluster. I'd really like to have those

SQL Expression to Flink FilterFunction?

2020-06-04 Thread Theo Diefenthal
Hi there, I have a full DataStream pipeline (i.e. no upper-level APIs like Table or SQL are involved). In the middle of the pipeline, I now need to filter on a SQL WHERE expression, e.g. "user = 'pitter' AND age > 10", which can include REGEXP and arbitrarily nested AND/OR constructs. I was wond

Re: Auto adjusting watermarks?

2020-05-30 Thread Theo Diefenthal
er people customizing the watermark assigners. Best regards Theo From: "Congxian Qiu" To: "Theo Diefenthal" CC: "user" Sent: Saturday, 30 May 2020 05:06:12 Subject: Re: Auto adjusting watermarks? Hi Could it be store a histogram data in custom `

Auto adjusting watermarks?

2020-05-29 Thread Theo Diefenthal
Hi there, Currently I have a job pipeline reading data from > 10 different kinds of sources, each having different out-of-orderness characteristics. I am currently working on adjusting the watermarks for each source "properly". I work with BoundedOutOfOrdernessTimestampExtractor and, as usu
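A minimal sketch of the per-source assignment with the pre-1.11 API mentioned above (Event and the 10-second bound are illustrative):

    stream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event element) {
                // watermark trails the max seen timestamp by the configured bound
                return element.getEventTimeMillis();
            }
        });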

Re: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-13 Thread Theo Diefenthal
ng a notifier sounds great, if it can be executed with parallelism 1 for all sink tasks. Best regards Theo From: "Guowei Ma" To: "Theo Diefenthal" CC: "user", "yungao gy" Sent: Wednesday, 13 May 2020 09:15:37 Subject: Re: Re: Writing _SUC

Re: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Theo Diefenthal
Hi Yun, For me, that sounds quite nice. I implemented the same for my application a few weeks ago, but of course tailored only to my app. What I did: 1. I wrapped the Parquet StreamingFileSink into a ProcessFunction. 2. I extended the default ProcessOperator and instead of "notifyCheckpoint

Re: Flink Serialization as stable (kafka) output format?

2020-04-20 Thread Theo Diefenthal
usages of PojoSerializer everywhere :) I'll definitely also keep in mind that Avro reflect performs much worse compared to Avro specific/generic than I expected. Best regards Theo From: "Robert Metzger" To: "Arvid Heise" CC: "Theo Diefenthal", "user"

Re: How to scale a streaming Flink pipeline without abusing parallelism for long computation tasks?

2020-04-16 Thread Theo Diefenthal
Hi, I think you could utilize AsyncIO in your case, just using a local thread pool [1]. Best regards Theo [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html From: "Elkhan Dadashov" To: "user" Sent: Thursday, 16 April 2020 10:37:55
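A sketch of AsyncIO backed by a local thread pool, as suggested above; Event/Result, compute() and the sizing are illustrative:

    public class ExpensiveComputation extends RichAsyncFunction<Event, Result> {
        private transient ExecutorService pool;

        @Override
        public void open(Configuration parameters) {
            pool = Executors.newFixedThreadPool(8); // local pool, one per parallel instance
        }

        @Override
        public void asyncInvoke(Event input, ResultFuture<Result> resultFuture) {
            // hand the long computation to the pool; the task thread stays free
            pool.submit(() -> resultFuture.complete(Collections.singleton(compute(input))));
        }

        @Override
        public void close() {
            pool.shutdown();
        }
    }

    // wiring it in, with a timeout and a capacity bound on in-flight elements:
    DataStream<Result> results = AsyncDataStream.unorderedWait(
        events, new ExpensiveComputation(), 30, TimeUnit.SECONDS, 100);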

Re: Issues with Watermark generation after join

2020-03-16 Thread Theo Diefenthal
Hi Dominik, I had the same issue once with a custom process function. My process function buffered the data for a while and then output it again. As the process function can do anything with the data (transforming, buffering, aggregating...), I think it's just not safe for Flink to reason about the wat

Flink Serialization as stable (kafka) output format?

2020-03-04 Thread Theo Diefenthal
Hi, Without knowing too much about Flink serialization, I know that Flink states that it serializes POJO types much faster than even the fast Kryo for Java. I further know that it supports schema evolution in the same way as Avro. In our project, we have a star architecture, where one Flink j

Best way to initialize a custom metric task for all task managers and job manager

2020-02-28 Thread Theo Diefenthal
ll? What do you think is the best way to integrate the logging metrics into Flink? Best regards Theo

Re: Timeseries aggregation with many IoT devices off of one Kafka topic.

2020-02-26 Thread Theo Diefenthal
Von: "Khachatryan Roman" An: "Avinash Tripathy" CC: "Theo Diefenthal" , "hemant singh" , "Marco Villalobos" , "user" Gesendet: Dienstag, 25. Februar 2020 19:08:16 Betreff: Re: Timeseries aggregation with many IoT devices off o

Re: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

2020-02-20 Thread Theo Diefenthal
ncerns to drop that support? Best, Stephan

Writing a POJO Schema Evolution E2E test in Java

2020-02-20 Thread Theo Diefenthal
Hi, We have a pipeline which internally uses Java POJOs and also needs to keep some events entirely in state for some time. From time to time, our POJOs evolve, e.g. attributes are added or removed. Now I wanted to write an E2E test that proves the schema migration works (having different

RE: Parallelize Kafka Deserialization of a single partition?

2020-02-19 Thread Theo Diefenthal
eers, Till On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal wrote: Hi, As for most pipelines, our Flink pipeline starts with parsing source Kafka events into POJOs. We perform this step within a KafkaDeserializationSchema so that we properly

Parallelize Kafka Deserialization of a single partition?

2020-02-17 Thread Theo Diefenthal
Hi, As for most pipelines, our Flink pipeline starts with parsing source Kafka events into POJOs. We perform this step within a KafkaDeserializationSchema so that we properly extract the event-time timestamp for the downstream timestamp assigner. Now it turned out that parsing is currently the m
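Not from the thread, but a sketch of one common workaround: keep the Kafka source cheap by passing raw bytes through and do the expensive parsing in a subsequent map with higher parallelism. Timestamps must then be assigned after parsing rather than in the consumer; parseEvent(), kafkaProps and the parallelism values are illustrative:

    DataStream<byte[]> raw = env.addSource(new FlinkKafkaConsumer<>(
            "events",
            new AbstractDeserializationSchema<byte[]>() {
                @Override
                public byte[] deserialize(byte[] message) {
                    return message; // no parsing in the source, stays cheap
                }
            },
            kafkaProps))
        .setParallelism(numKafkaPartitions);

    DataStream<Event> parsed = raw
        .map(new MapFunction<byte[], Event>() {
            @Override
            public Event map(byte[] bytes) {
                return parseEvent(bytes); // hypothetical expensive parser
            }
        })
        .setParallelism(4 * numKafkaPartitions); // more parsing slots than partitions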

Re: How Flink Kafka Consumer works when it restarts

2020-02-15 Thread Theo Diefenthal
start not in the "maximum" past (like 7 days ago) but from the time of the last commit. But that, of course, highly depends on the use case and does e.g. not work with exactly-once semantics. Best regards Theo From: "Robert Metzger" To: "Timothy Victor" CC: "

Flink distribution housekeeping for YARN sessions

2020-01-28 Thread Theo Diefenthal
ith "yarn application -kill"? Best regards Theo -- SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln Theo Diefenthal T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575 theo.diefent...@scoop-software.de - www.scoop-software.de Sitz der Gesellscha

RE: Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot

2019-12-19 Thread Theo Diefenthal
Hi Krzysztof, You can just key your stream by transaction id (see the sketch below). If you have lots of different transaction ids, you can expect the load to be evenly distributed. All events with the same key (== transaction id) will be processed by the same task slot. If you only have a few Kafka partitions, you coul
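The core of that in code (TxEvent and the accessor are illustrative):

    // all events of one transaction go to the same parallel instance / task slot
    KeyedStream<TxEvent, String> byTransaction = events.keyBy(tx -> tx.getTransactionId());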

Re: ArrayIndexOutOfBoundException on checkpoint creation

2019-12-15 Thread Theo Diefenthal
s new version survived a few stress tests already. It really seems to be some Kryo race condition which is best avoided by avoiding Kryo :) Best regards Theo From: "Gyula Fóra" To: "Theo Diefenthal" CC: "user" Sent: Friday, 29.

Re: Processing post to sink?

2019-12-14 Thread Theo Diefenthal
ation-barrier" after all "notify-checkpoint-complete"-calls to all sink instances. Can you tell me on how to do that in my own sink function? Best regards Theo Von: "Timothy Victor" An: "Theo Diefenthal" CC: "user" Gesendet: Samstag, 14. De

Processing post to sink?

2019-12-14 Thread Theo Diefenthal
Hi there, In my pipeline, I write data into a partitioned parquet directory via StreamingFileSink and a partitioner like: @Override public String getBucketId(V element, Context context) { return "partitionkey=" + element.getPartitionkey(); } That works well so far. Now I need to know whe

Re: Help to Understand cutoff memory

2019-12-10 Thread Theo Diefenthal
Hi Lu, I found this talk from the last Flink Forward in Berlin very helpful for understanding JVM RAM and cutoff memory [1]. Maybe it helps you understand that stuff better. In my experience on YARN, the author was totally correct. I was able to reproduce that by assigning something about 12G

ArrayIndexOutOfBoundException on checkpoint creation

2019-11-26 Thread Theo Diefenthal
Hi, We have a pipeline with a custom ProcessFunction and state (see [1], implemented as suggested by Fabian with a ValueState and a second ValueState). The behavior of that function works fine in our unit tests and with low load in our test environment (100,000 records per minute). On the production e

Re: Does apache flink support stream input from Postgresql ?

2019-11-20 Thread Theo Diefenthal

Flink (Local) Environment Thread Leaks?

2019-11-13 Thread Theo Diefenthal
I included a Solr end-to-end test in my project, inheriting from the JUnit 4 SolrCloudTestCase. The solr-test-framework for JUnit 4 makes use of com.carrotsearch.randomizedtesting, which automatically tests for thread leakages at test end. In my other projects, that tool doesn't produce any problems.

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Theo Diefenthal
I agree with Gyula Fora. In our case, we have a client machine in the middle between our YARN cluster and some backend services which cannot be reached directly from the cluster nodes. On application startup, we connect to some external systems and get some information crucial for the job runti

Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-15 Thread Theo Diefenthal
Hi Vijay, Maybe a stupid question, but according to your comments, the code works fine up until a "flatMap" operation. It seems that this flatMap is directly followed by a filter function in the method createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out all events? Or i

Re: Passing parameters to filter function (in DataStreams)

2019-10-10 Thread Theo Diefenthal
Hi, Your original post looks like "computeThreshold" doesn't require any parameters but is just an expensive-to-compute operation. In that case, you can inherit from "RichFilterFunction" instead of "FilterFunction". With "RichFilterFunction", you can override the "open" method and per
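A minimal sketch of that pattern, computing the threshold once per parallel instance instead of per record (Measurement and computeThreshold() are illustrative):

    public class ThresholdFilter extends RichFilterFunction<Measurement> {
        private double threshold;

        @Override
        public void open(Configuration parameters) {
            threshold = computeThreshold(); // expensive, done once per parallel instance
        }

        @Override
        public boolean filter(Measurement m) {
            return m.getValue() > threshold;
        }
    }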

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-10 Thread Theo Diefenthal
larger issue and need to rethink on the source level already, e.g. change the serialization method to something which has really lightweight parsing for finding the special events or such. Best regards Theo From: "Filip Niksic" To: "Theo Diefenthal" CC: "user"

Re: Finding the Maximum Value Received so far in a Stream

2019-10-04 Thread Theo Diefenthal
Hi Komal, regarding the max method: You can call .map() on your stream and convert the POJO to another stream/type, e.g. having only the x coordinate of the POJO, and then apply the max operator. And as the others said: you are working on a keyed stream per fish_id, so you will get one maxi
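A minimal sketch of the keyed rolling maximum (Fish and the field name are illustrative):

    // rolling maximum of POJO field "x" per fish_id; note that max() only
    // guarantees the aggregated field — use maxBy() to keep the whole record
    // that actually contained the maximum
    fishStream
        .keyBy(f -> f.getFishId())
        .max("x");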

Re: CEP operator in SQL pattern match does not clear its state

2019-10-04 Thread Theo Diefenthal
Hi Muhammad, By "not going down", do you mean that it's still growing, or that it's constant? In case it's constant, that's pretty much what is expected, right? If it's still growing, the only reason I could come up with: do you work in event time and have watermarks properly assigned with

Re: How to prevent from launching 2 jobs at the same time

2019-10-04 Thread Theo Diefenthal
My simple workaround: I always start the applications from the same machine via the CLI and just take a file-system lock around execution of the check-if-task-is-already-running and task-launching part. This is of course a possible single point of failure, relying on one machine to start the j

RE: Filter events based on future events

2019-09-12 Thread Theo Diefenthal
Hi Fabian, Thanks for sharing your thoughts. I'll give it a try. Best regards Theo From: Fabian Hueske Sent: Wednesday, 11 September 2019 09:55 To: theo.diefent...@scoop-software.de Cc: user Subject: Re: Filter events based on future events Hi Theo, I would imple

Are there any news on custom trigger support for SQL/Table API?

2019-08-23 Thread Theo Diefenthal
Hi there, I'm currently evaluating letting our experienced system users write Flink SQL queries directly. Currently, all queries our users need are implemented programmatically. There is one major problem preventing us from just giving SQL to our users directly: almost all queries of our users ar

Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

2019-08-23 Thread Theo Diefenthal
o query (as the "same day" predicate is still missing). Best regards Theo From: "Fabian Hueske" To: "Zhenghua Gao" CC: "Theo Diefenthal", "user" Sent: Friday, 16 August 2019 10:05:45 Subject: Re: I'm not able to make a strea

I'm not able to make a stream-stream Time windows JOIN in Flink SQL

2019-08-12 Thread Theo Diefenthal
ecuting the query, I convert the Table back to a stream via tEnv.toAppendStream and I use Flink 1.8.0 for tests). My questions are now: 1. How do I see if Flink treats my table result as a regular JOIN result or a time-bounded JOIN? 2. What is the proper way to formulate my initial query, find

Greedy operator in match_recognize doesn't work with defined time interval as I expect it to work

2019-08-07 Thread Theo Diefenthal
ingTimestampExtractor>() {
    @Override
    public long extractAscendingTimestamp(Tuple5 element) {
        return element.f1;
    }
});
tEnv.registerDataStream("SAMPLE", stream, "user, ts.rowtime, id, payload, group");
Table tbl = tEnv.sqlQuery(QUERY_MATCH_REOGNIZE);
DataStream backToStream = tEnv.toAppendStream(tbl, Row.class);
List list = IteratorUtils.toList(DataStreamUtils.collect(backToStream));
list.stream().forEach(System.err::println);
assertThat(list, hasSize(1));
System.out.println(tEnv.explain(tbl));
} } Best regards Theo Diefenthal

Re: StreamingFileSink not committing file to S3

2019-08-05 Thread Theo Diefenthal
Hi Ravi, Please check out [1] and [2]. That is related to Kafka but probably applies to Kinesis as well. If one stream is empty, there is no way for Flink to know about the watermark of that stream, and Flink can't advance the watermark. Downstream operators can thus not know if there
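Not from the thread: since Flink 1.11 the idle-source problem can also be addressed declaratively via the WatermarkStrategy API (durations are illustrative):

    WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        .withIdleness(Duration.ofMinutes(1)); // idle sources stop holding back the overall watermark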

Has Flink a kafka processing location strategy?

2019-06-17 Thread Theo Diefenthal
/docs/2.4.0/streaming-kafka-0-10-integration.html#locationstrategies ) I wonder if there is such a thing in Flink as well? I didn't find anything yet. Best regards Theo Diefenthal

Re: Flink end to end intergration test

2019-06-13 Thread Theo Diefenthal

Possibly very slow keyBy in program with no parallelism

2019-05-19 Thread Theo Diefenthal
g happening there in between "task1 writes" and "task2 receives" which slows down the entire pipeline, but I have no idea what that could be. 4. Can I do something to boost performance by orders of magnitude here? Best regards Theo Diefenthal