Re: Minicluster Flink tests, checkpoints and inprogress part files

2021-02-05 Thread Aljoscha Krettek
Hi Dan, I'm afraid this is not easily possible using the DataStream API in STREAMING execution mode today. However, there is one possible solution, and we're introducing changes that will also make this work in STREAMING mode. The possible solution is to use the `FileSink` instead of the `St

Re: Timers not firing until stream end

2021-01-27 Thread Aljoscha Krettek
On 2021/01/27 15:09, Chesnay Schepler wrote: Put another way, if you use any of the built-in WatermarkGenerators and use event-time, then it appears that you *must* set this interval. This behavior is...less than ideal I must admit, and it does not appear to be properly documented. Setting t

Re: Flink upgrade to Flink-1.12

2021-01-27 Thread Aljoscha Krettek
I'm afraid I also don't know more than that. But I agree with Ufuk that it should just work. I think the best way would be to try it in a test environment and then go forward with upgrading the production jobs/cluster. Best, Aljoscha On 2021/01/25 18:59, Ufuk Celebi wrote: Thanks for reachi

Re: Flink app logs to Elastic Search

2021-01-15 Thread Aljoscha Krettek
On 2021/01/15 10:43, bat man wrote: I was able to make it work with a fresh Elastic installation. Now taskmanager and jobmanager logs are available in elastic. Thanks for the pointers. Thanks for letting us know!

Re: Dead code in ES Sink

2021-01-14 Thread Aljoscha Krettek
On 2021/01/13 07:50, Rex Fenley wrote: Are you saying that this option does get passed along to Elasticsearch still or that it's just arbitrarily validated? According to [1] it's been deprecated in ES 6 and removed in ES 7. [1] https://github.com/elastic/elasticsearch/pull/38085 Sorry, I wasn'

Re: Flink app logs to Elastic Search

2021-01-13 Thread Aljoscha Krettek
On 2021/01/11 01:29, bat man wrote: Yes, no entries to the elastic search. No indices were created in elastic. Jar is getting picked up which I can see from yarn logs. Pre-defined text based logging is also available. Hmm, I can't imagine much that could go wrong. Maybe there is some interfere

Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-13 Thread Aljoscha Krettek
On 2021/01/13 12:07, vinay.raic...@t-systems.com wrote: Ok. Attached is the PPT of what am attempting to achieve w.r.t. time Hope I am all set to achieve the three bullets mentioned in attached slide to create reports with KafkaSource and KafkaBuilder approach. If you have any additional tips

Re: Dead code in ES Sink

2021-01-13 Thread Aljoscha Krettek
On 2021/01/12 15:04, Rex Fenley wrote: [2] https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java#L131 Should [2] be remove

Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-13 Thread Aljoscha Krettek
On 2021/01/13 07:58, vinay.raic...@t-systems.com wrote: Not sure about your proposal regarding Point 3: * firstly how is it ensured that the stream is closed? If I understand the doc correctly the stream will be established starting with the latest timestamp (hmm... is it not a standard behavio

Re: Flink kafka exceptions handling

2021-01-12 Thread Aljoscha Krettek
On 2021/01/07 14:36, BELGHITH Amira (EXT) wrote: --> Our processing System is supposed to continue streaming data even though there is some Kafka errors, we are expecting that the KafkaConsumer fails but not the Flink job, do you think it is possible? I'm afraid that's not possible with Flink

Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-12 Thread Aljoscha Krettek
On 2021/01/11 14:12, vinay.raic...@t-systems.com wrote: a) As mentioned by you "KafkaSource" was introduced in Flink 1.12 so, I suppose we have to upgrade to this version of Flink. Can you share the link of the stable Flink image (containerized version) to be used in our set-up keeping in mind

Re: How should I process a cumulative counter?

2021-01-12 Thread Aljoscha Krettek
Hi Larry, By now, it seems to me that the windowing API might not be the right solution for your use case. The fact that sensors can shut down arbitrarily makes it hard to calculate what window an event should fall into. Have you tried looking into `ProcessFunction`? With this you can keep
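The per-key bookkeeping that a `ProcessFunction` with keyed state would do for a cumulative counter can be illustrated in plain Java. This is a hypothetical sketch: the `HashMap` stands in for Flink keyed `ValueState`, the class name is made up, and treating a drop in the counter as a sensor restart that counts up from zero is an assumption about the data, not something stated in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java stand-in for per-sensor keyed state.
// In a Flink KeyedProcessFunction, lastValueBySensor would be a ValueState<Long> scoped to the key.
class CumulativeCounterDeltas {
    private final Map<String, Long> lastValueBySensor = new HashMap<>();

    /** Returns how much the counter grew since this sensor's previous reading. */
    long onReading(String sensorId, long cumulativeValue) {
        Long previous = lastValueBySensor.put(sensorId, cumulativeValue);
        if (previous == null) {
            return 0L; // first reading for this sensor: no baseline yet
        }
        if (cumulativeValue < previous) {
            // Assumption: a drop means the sensor restarted and counts up from zero again.
            return cumulativeValue;
        }
        return cumulativeValue - previous;
    }
}
```

Because the state is kept per key rather than per window, a sensor that shuts down for an arbitrary period simply resumes from its last stored value, with no window boundary to reason about.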

Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-11 Thread Aljoscha Krettek
On 2021/01/08 16:55, vinay.raic...@t-systems.com wrote: Could you also attach the code snippet for KafkaSource`, `KafkaSourceBuilder`, and `OffsetInitializers` that you were referring to in your previous reply, for my reference please to make it more clearer for me. Ah sorry, but this I was r

Re: Roadmap for Execution Mode (Batch/Streaming) and interaction with Table/SQL APIs

2021-01-11 Thread Aljoscha Krettek
Also cc'ing d...@flink.apache.org On 2021/01/06 09:19, burkaygur wrote: 1) How do these changes impact the Table and SQL APIs? Are they completely orthogonal or can we get the benefits of the new Batch Mode with Flink SQL as well? The answer here is a bit complicated. The Table API/SQL already

Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-08 Thread Aljoscha Krettek
Hi, for your point 3. you can look at `FlinkKafkaConsumerBase.setStartFromTimestamp(...)`. Points 1. and 2. will not work with the well-established `FlinkKafkaConsumer`. However, it should be possible to do it with the new `KafkaSource` that was introduced in Flink 1.12. It might be a bit r

Re: How should I process a cumulative counter?

2021-01-08 Thread Aljoscha Krettek
Hi Larry, the basic problem for your use case is that window boundaries are inclusive for the start timestamp and exclusive for the end timestamp. It's set up like this to ensure that consecutive tumbling windows don't overlap. This is only a function of how our `WindowAssigner` works, so it
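The half-open interval can be shown with the window-start arithmetic used for tumbling windows. This is a minimal standalone sketch (the class name is hypothetical); the `windowStart` formula is the same one Flink's `TimeWindow.getWindowStartWithOffset` uses, and the membership check spells out "start inclusive, end exclusive".

```java
// Sketch of tumbling-window assignment with half-open intervals [start, start + size).
class TumblingWindows {
    /** Start of the window that contains the given timestamp. */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Start is inclusive, end is exclusive, so consecutive windows never share a timestamp. */
    static boolean contains(long start, long size, long timestamp) {
        return timestamp >= start && timestamp < start + size;
    }
}
```

With one-minute windows, `windowStart(59_999, 0, 60_000)` is `0`, but `windowStart(60_000, 0, 60_000)` is `60_000`: an event exactly on the boundary opens the next window instead of closing the previous one.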

Re: Flink app logs to Elastic Search

2021-01-08 Thread Aljoscha Krettek
So you're saying there is no logging output whatsoever being sent to Elasticsearch? Did you try and see if the jar file is being picked up? Are you still getting the pre-defined, text-based logging output? Best, Aljoscha On 2021/01/07 17:04, bat man wrote: Hi Team, I have a requirement to p

Re: Question about "NoWatermark" in Flink 1.9.2

2021-01-08 Thread Aljoscha Krettek
Thanks for the update! Best, Aljoscha On 2021/01/07 16:45, Peter Huang wrote: Hi, We end up finding the root cause. Since a time point, two of the partitions of the input topic don't have any data which causes the second window operator in the pipeline can't receive the watermark of all of the
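The effect described in that root-cause analysis (empty partitions holding back the watermark) can be modeled in plain Java. This is a simplified sketch with a made-up class, not Flink's implementation: the combined watermark is the minimum over partitions, so a partition that never receives data pins it at `Long.MIN_VALUE` unless it is explicitly excluded as idle. Newer Flink versions (1.11+) offer `WatermarkStrategy.withIdleness(...)` for that exclusion; the `markIdle` method here is only a stand-in for it.

```java
import java.util.Arrays;

// Simplified model of combining per-partition watermarks; "idle" flags stand in for idleness detection.
class PartitionWatermarks {
    private final long[] watermarks;
    private final boolean[] idle;

    PartitionWatermarks(int partitions) {
        watermarks = new long[partitions];
        idle = new boolean[partitions];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no data seen yet
    }

    void update(int partition, long watermark) {
        watermarks[partition] = Math.max(watermarks[partition], watermark);
        idle[partition] = false; // data arrived, so the partition is active again
    }

    void markIdle(int partition) {
        idle[partition] = true;
    }

    /** Minimum over active partitions. Flink would mark a fully idle stream as idle; this returns MIN_VALUE. */
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```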

Re: Flink kafka exceptions handling

2021-01-07 Thread Aljoscha Krettek
Hi, when you say that the `JobManager` goes down, you're referring to the fact that the Flink job will finish in a failed state after too many exceptions have occurred in the `FlinkKafkaConsumer`. Is that correct? I'm afraid right now there is no code path that would allow catching those `Top

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Aljoscha Krettek
This is somewhat unrelated to the discussion about how to actually do the triggering when sources shut down, I'll write on that separately. I just wanted to get this quick thought out. For letting operators decide whether they actually want to wait for a final checkpoint, which is relevant at

Re: Issue in WordCount Example with DataStream API in BATCH RuntimeExecutionMode

2020-12-24 Thread Aljoscha Krettek
DataStream API [1] it was decided to deprecate these relational methods -- such as sum -- on KeyedStream. But I don't know if this means this behavior is to be expected, or not. I've cc'ed @Aljoscha Krettek, who should be able to shed some light on this. Best, David [1] https://c

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-15 Thread Aljoscha Krettek
Thanks for the thorough update! I'll answer inline. On 14.12.20 16:33, Yun Gao wrote: 1. To include EndOfPartition into consideration for barrier alignment at the TM side, we now tend to decouple the logic for EndOfPartition with the normal alignment behaviors to avoid the complex interfe

Re: Logs of JobExecutionListener

2020-11-25 Thread Aljoscha Krettek
estClusterClientExtended.java On Mon, Nov 23, 2020 at 4:38 PM Flavio Pompermaier wrote: I don't know if they need to be added also to the ClusterClient but for sure they are missing in the RestClusterClient On Mon, Nov 23, 2020 at 4:31 PM Aljoscha Krettek wrote: On 23.11.20 16:26,

Re: Logs of JobExecutionListener

2020-11-23 Thread Aljoscha Krettek
On 23.11.20 16:26, Flavio Pompermaier wrote: Thank you Aljosha,.now that's more clear! I didn't know that jobGraph.getJobID() was the solution for my use case..I was convinced that the job ID was assigned by the cluster! And to me it's really weird that the job listener was not called by the subm

Re: Logs of JobExecutionListener

2020-11-23 Thread Aljoscha Krettek
On 20.11.20 22:09, Flavio Pompermaier wrote: To achieve this, I was using the RestClusterClient because with that I can use the following code and retrieve the JobID: (1) JobID flinkJobId = client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID(); All you wan

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-20 Thread Aljoscha Krettek
Sure, my pleasure! Aljoscha On 19.11.20 16:12, Simone Cavallarin wrote: Many thanks for the Help!! Simone From: Aljoscha Krettek Sent: 19 November 2020 11:46 To: user@flink.apache.org Subject: Re: How to use EventTimeSessionWindows.withDynamicGap() On

Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
tted with JobID c454a894d0524ccb69943b95838eea07 Program execution finished Job with JobID c454a894d0524ccb69943b95838eea07 has finished. Job Runtime: 139 ms EXECUTED Best, Andrey On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek wrote: JobListener.onJobExecuted() is

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-19 Thread Aljoscha Krettek
On 17.11.20 17:37, Simone Cavallarin wrote: Hi, I have been working on the suggestion that you gave me, thanks! The first part is to add to the message the gap. 1)I receive the event, 2)I take that event and I map it using StatefulsessionCalculator, that is where I put together "The message",

Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-19 Thread Aljoscha Krettek
lpful and insightful. Best, Dongwon On Wed, Nov 18, 2020 at 9:44 PM Aljoscha Krettek wrote: Hi Dongwon, Unfortunately, it's not that easy right now because normal Sinks that rely on checkpointing to write out data, such as Kafka, don't work in BATCH execution mode because we don'

Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
the CLI client when I use the run action it if I'm not wrong). However both methods don't trigger the job listener. Il gio 19 nov 2020, 09:39 Aljoscha Krettek ha scritto: @Flavio, when you're saying you're using the RestClusterClient, you are not actually using that manually

Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
@Flavio, when you're saying you're using the RestClusterClient, you are not actually using that manually, right? You're just submitting your job via "bin/flink run ...", right? What's the exact invocation of "bin/flink run" that you're using? On 19.11.20 09:29, Andrey Zagrebin wrote: Hi Flavi

Re: Union SingleOutputStreamOperator and getSideOutput doesn't work

2020-11-18 Thread Aljoscha Krettek
Hi, I'm afraid you stumbled across an inconsistency in the API. In the Java API we differentiate between DataStream and SingleOutputStreamOperator where the latter is used for "physical" operations that, among other things, allow things like getting side outputs. The Scala API hides this dif

Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-18 Thread Aljoscha Krettek
ons. Are these options only for yarn session mode? Best, Dongwon On Tue, Nov 17, 2020 at 5:16 PM Aljoscha Krettek wrote: Hi, to ensure that we really are using per-job mode, could you try and use $ flink run -t yarn-per-job -d <...> This will directly specify that we want to use the

Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-18 Thread Aljoscha Krettek
Hi Dongwon, unfortunately, it's not that easy right now because normal Sinks that rely on checkpointing to write out data, such as Kafka, don't work in BATCH execution mode because we don't have checkpoints there. It will work, however, if you use a source that doesn't rely on checkpointing i

Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-17 Thread Aljoscha Krettek
Hi, to ensure that we really are using per-job mode, could you try and use $ flink run -t yarn-per-job -d <...> This will directly specify that we want to use the YARN per-job executor, which bypasses some of the logic in the older YARN code paths that differentiate between YARN session mode

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-16 Thread Aljoscha Krettek
(m.. okay now complitely lost...) Thanks s From: Simone Cavallarin Sent: 13 November 2020 16:55 To: Aljoscha Krettek Cc: user Subject: Re: How to use EventTimeSessionWindows.withDynamicGap() +user@ From: Simone Cavallarin Sent: 13 November 2020 16:4

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-13 Thread Aljoscha Krettek
Many thanks for the help! Best Simon ________ From: Aljoscha Krettek Sent: 12 November 2020 16:34 To: user@flink.apache.org Subject: Re: How to use EventTimeSessionWindows.withDynamicGap() Hi, I'm not sure that what you want is possible. You say you want more windows

Re: DataStream.connect semantics for Flink SQL / Table API

2020-11-12 Thread Aljoscha Krettek
Hi, I think if you don't do any operations that are sensitive to event time, then just using a UNION/UNION ALL should work, because then there won't be any buffering by event time that could delay your output. Have you tried this and have you seen an actual delay in your output? Best, Aljosch

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-12 Thread Aljoscha Krettek
Hi, I'm not sure that what you want is possible. You say you want more windows when there are more events for a given time frame? That is when the events are more dense in time? Also, using the event timestamp as the gap doesn't look correct. The gap basically specifies the timeout for a ses
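The role of the gap as an inactivity timeout can be shown with a simplified session assignment in plain Java. This is a sketch under stated assumptions, not Flink's merging window code: timestamps are pre-sorted, each event opens `[t, t + gap(t))`, overlapping or touching intervals merge, and a `LongUnaryOperator` stands in for the gap extractor passed to `withDynamicGap`. Note that the function returns a duration; returning the event timestamp itself would make every gap enormous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Simplified session assignment over timestamps sorted ascending.
// Each event opens [t, t + gap(t)); intervals that overlap or touch merge into one session.
class Sessions {
    static List<long[]> sessions(long[] sortedTimestamps, LongUnaryOperator gapFor) {
        List<long[]> result = new ArrayList<>();
        long start = 0L;
        long end = 0L;
        boolean open = false;
        for (long t : sortedTimestamps) {
            long windowEnd = t + gapFor.applyAsLong(t); // gapFor returns a duration, not a timestamp
            if (open && t <= end) {
                end = Math.max(end, windowEnd); // arrived within the timeout: extend the session
            } else {
                if (open) {
                    result.add(new long[] {start, end}); // timeout expired: close previous session
                }
                start = t;
                end = windowEnd;
                open = true;
            }
        }
        if (open) {
            result.add(new long[] {start, end});
        }
        return result;
    }
}
```

With a constant gap of 10 and timestamps `0, 5, 30, 38, 100`, this yields three sessions, `[0, 15)`, `[30, 48)` and `[100, 110)`: denser events produce longer sessions, not more of them.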

Re: Flink 1.8.3 GC issues

2020-11-12 Thread Aljoscha Krettek
Created an issue for this: https://issues.apache.org/jira/browse/BEAM-11251 On 11.11.20 19:09, Aljoscha Krettek wrote: Hi, nice work on debugging this! We need the synchronized block in the source because the call to reader.advance() (via the invoker) and reader.getCurrent() (via

Re: Flink 1.8.3 GC issues

2020-11-11 Thread Aljoscha Krettek
se see the flame graph (*CPU-graph-at-issuetime.svg*) Note: SVG file can be opened using any browser and it is clickable while opened. -- Thanks Josson

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-11 Thread Aljoscha Krettek
Best regards, Tim On Tue, 10 Nov 2020 at 16:14, Aljoscha Krettek wrote: On 10.11.20 11:53, Tim Josefsson wrote: Also when checking my logs I see the following message: 11:41:56,345 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = 1 [omitte

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-10 Thread Aljoscha Krettek
On 10.11.20 11:53, Tim Josefsson wrote: Also when checking my logs I see the following message: 11:41:56,345 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = 1 [omitted for brevity] transaction.timeout.ms = 90 transactional.id = Sour

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

2020-11-09 Thread Aljoscha Krettek
i Till, That's great! thank you so much!!! I have spent one week on this. I'm so relieved! Cheers s -- *From:* Till Rohrmann *Sent:* 06 November 2020 17:5

Re: Flink kafka - Message Prioritization

2020-11-04 Thread Aljoscha Krettek
I'm afraid there's nothing in Flink that would make this possible right now. Have you thought about whether this would be possible using the vanilla Kafka Consumer APIs? I'm not sure that it's possible to read messages with prioritization using their APIs. Best, Aljoscha On 04.11.20 08:34, Rob

Re: Filter By Value in List

2020-11-02 Thread Aljoscha Krettek
I believe this is happening because the type system does not recognize that list of Strings as anything special but treats it as a black-box type. @Timo: Would this work with the new type system? Best, Aljoscha On 02.11.20 06:47, Rex Fenley wrote: Hello, I'm trying to filter the rows of a ta

Re: Error parsing annotations in flink-table-planner_blink_2.12-1.11.2

2020-11-02 Thread Aljoscha Krettek
e.flink" % "flink-avro" % flinkVersion % "provided", "org.apache.flink"%% "flink-parquet" % flinkVersion % "provided", "org.apache.flink"%% "flink-runtime-web" % fl

Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-02 Thread Aljoscha Krettek
@Timo: Is this sth that would work when using the new type stack? From the message I'm assuming it's using the older type stack. @Rex: Which Flink version are you using and could you maybe post the code snippet that you use to do conversions? Best, Aljoscha On 02.11.20 06:50, Rex Fenley wrot

Re: Error parsing annotations in flink-table-planner_blink_2.12-1.11.2

2020-11-02 Thread Aljoscha Krettek
@Timo and/or @Jark, have you seen this problem before? @Yuval, I'm assuming you're using sbt as a build system, is that correct? Could you maybe also post a snippet of your build file that shows the dependency setup or maybe the whole file(s). Best, Aljoscha On 01.11.20 13:34, Yuval Itzchako

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-10 Thread Aljoscha Krettek
ng to the console. Only errors appear in my terminal window and the test logs. Maybe console logger does not work for this junit setup. I'll see if the file version works. On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: What Aljoscha suggested is w

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Aljoscha Krettek
binary searched this issue, this failure happens if my query in step 3 has a join it. If I remove the join, I can remove step 4 and the code still works. I've renamed a bunch of my tables too and the problem still exists. On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek wrote: Hi Dan, ther

Re: Re: need help about "incremental checkpoint", Thanks

2020-10-06 Thread Aljoscha Krettek
I'm forwarding my comment from the Jira Issue [1]: In https://github.com/appleyuchi/wrongcheckpoint/blob/master/src/main/scala/wordcount_increstate.scala you set the RocksDBStateBackend, in https://github.com/appleyuchi/wrongcheckpoint/blob/master/src/main/scala/StateWordCount.scala you set t

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Aljoscha Krettek
Hi Dan, there were some bugs and quirks in the MiniCluster that we recently fixed: - https://issues.apache.org/jira/browse/FLINK-19123 - https://issues.apache.org/jira/browse/FLINK-19264 But I think they are probably unrelated to your case. Could you enable logging and see from the logs whet

Re: ConnectionPool to DB and parallelism of operator question

2020-10-06 Thread Aljoscha Krettek
Hi, since I don't know the implementation of the Sink I can only guess. I would say you get 82 * 300 connections because you will get 82 instances of a sink operator and each of those would then have a connection pool of 300 connections. The individual sink instances will (potentially) run on
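The connection count follows from each parallel sink instance owning its own pool, so the total is parallelism times pool size. A back-of-the-envelope sketch in Java; the class name and the 1,000-connection database limit in the usage note are hypothetical.

```java
// Back-of-the-envelope check: every parallel sink instance opens its own connection pool.
class ConnectionBudget {
    static long totalConnections(int sinkParallelism, int poolSizePerInstance) {
        return (long) sinkParallelism * poolSizePerInstance;
    }

    /** Per-instance pool size that keeps the total at or below a database limit (at least 1). */
    static int poolSizeFor(int sinkParallelism, int maxDbConnections) {
        // If the limit is below the parallelism, even 1 per instance exceeds it.
        return Math.max(1, maxDbConnections / sinkParallelism);
    }
}
```

`totalConnections(82, 300)` gives 24,600 connections. If the database accepted, say, 1,000 connections, `poolSizeFor(82, 1000)` suggests a pool of 12 per instance.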

Re: Reading from HDFS and publishing to Kafka

2020-09-29 Thread Aljoscha Krettek
Hi, I actually have no experience running a Flink job on K8s against a kerberized HDFS so please take what I'll say with a grain of salt. The only thing you should need to do is to configure the path of your keytab and possibly some other Kerberos settings. For that check out [1] and [2].

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-21 Thread Aljoscha Krettek
nd to that and we shouldn't have a slow moving component block us to support a fast moving component if it's such apparent that users want it. @Aljoscha Krettek could you please pick that topic up and ping the respective maintainers? [1] http://apache-flink-user-mailing-list-archive.2

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-18 Thread Aljoscha Krettek
On 14.09.20 02:20, Steven Wu wrote: Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg sink use case, because we can't retrieve the checkpointId from the FunctionInitializationContext during the restore case. But we can move away from it if the restore context provides th

Re: How to access state in TimestampAssigner in Flink 1.11?

2020-09-11 Thread Aljoscha Krettek
n. I think that's a rather common usecase in Flink which can optimize the latency a lot, so I would love to have some more features directly from Flink to better support "processing per kafka partition" without the need to shuffle. Best regards Theo - Original Message --

Re: [DISCUSS] Drop Scala 2.11

2020-09-10 Thread Aljoscha Krettek
Yes! I would be in favour of this since it's blocking us from upgrading certain dependencies. I would also be in favour of dropping Scala completely but that's a different story. Aljoscha On 10.09.20 16:51, Seth Wiesman wrote: Hi Everyone, Think of this as a pre-flip, but what does everyon

Re: Watermark generation issues with File sources in Flink 1.11.1

2020-09-10 Thread Aljoscha Krettek
It assumes that the pipeline is about to finish shortly and aborts the checkpoint. This along with the watermark generation problems kind of make it difficult to use file source in production. On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek wrote: Hi Arti, what exactly do you mean by &quo

[DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Aljoscha Krettek
Hi Devs, @Users: I'm cc'ing the user ML to see if there are any users that are relying on this feature. Please comment here if that is the case. I'd like to discuss the deprecation and eventual removal of UnionList Operator State, aka Operator State with Union Redistribution. If you don't kn
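For readers unfamiliar with the two redistribution modes, here is a simplified plain-Java model of who receives which list entries on restore. It is a sketch, not Flink's repartitioner: the class is hypothetical, and the round-robin order in `evenSplit` is chosen for simplicity (Flink's actual assignment order may differ). The point is the contrast: even-split gives each new subtask a share, while union gives every subtask the complete list.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of ListState redistribution on restore; not Flink's repartitioner code.
class OperatorStateRedistribution {
    /** Even-split: entries from all old subtasks are pooled and divided among the new subtasks. */
    static List<List<String>> evenSplit(List<String> allEntries, int newParallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < allEntries.size(); i++) {
            result.get(i % newParallelism).add(allEntries.get(i)); // round-robin for simplicity
        }
        return result;
    }

    /** Union: every new subtask receives the complete list. */
    static List<List<String>> union(List<String> allEntries, int newParallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(allEntries));
        }
        return result;
    }
}
```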

Re: Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

2020-09-08 Thread Aljoscha Krettek
Hi, this is indeed the correct behaviour right now. Which doesn't mean that it's the behaviour that we would like to have. The reason why we can't move the "pending" files to "final" is that we don't have a point where we can do this in an idempotent and retryable fashion. When we do regular

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-09-08 Thread Aljoscha Krettek
n top of this method. Nevertheless I see it as an extension of the DataStream API for BATCH execution rather than making the DataStream API work for BATCH.  Therefore I'd be fine with the leaving the Broadcast State out of the FLIP What do you think? On 01/09/2020 13:46, Aljoscha Krettek wrote:

Re: How to access state in TimestampAssigner in Flink 1.11?

2020-09-07 Thread Aljoscha Krettek
Hi, sorry for the inconvenience! I'm sure we can find a solution together. Why do you need to keep state in the Watermark Assigner? The Kafka source will by itself maintain the watermark per partition, so just specifying a WatermarkStrategy will already correctly compute the watermark per par

Re: Idle stream does not advance watermark in connected stream

2020-09-01 Thread Aljoscha Krettek
desired behavior. [1] https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd791669898b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79 On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek wrote:

Re: [DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-27 Thread Aljoscha Krettek
connector? If I remember correctly, the universal connector is compatible with 0.10 brokers, but I want to double check that. Best, Paul Lam On 24 Aug 2020, at 22:46, Aljoscha Krettek wrote: Hi all, this thought came up on FLINK-17260 [1] but I think

Re: Idle stream does not advance watermark in connected stream

2020-08-26 Thread Aljoscha Krettek
Yes, I'm afraid this analysis is correct. The StreamOperator, AbstractStreamOperator to be specific, computes the combined watermarks from both inputs here: https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming
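The combination rule Aljoscha points to can be sketched in plain Java. This is a simplified model with a hypothetical class name, not the actual `AbstractStreamOperator` code: the operator's watermark is the minimum of the latest watermark from each input and only moves forward. It shows why an idle input that never sends a watermark keeps the combined watermark at `Long.MIN_VALUE`.

```java
// Simplified two-input watermark tracking: the operator's event-time clock is the minimum
// of the latest watermark received on each input, and it only ever moves forward.
class TwoInputWatermark {
    private long input1 = Long.MIN_VALUE; // nothing received yet
    private long input2 = Long.MIN_VALUE;
    private long combined = Long.MIN_VALUE;

    /** Returns the new combined watermark if it advanced, otherwise null. */
    Long onWatermark(int input, long watermark) {
        if (input == 1) {
            input1 = watermark;
        } else if (input == 2) {
            input2 = watermark;
        } else {
            throw new IllegalArgumentException("input must be 1 or 2");
        }
        long candidate = Math.min(input1, input2);
        if (candidate > combined) {
            combined = candidate;
            return combined;
        }
        return null;
    }
}
```

A watermark of 100 on input 1 emits nothing while input 2 is silent; once input 2 reports 50 the operator advances to 50, and only after input 2 passes 100 does it reach 100.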

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-25 Thread Aljoscha Krettek
Thanks for creating this FLIP! I think the general direction is very good but I think there are some specifics that we should also put in there and that we may need to discuss here as well. ## About batch vs streaming scheduling I think we shouldn't call it "scheduling", because the decision b

[DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-24 Thread Aljoscha Krettek
Hi all, this thought came up on FLINK-17260 [1] but I think it would be a good idea in general. The issue reminded us that Kafka didn't have an idempotent/fault-tolerant Producer before Kafka 0.11.0. By now we have had the "modern" Kafka connector that roughly follows new Kafka releases for a

Re: Watermark generation issues with File sources in Flink 1.11.1

2020-08-24 Thread Aljoscha Krettek
Hi Arti, what exactly do you mean by "checkpoints do not work"? Are there exceptions being thrown? How are you writing your file-based sources, what API methods are you using? Best, Aljoscha On 20.08.20 16:21, Arti Pande wrote: Hi Till, Thank you for your quick response. Both the AssignerW

Re: Customization of execution environment

2020-07-31 Thread Aljoscha Krettek
I agree! My long-term goal is that a Configuration is the source of truth and that the programmatic setter methods and everything else just modify the underlying configuration. We have made big steps in at least allowing users to configure most (if not all) StreamExecutionEnvironment and TableEnviron

Re: [DISCUSS] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-07-30 Thread Aljoscha Krettek
up and starting the discussion. I am in favor of unifying the APIs the way described in the FLIP and deprecating the DataSet API. I am looking forward to the detailed discussion of the changes necessary. Best, Marton On Wed, Jul 29, 2020 at 12:46 PM Aljoscha Krettek < aljos...@apache.org>

[DISCUSS] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-07-29 Thread Aljoscha Krettek
Hi Everyone, my colleagues (in cc) and I would like to propose this FLIP for discussion. In short, we want to reduce the number of APIs that we have by deprecating the DataSet API. This is a big step for Flink, which is why I'm also cross-posting this to the User Mailing List. FLIP-131: http:/

Re: AllwindowStream and RichReduceFunction

2020-07-27 Thread Aljoscha Krettek
20, 2020 at 6:32 PM Aljoscha Krettek wrote: What are you trying to do in the ReduceFunction? Without knowing the code, maybe an aggregate(AggregateFunction) is the solution. Best, Aljoscha On 20.07.20 18:03, Flavio Pompermaier wrote: Thanks Aljosha for the reply. So what can I do in my reduce

Re: GenericData cannot be cast to type scala.Product

2020-07-24 Thread Aljoscha Krettek
For anyone following this: the discussion is happening on the Jira issue: https://issues.apache.org/jira/browse/FLINK-18478 Best, Aljoscha On 23.07.20 15:32, Georg Heiler wrote: Hi, as a follow up to https://issues.apache.org/jira/browse/FLINK-18478 I now face a class cast exception. The repr

Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Aljoscha Krettek
(i.e. not serializable)? On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek wrote: Hi Flavio, the reason is that under the covers the ReduceFunction will be used as the ReduceFunction of a ReducingState. And those cannot be rich functions because we cannot provide all the required context "i

Re: Status of a job when a kafka source dies

2020-07-20 Thread Aljoscha Krettek
Hi, Flink doesn't do any special failure-handling or retry logic, so it’s up to how the KafkaConsumer is configured via properties. In general Flink doesn’t try to be smart: when something fails, an exception will bubble up that will fail this execution of the job. If checkpoints are enabled t

Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Aljoscha Krettek
Hi Flavio, the reason is that under the covers the ReduceFunction will be used as the ReduceFunction of a ReducingState. And those cannot be rich functions because we cannot provide all the required context "inside" the state backend. You can see how the ReduceFunction is used to create a R

Re: map JSON to scala case class & off-heap optimization

2020-07-15 Thread Aljoscha Krettek
On 11.07.20 10:31, Georg Heiler wrote: 1) similarly to spark the Table API works on some optimized binary representation 2) this is only available in the SQL way of interaction - there is no programmatic API yes it's available from SQL, but also the Table API, which is a programmatic declarati

Re: flink take single element from stream

2020-07-10 Thread Aljoscha Krettek
I'm afraid limit() is not yet available on the Table API but you can use it via SQL, i.e. sth like "select * FROM (VALUES 'Hello', 'CIAO', 'foo', 'bar') LIMIT 2;" works. You can execute that from the Table API via `TableEnvironment.executeSql()`. Best, Aljoscha On 09.07.20 17:53, Georg Heiler

Re: map JSON to scala case class & off-heap optimization

2020-07-10 Thread Aljoscha Krettek
Hi Georg, I'm afraid the other suggestions are missing the point a bit. From your other emails it seems you want to use Kafka with JSON records together with the Table API/SQL. For that, take a look at [1] which describes how to define data sources for the Table API. Especially the Kafka and J

Re: MalformedClassName for scala case class

2020-07-10 Thread Aljoscha Krettek
Hi, could you please post the stacktrace with the exception and also let us know which Flink version you're using? I have tried the following code and it works on master/flink-1.11/flink-1.10: case class Foo(lang: String, count: Int) def main(args: Array[String]): Unit = { val senv

Re: Task recovery?

2020-07-10 Thread Aljoscha Krettek
On 03.07.20 18:42, John Smith wrote: If I understand correctly on June 23rd it suspended the jobs? So at that point they would no longer show in the UI or be restarted? Yes, that is correct, though in the logs it seems the jobs failed terminally on June 22nd: 2020-06-22 23:30:22,130 INFO or

Re: Avro from avrohugger still invalid

2020-07-03 Thread Aljoscha Krettek
now? Best, Georg Am Do., 2. Juli 2020 um 23:44 Uhr schrieb Georg Heiler < georg.kf.hei...@gmail.com>: What is the suggested workaround for now? Thanks! Aljoscha Krettek schrieb am Do. 2. Juli 2020 um 20:55: Hi Georg, unfortunately, it seems I only fixed the issue for AvroSerial

Re: Avro from avrohugger still invalid

2020-07-02 Thread Aljoscha Krettek
Hi Georg, unfortunately, it seems I only fixed the issue for AvroSerializer and not for AvroDeserializationSchema. I created a new issue (which is a clone of the old one) to track this [1]. The fix should be very simple since it's the same issue. Best, Aljoscha [1] https://issues.apache.org

Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek
Did you look at the watermark metrics? Do you know what the current watermark is when the windows are firing? You could also get the current watermark when using a ProcessWindowFunction and emit it in the records that you're printing, for debugging. What is that TimestampAssigner you're

Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek
Sorry, I now saw that this thread diverged. My mail client didn't pick it up because someone messed up the subject of the thread. On 16.06.20 14:06, Aljoscha Krettek wrote: Hi, what is the timescale of your data in Kafka? If you have data in there that spans more than ~30 minutes, I

Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek
Hi, what is the timescale of your data in Kafka? If you have data in there that spans more than ~30 minutes, I would expect your windows to fire very soon after the job is started. Event time does not depend on a wall clock but instead advances with the time in the stream. As Flink advances th
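The effect described above can be reproduced without Flink. The following is a plain-Java simulation, not the Flink API, that assumes a single key, events arriving in timestamp order, and a bounded-out-of-orderness watermark recomputed after every event as `maxTimestamp - bound - 1` (a real job emits watermarks periodically instead). Replaying a backlog that spans an hour of event time closes the earlier sessions as soon as later records are read, even though almost no wall-clock time has passed. `SessionWindowSimulation` and its parameters are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of event-time session windows for a single key (not the Flink API). */
final class SessionWindowSimulation {

    /** A session window covering [start, end), where end = last event timestamp + gap. */
    static final class Session {
        final long start;
        long end;
        int count;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ") x" + count;
        }
    }

    /**
     * Replays timestamps (assumed to arrive in ascending order) and returns the sessions in
     * the order they fire. The watermark depends only on event timestamps, never on the wall clock.
     */
    static List<Session> replay(long[] timestamps, long gap, long maxOutOfOrderness) {
        List<Session> open = new ArrayList<>();
        List<Session> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : timestamps) {
            Session last = open.isEmpty() ? null : open.get(open.size() - 1);
            if (last != null && ts < last.end) {
                last.end = Math.max(last.end, ts + gap); // within the gap: extend the session
            } else {
                last = new Session(ts, ts + gap); // gap exceeded: start a new session
                open.add(last);
            }
            last.count++;
            maxTimestamp = Math.max(maxTimestamp, ts);
            // Simplification: recompute the watermark after every event.
            fireUpTo(maxTimestamp - maxOutOfOrderness - 1, open, fired);
        }
        // The end of a bounded input behaves like a final watermark of Long.MAX_VALUE.
        fireUpTo(Long.MAX_VALUE, open, fired);
        return fired;
    }

    /** Fires every open session whose last timestamp (end - 1) is at or below the watermark. */
    private static void fireUpTo(long watermark, List<Session> open, List<Session> fired) {
        while (!open.isEmpty() && open.get(0).end - 1 <= watermark) {
            fired.add(open.remove(0));
        }
    }
}
```

For example, replaying records at 0, 1, 2, 30, 31 and 60 minutes with a 10-minute gap and a 5-second bound yields three sessions, and the first one fires as soon as the 30-minute record is read.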

Re: Does Flink support reading files or CSV files from java.io.InputStream instead of file paths?

2020-06-16 Thread Aljoscha Krettek
Hi Marco, this is not possible since Flink is designed mostly to read files from a distributed filesystem, where paths are used to refer to those files. If you read from files on the classpath, you could just use plain old Java code and wouldn't need a distributed processing system such as Flink.
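For the classpath case, a minimal plain-JDK sketch of reading CSV rows from any `java.io.InputStream` might look like this. It assumes simple CSV without quoted fields (the naive comma split breaks on quoted commas), and `SimpleCsvReader` is a hypothetical helper name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Reads simple CSV (no quoted fields) from an InputStream using only the JDK. */
final class SimpleCsvReader {

    static List<List<String>> read(InputStream in) {
        List<List<String>> rows = new ArrayList<>();
        // try-with-resources closes the reader and, with it, the underlying stream.
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // skip blank lines
                }
                // Naive split that breaks on quoted fields containing commas;
                // the -1 limit keeps trailing empty fields.
                rows.add(Arrays.asList(line.split(",", -1)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }
}
```

A classpath resource could then be read with `SimpleCsvReader.read(SimpleCsvReader.class.getResourceAsStream("/data.csv"))`, where `/data.csv` is a placeholder; `getResourceAsStream` returns `null` when the resource is missing, so check for that first.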

Re: Improved performance when using incremental checkpoints

2020-06-16 Thread Aljoscha Krettek
Hi, it might be that the operations that Flink performs on RocksDB during checkpointing will "poke" RocksDB somehow and make it clean up its internal hierarchies of storage more. Other than that, I'm also a bit surprised by this. Maybe Yun Tang will come up with another idea. Best, Aljosch

Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-06-09 Thread Aljoscha Krettek
Hi, I agree with Robert that adding open/close support for partitioners would mean additional complexity in the code base. We're currently not thinking of supporting that. Best, Aljoscha On 05.06.20 20:19, Arvid Heise wrote: Hi Arnaud, just to add up. The overhead of this additional map is

Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-26 Thread Aljoscha Krettek
I think what might be happening is that you're mixing dependencies from the flink-sql-connector-kafka and the proper flink-connector-kafka that should be used with the DataStream API. Could that be the case? Best, Aljoscha On 25.05.20 19:18, Piotr Nowojski wrote: Hi, It would be helpful if y

Re: [EXTERNAL] Re: Memory growth from TimeWindows

2020-05-25 Thread Aljoscha Krettek
o of note, we are using a FsStateBackend configuration, and plan to move to RocksDBStateBackend, but from what I can tell, this would only reduce memory and delay hitting the heap memory capacity, not stall it forever? Thanks Chris On 5/18/20, 7:29 AM, "Aljoscha Krettek"

Re: Performance impact of many open windows at the same time

2020-05-25 Thread Aljoscha Krettek
Hi, I don't think this will immediately degrade performance. State is essentially stored in a HashMap (for the FsStateBackend) or RocksDB (for the RocksDB backend). If these data structures don't degrade with size, then your performance also shouldn't degrade. There are of course some effec

Re: "Fill in" notification messages based on event time watermark

2020-05-18 Thread Aljoscha Krettek
I think there is some confusion in this thread between the auto watermark interval and the interval (length) of an event-time window. Maybe clearing that up for everyone helps. The auto watermark interval is the periodicity (in processing time) at which Flink asks the source (or a watermark ge
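To make the distinction concrete, here is a plain-Java model (not the Flink API) of a bounded-out-of-orderness watermark generator, written to mirror our understanding of the logic in Flink's `BoundedOutOfOrdernessWatermarks`: `onEvent` only records the largest timestamp, and `onPeriodicEmit` is what the auto watermark interval would trigger, periodically in processing time. Whether an event-time window can fire depends only on the emitted value compared with the window end, not on how often emission happens. The class and method names are illustrative.

```java
/** Plain-Java model of a bounded-out-of-orderness watermark generator (not the Flink API). */
final class PeriodicWatermarkModel {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    PeriodicWatermarkModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that the first emitted watermark is Long.MIN_VALUE, not an underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    /** Called for every record: only remembers the largest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called once per auto watermark interval; the interval changes how often, not what. */
    long onPeriodicEmit() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    /** A window [start, start + size) can fire once the watermark reaches start + size - 1. */
    static boolean windowCanFire(long windowStart, long windowSize, long watermark) {
        return watermark >= windowStart + windowSize - 1;
    }
}
```

With a 5-second bound, a record at 30 s yields a watermark of 24,999 ms, so the window [0 s, 60 s) cannot fire; after a record at 66 s the watermark is 60,999 ms and the window can fire. Calling `onPeriodicEmit()` more often (a shorter auto watermark interval) returns the same values; it only reports them sooner.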

Re: Memory growth from TimeWindows

2020-05-18 Thread Aljoscha Krettek
On 15.05.20 15:17, Slotterback, Chris wrote: My understanding is that while all these windows build their memory state, I can expect heap memory to grow for the 24-hour length of the SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames expire and release back to the JV
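The growth pattern described above follows from how sliding windows are assigned: every element is added to `size / slide` overlapping windows. A plain-Java sketch of that assignment, modelled on what we understand `SlidingEventTimeWindows` does with a zero offset (not the Flink code itself), is below; the 1-hour slide in the usage note is a hypothetical value.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding event-time window assignment with zero offset (not Flink code). */
final class SlidingWindowAssignment {

    /** Start of the most recent slide-aligned window that contains the timestamp. */
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - Math.floorMod(timestamp, slide);
    }

    /** Starts of every window [start, start + size) that contains the timestamp, newest first. */
    static List<Long> assign(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a 24-hour window and a 1-hour slide, `assign(5_400_000L, 86_400_000L, 3_600_000L)` returns 24 window starts, from 1 h down to -22 h, so with a non-incremental window function each element is held in 24 windows until the oldest of them expires. An incremental `ReduceFunction` or `AggregateFunction` keeps one accumulator per window and key instead of every element.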

Re: Export user metrics with Flink Prometheus endpoint

2020-05-18 Thread Aljoscha Krettek
<https://github.com/census-instrumentation/opencensus-java> to collect application metrics, so is there an easy way to integrate these metrics with the Flink metrics endpoint? Thanks! Eleanore On Wed, May 6, 2020 at 7:48 AM Aljoscha Krettek wrote: Hi, that should be possible. Did you have a

Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-06 Thread Aljoscha Krettek
since the Flink tuple is in the Java API package in Flink? Best, Nick. On Wed, May 6, 2020 at 9:52 AM Aljoscha Krettek wrote: Hi, Flink will not do any casting between types. You either need to output the correct (Scala) Tuple type from the deserialization schema or insert a step (say a map

Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-06 Thread Aljoscha Krettek
Hi, Flink will not do any casting between types. You either need to output the correct (Scala) Tuple type from the deserialization schema or insert a step (say a map function) that converts between the two types. The Tuple2 type and the Scala tuple type, i.e. (foo, bar), have nothing in common

Re: MongoDB as a Sink;

2020-05-06 Thread Aljoscha Krettek
Hi, yes, that is correct. You need to implement a SinkFunction. For getting started you can take a look at the Elasticsearch connector because Elasticsearch and MongoDB are roughly similar in terms of how you work with them, i.e. they are both key-value stores. Best, Aljoscha On 06.05.20 02
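A rough shape for such a sink, following the buffer-and-bulk-write approach the Elasticsearch connector takes, is sketched below in plain Java. `BufferingSinkSketch` is hypothetical and deliberately does not implement Flink's `SinkFunction`; its `invoke`/`close` methods only mirror that lifecycle, and the `bulkWriter` callback stands in for whatever MongoDB client call you would use. In a real sink, also flushing when a checkpoint is taken (for example from `CheckpointedFunction#snapshotState`) is what gives at-least-once delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical batching writer illustrating the shape of a custom sink: buffer records
 * in invoke(), write them in bulk, and flush whatever is left on close().
 */
final class BufferingSinkSketch<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> bulkWriter; // stand-in for a real database client call
    private final List<T> buffer = new ArrayList<>();

    BufferingSinkSketch(int maxBatchSize, Consumer<List<T>> bulkWriter) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.bulkWriter = bulkWriter;
    }

    /** Called once per record; writes a batch as soon as the buffer is full. */
    void invoke(T record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Writes all buffered records; a real sink would also call this on checkpoints. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        bulkWriter.accept(new ArrayList<>(buffer)); // hand over a copy so the buffer can be reused
        buffer.clear();
    }

    /** Called on shutdown so that no buffered records are lost. */
    void close() {
        flush();
    }
}
```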
