Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
Hi Roman, Many thanks for the feedback! > Probably it would be simpler to just decline the RPC-triggered checkpoint > if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY). > But I wonder how significantly this waiting f

Re: How should I process a cumulative counter?

2021-01-11 Thread Larry Aspen
Hi Aljoscha, thank you for your reply. On 2021/01/08 15:44 Aljoscha Krettek wrote: >the basic problem for your use case is that window boundaries are >inclusive for the start timestamp and exclusive for the end timestamp. That's true. What further complicates matters is that the last value of th
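The boundary semantics Aljoscha describes can be made concrete with a small stdlib-only sketch of half-open tumbling windows [start, start + size). The start computation mirrors the arithmetic Flink uses for tumbling window starts, as far as I know (timestamp - (timestamp - offset + size) % size). This sketch assumes an offset of 0 and non-negative timestamps, and the class name is hypothetical.

```java
/**
 * Sketch of half-open tumbling windows [start, start + size).
 * Assumes offset = 0 and non-negative timestamps.
 */
final class TumblingWindowSketch {

    /** Start of the window that contains {@code timestamp}. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    /** True iff start <= ts < start + size: start inclusive, end exclusive. */
    static boolean contains(long start, long size, long ts) {
        return ts >= start && ts < start + size;
    }

    public static void main(String[] args) {
        long size = 60_000L;      // one-minute windows
        long boundary = 120_000L; // a reading taken exactly at minute 2
        // The boundary reading opens the next window instead of closing the previous one.
        System.out.println(windowStart(boundary, size));       // 120000
        System.out.println(contains(60_000L, size, boundary)); // false
    }
}
```

With these semantics, a counter value stamped exactly at a period's end time is assigned to the following window.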

Re: Roadmap for Execution Mode (Batch/Streaming) and interaction with Table/SQL APIs

2021-01-11 Thread Aljoscha Krettek
Also cc'ing d...@flink.apache.org On 2021/01/06 09:19, burkaygur wrote: 1) How do these changes impact the Table and SQL APIs? Are they completely orthogonal or can we get the benefits of the new Batch Mode with Flink SQL as well? The answer here is a bit complicated. The Table API/SQL already

Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-11 Thread Aljoscha Krettek
On 2021/01/08 16:55, vinay.raic...@t-systems.com wrote: Could you also attach the code snippet for `KafkaSource`, `KafkaSourceBuilder`, and `OffsetInitializers` that you were referring to in your previous reply, for my reference, to make it clearer for me? Ah sorry, but this I was r
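For the t1..t2 question, my understanding is that the Flink 1.12 KafkaSource builder expresses this as setStartingOffsets(OffsetsInitializer.timestamp(t1)) plus setBounded(OffsetsInitializer.timestamp(t2)). Please check those names against the 1.12 Javadoc before relying on them. The stdlib-only sketch below models just the underlying idea: a timestamp is resolved to the earliest offset whose record timestamp is >= that timestamp. It assumes timestamps never decrease within the partition and treats the stop offset as exclusive. The class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one partition: offsets 0..n-1 with non-decreasing timestamps. */
final class TimestampRangeSketch {

    /** Earliest offset whose timestamp is >= target, or timestamps.length if there is none. */
    static int offsetForTime(long[] timestamps, long target) {
        int lo = 0, hi = timestamps.length; // binary search over [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    /** Offsets read when starting at t1 and stopping (exclusively) at t2. */
    static List<Integer> offsetsBetween(long[] timestamps, long t1, long t2) {
        int start = offsetForTime(timestamps, t1);
        int stop = offsetForTime(timestamps, t2);
        List<Integer> result = new ArrayList<>();
        for (int offset = start; offset < stop; offset++) {
            result.add(offset);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {100, 200, 200, 300, 400};
        System.out.println(offsetsBetween(ts, 200, 400)); // [1, 2, 3]
    }
}
```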

Re: Re: Use Flink to process request with list of queries and aggregate

2021-01-11 Thread Yun Gao
Hi Li, From my view, I think it would not be easy to use a countWindow if you have a different number of records for each key (namely, the user in this case). I think you may need to use the low-level KeyedProcessFunction [1] to keep some state yourself. For example, each request might also carri
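The following sketch shows, using only the standard library, the per-key bookkeeping such a KeyedProcessFunction could do. It rests on a hypothetical assumption: every query result carries its request id and the total number of queries in that request, so the function knows when a request is complete. A HashMap stands in for Flink keyed state, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class RequestAggregatorSketch {

    /** Hypothetical message type; not part of any Flink API. */
    static final class QueryResult {
        final String requestId;
        final int totalQueries;
        final long value;

        QueryResult(String requestId, int totalQueries, long value) {
            this.requestId = requestId;
            this.totalQueries = totalQueries;
            this.value = value;
        }
    }

    // Stands in for keyed state (e.g. a ListState) holding one buffer per request id.
    private final Map<String, List<QueryResult>> buffers = new HashMap<>();

    /** Buffers a result and emits the sum once all results of its request have arrived. */
    Optional<Long> onResult(QueryResult r) {
        List<QueryResult> buf = buffers.computeIfAbsent(r.requestId, k -> new ArrayList<>());
        buf.add(r);
        if (buf.size() < r.totalQueries) {
            return Optional.empty(); // still waiting for more results
        }
        buffers.remove(r.requestId); // analogous to clearing state after emitting
        long sum = 0;
        for (QueryResult q : buf) {
            sum += q.value;
        }
        return Optional.of(sum);
    }
}
```

In an actual KeyedProcessFunction, the buffer would likely live in ListState under the request key, and a timer could clean up requests that never complete.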

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Khachatryan Roman
Hi Yun, > b) With unaligned checkpoint enabled, the slower cases might happen if the downstream task processes very slowly. I think UC will be the common case with multiple sources, each with DoP > 1. IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishe

mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Akisaya
Hey there, recently I had to join two streams, one of which may be idle for a long time. In Flink 1.12, the WatermarkStrategy has a method `withIdleness` to detect whether a stream is idle, so that the operator can still advance its watermark based on the other, active stream, and the state of this op

Flink 1.12 Kryo Serialization Error

2021-01-11 Thread Yuval Itzchakov
Hi, I've implemented a KryoSerializer for a specific JSON type in my application as I have a bunch of UDFs that depend on a RAW('io.circe.Json') encoder being available. The implementation is rather simple. When I run my Flink application with Kryo in trace logs, I see that data gets properly seri

Re: Using key.fields in 1.12

2021-01-11 Thread Timo Walther
There are plans to also derive the table schema from an Avro schema, but we haven't decided on a syntax for this yet. For now, we only support this through catalogs such as Confluent schema registry. Regards, Timo On 07.01.21 21:42, Aeden Jameson wrote: Brilliant, thank you. That will come in

Timestamp Issue with OutputTags

2021-01-11 Thread Priyanka Kalra A
Hi Team, We are generating multiple side-output tags and using the default processing time on a non-keyed stream. The class $YYY extends ProcessFunction, and an implementation is provided for the processElement method. Upon sending valid data, it gives the error "Invalid timestamp: -9223372036854775808. Time

Re: Timestamp Issue with OutputTags

2021-01-11 Thread Taher Koitawala
Can you please share your code? On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <priyanka.a.ka...@ericsson.com> wrote: > Hi Team, > We are generating multiple side-output tags and using default processing > time on non-keyed stream. The class $YYY extends ProcessFunction and implem

Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Chesnay Schepler
The idleTimeout you found is from an earlier attempt at implementing idleness, but making it configurable was aborted midway through as there were some API issues. The effort was subsumed by a new source interface and watermark generators that were introduced in 1.12. Some more details can be

Testing Flink Jobs

2021-01-11 Thread KristoffSC
Hi, I would like to write a few tests that check the message flow in my Flink pipeline. I would like to base my tests on [1]. My StreamJob class, which has the main method, has all sinks and sources pluggable. The implementations are also based on [1]. In all examples available online I can see

Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Akisaya
Thank you @chesnay. I tried in vain to find the issue about the introduction of the new watermark strategy; can you provide some details about it? On Mon, Jan 11, 2021 at 9:43 PM Chesnay Schepler wrote: > The idleTimeout you found is from an earlier attempt at implementing > idleness, but making it configurable was

Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Chesnay Schepler
This is the parent ticket for the new source interface: FLINK-10740. This is the parent ticket for the reworked watermark generators: FLINK-17653. On 1/11/2021 5:16 PM, Akisaya wrote: thank you
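The next sketch is a simplified model of idleness, not Flink's implementation. It shows why idleness matters for a two-input operator such as a join. The operator's watermark is the minimum over inputs that are not idle. Here an input counts as idle once it has produced nothing for a configured stretch of wall-clock time, which is roughly the behavior `withIdleness(Duration)` adds in 1.12, to my understanding. The class and method names are hypothetical, and out-of-orderness is ignored for brevity.

```java
/** Hypothetical model: combined watermark = min over inputs that are not idle. */
final class IdlenessSketch {

    static final class Input {
        long watermark = Long.MIN_VALUE; // no watermark yet
        long lastRecordAtMs;             // wall-clock time of the last record

        Input(long createdAtMs) {
            this.lastRecordAtMs = createdAtMs;
        }

        void onRecord(long eventTimestamp, long nowMs) {
            watermark = Math.max(watermark, eventTimestamp); // zero out-of-orderness, for brevity
            lastRecordAtMs = nowMs;                          // any record makes the input active again
        }
    }

    static boolean isIdle(Input in, long nowMs, long idleTimeoutMs) {
        return nowMs - in.lastRecordAtMs >= idleTimeoutMs;
    }

    /** Minimum watermark over active inputs; Long.MIN_VALUE if every input is idle. */
    static long combinedWatermark(long nowMs, long idleTimeoutMs, Input... inputs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Input in : inputs) {
            if (!isIdle(in, nowMs, idleTimeoutMs)) {
                anyActive = true;
                min = Math.min(min, in.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

While both inputs are active, the quiet one holds the combined watermark back. Once it crosses the idle timeout, only the busy input determines the watermark.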

Re: Testing Flink Jobs

2021-01-11 Thread Chesnay Schepler
1) You can either execute the job in a separate thread, or set DeploymentOptions.ATTACHED to false in the MiniCluster configuration. 2) The cluster not being ready is /usually/ not really an issue. I wouldn't worry about it for the time being. (The reason being that the MiniCluster resource alr
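The stdlib-only sketch below shows the shape of the first suggestion. The "job" receives its source and sink by injection, runs on a separate thread, and the test inspects what the sink collected. `runJob` is a hypothetical stand-in for building the pipeline and calling `env.execute()`. The static synchronized list mirrors the collect-sink pattern from Flink's testing documentation: a real sink instance is serialized, so results have to land in shared static storage.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class PluggableJobTestSketch {

    // Shared, thread-safe result store for the test sink.
    static final List<Integer> COLLECTED = Collections.synchronizedList(new ArrayList<>());

    /** Hypothetical pipeline: doubles every element from the source into the sink. */
    static void runJob(List<Integer> source, Consumer<Integer> sink) {
        for (int value : source) {
            sink.accept(value * 2);
        }
    }

    static List<Integer> runAndCollect(List<Integer> input) {
        COLLECTED.clear();
        // Run the blocking "execute" call on another thread.
        CompletableFuture<Void> job = CompletableFuture.runAsync(() -> runJob(input, COLLECTED::add));
        try {
            // With a bounded source we can wait for completion. With an unbounded one,
            // a test would instead poll COLLECTED and then cancel the job.
            job.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("job failed or timed out", e);
        }
        synchronized (COLLECTED) {
            return new ArrayList<>(COLLECTED);
        }
    }
}
```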

Statefun with RabbitMQ consumes message but does not run statefun

2021-01-11 Thread Stephan Pelikan
Hi, I'm trying to use RabbitMQ as a source. My source consumes messages from the queue, but the statefun is not executed, not even created. This is my main function: public static void main(String[] args) throws Exception { final var env = StreamExecutionEnvironment.getExecutionEnvironment(

Statement Sets

2021-01-11 Thread Aeden Jameson
When using statement sets, if two select queries use the same table (e.g. Kafka Topic), does each query get its own copy of data? Thank you, Aeden

Stateful Functions: Dynamically define and load remote modules

2021-01-11 Thread Ahmad Alkilani
Hi, I see that you need to tell the Flink Stateful Functions runtime about remote stateful function modules via a YAML file provided at deploy time. Given that remote modules and stateful functions are an external deployment concern anyway, is it possible to dynamically associate Remote Modules with Remote Functi

Log length

2021-01-11 Thread Rex Fenley
Hello, We've collected over 150 MiB of log lines in 5 days. Is there a way to tell Flink to eject log lines after a certain length so we don't eventually run out of disk? Thanks! -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG

Re: Log length

2021-01-11 Thread Chesnay Schepler
Have a look at RollingFileAppenders. These have become the default in 1.12. On 1/12/2021 12:53 AM, Rex Fenley wrote: Hello, We've collected over 150 MiB of log lines in 5 days. Is there a way to tell Flink to e
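For reference, a size-based rolling setup in conf/log4j.properties (Flink uses Log4j 2 since 1.11) might look like the fragment below. It is modeled on the 1.12 default as I recall it. The 100MB size and the limit of 10 files are example values, so compare this against the file shipped with your distribution.

```properties
# Log everything at INFO to the rolling appender defined below.
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Roll the main log file when it reaches 100MB, keeping at most 10 old files.
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = 10
```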

Re: state reset(lost) on TM recovery

2021-01-11 Thread Chesnay Schepler
Just to double-check: are you aware that ValueState within a Keyed*Function is scoped to the key of the input element(s)? I.e., any stored value is only accessible if an element with the same key is processed? On 1/10/2021 7:18 PM, Alexey Trenikhun wrote: Hello, I'm using Flink 1.11.3, state

Re: Log length

2021-01-11 Thread Rex Fenley
Thanks, I'll check them out. What's the default in 1.11.2? On Mon, Jan 11, 2021 at 4:26 PM Chesnay Schepler wrote: > Have a look at RollingFileAppenders. > These have become the default in 1.12. > On 1/12/2021

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
Hi Roman, Many thanks for the feedback and suggestions! > I think UC will be the common case with multiple sources each with DoP > 1. > IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishes. Yes, waiting for Eo

Re: state reset(lost) on TM recovery

2021-01-11 Thread Alexey Trenikhun
Hello, Yes, I'm aware. I used elements with the same key and logged getCurrentKey() to ensure that the key is the same, but you are right that it is scope-related: the key is a protobuf object, and I specify a custom TypeInformation in keyBy(). Today I've changed the code to use a Tuple2-derived class i
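One general cause of this kind of issue is a key type whose hashCode() is not deterministic across JVMs, for example one that ends up relying on identity-based Object.hashCode(). In that case the same logical key can map to a different key group after recovery, and its state then looks lost. This is only one possible cause, not a diagnosis of this thread. The stdlib sketch below shows a key with value-based equals()/hashCode(). `keyGroupOf` is a simplification, because to my knowledge Flink applies a murmur hash to hashCode() before taking it modulo the max parallelism. All names are hypothetical.

```java
import java.util.Objects;

final class KeyGroupSketch {

    /** Simplified key-group assignment: floorMod(hashCode, maxParallelism). */
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Hypothetical key with value-based equality, like a POJO or Tuple-style key. */
    static final class UserKey {
        final String tenant;
        final long userId;

        UserKey(String tenant, long userId) {
            this.tenant = tenant;
            this.userId = userId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof UserKey)) return false;
            UserKey other = (UserKey) o;
            return userId == other.userId && tenant.equals(other.tenant);
        }

        // String and Long hash codes are specified by the JDK, so this is stable across JVMs.
        @Override
        public int hashCode() {
            return Objects.hash(tenant, userId);
        }
    }
}
```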

Restoring from checkpoint with different parallelism

2021-01-11 Thread Rex Fenley
Hello, When using the Table API, is it safe to run a Flink job with a different `-p` parallelism while restoring from a checkpoint (not a savepoint) using `-s`, without any rescaling of actual machines? I can't seem to find this documented anywhere. Thanks!

Re: Restoring from checkpoint with different parallelism

2021-01-11 Thread Yun Tang
Hi Rex, I think the doc [1] should describe this. Rescaling from a previous checkpoint is still supported in the current Flink version. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint Best, Yun Tang
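As I understand it, keyed state in a checkpoint is stored per key group. The number of key groups equals the job's max parallelism, which must stay the same across restores, so changing `-p` only reassigns contiguous key-group ranges to subtasks. Below is a stdlib sketch of that range computation, written to match my understanding of Flink's KeyGroupRangeAssignment. The class name is hypothetical.

```java
final class RescaleSketch {

    /** Inclusive [start, end] key-group range owned by one subtask. */
    static int[] keyGroupRange(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        // Same 128 key groups, split two ways and then three ways.
        for (int p : new int[] {2, 3}) {
            for (int i = 0; i < p; i++) {
                int[] r = keyGroupRange(128, p, i);
                System.out.println("p=" + p + " subtask " + i + " -> [" + r[0] + ", " + r[1] + "]");
            }
        }
    }
}
```

With a max parallelism of 128, parallelism 2 yields [0, 63] and [64, 127], and parallelism 3 yields [0, 42], [43, 85] and [86, 127].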

Re: Restoring from checkpoint with different parallelism

2021-01-11 Thread Rex Fenley
Thanks! It looks like you can't with unaligned checkpoints, which seems to imply that you can with the normal checkpointing mechanism. On Mon, Jan 11, 2021 at 7:56 PM Yun Tang wrote: > Hi Rex, > I think doc [1] should have given some descriptions. Rescaling from > previous checkpoint is still suppor

How does at least once checkpointing work

2021-01-11 Thread Rex Fenley
Hello, We're using the TableAPI and want to optimize for checkpoint alignment times. We received some advice to possibly use at-least-once. I'd like to understand how checkpointing works in at-least-once mode so I understand the caveats and can evaluate whether or not that will work for us. Thank

Re: How does at least once checkpointing work

2021-01-11 Thread Yuan Mei
Hey Rex, You will probably find the link below helpful; it explains how at-least-once (no alignment) differs from exactly-once (needs alignment). It also explains how the alignment phase is skipped in at-least-once mode. https://ci.apache.org/projects/flink/flink-docs-release-1

Re: How does at least once checkpointing work

2021-01-11 Thread Rex Fenley
Thanks for the info. It sounds like any state that lacks some form of uniqueness could end up being incorrect. Specifically, in my case, all rows passing through the execution graph have unique ids. However, any operator that groups by foreign_key and then computes a sum/count could end up with an inconsi
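The concern about group-by sums and counts can be illustrated with a toy, deterministic model of a counting operator that has two input channels, A and B. The model assumes the following sequence. The checkpoint barrier reaches A first, and some records follow it on A before B's barrier arrives. The job then fails and restores from this checkpoint, and the sources replay everything after their barrier positions. This is a simplification for illustration, not Flink's code, and the names are hypothetical.

```java
final class CheckpointModeSketch {

    /** Final count after restoring from the checkpoint and replaying post-barrier records. */
    static long countAfterRecovery(boolean aligned, long aBefore, long aAfter, long bBefore) {
        long count = aBefore;  // A's records ahead of its barrier
        if (!aligned) {
            count += aAfter;   // at-least-once: A keeps flowing, so post-barrier records enter the state
        }                      // exactly-once: A is blocked until B's barrier arrives
        count += bBefore;      // B's records ahead of its barrier
        long snapshot = count; // B's barrier arrives and the operator snapshots its state

        // Failure: state resets to the snapshot, and A's post-barrier records are replayed.
        return snapshot + aAfter;
    }

    public static void main(String[] args) {
        System.out.println(countAfterRecovery(true, 3, 2, 4));  // 9, the correct total
        System.out.println(countAfterRecovery(false, 3, 2, 4)); // 11, two records counted twice
    }
}
```

Without a failure, both modes end with the same count. The duplicates only appear on recovery, and only for records that overtook the barrier.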

RE: Timestamp Issue with OutputTags

2021-01-11 Thread Priyanka Kalra A
Below is the code: public class OutputTagProcessingFunction extends ProcessFunction { private static final long serialVersionUID = 1L; private HashMap> outputMap = new HashMap<>(); private List tagList; public OutputTagProcessingFunction(List tagList) { super(); t

Re: Timestamp Issue with OutputTags

2021-01-11 Thread Taher Koitawala
Hi Priyanka, I see that you are generating dynamic output tags. AFAIK, dynamic tagging is causing that issue. I don't think we can add tags after operators are running. Can you try with a statically named tag that is defined as final, and output data that way? Added Till. On Tue, Jan 12, 2021, 1
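One detail that may help narrow this down: -9223372036854775808 is Long.MIN_VALUE. That is the value Flink's StreamRecord#getTimestamp() returns when a record carries no timestamp, which is typical under processing time. Any sink or client that requires non-negative timestamps would reject it. This is an observation, not a confirmed diagnosis. Below is a minimal stdlib sketch of a guard that treats the sentinel as "no timestamp". The helper is hypothetical.

```java
final class TimestampGuardSketch {

    /** Flink's internal "no timestamp" sentinel, equal to the value in the error message. */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Maps the sentinel (or any negative value) to null, i.e. "unset". */
    static Long sanitize(long timestamp) {
        return timestamp < 0 ? null : timestamp;
    }

    public static void main(String[] args) {
        System.out.println(NO_TIMESTAMP == -9223372036854775808L); // true
        System.out.println(sanitize(NO_TIMESTAMP));                // null
    }
}
```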

Question on getting the last successfully externalized checkpoint path for crashed jobs

2021-01-11 Thread DONG, Weike
Hi community, We are currently using *Externalized Checkpoints* to protect against abrupt YARN application failures, as they save a "_metadata" file within the checkpoint folder, which is essential for the job's cold recovery. As designed in Flink, the completed checkpoint paths look like *hdfs:///fli
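A minimal sketch follows, assuming the chk-<n>/_metadata layout described above, a locally accessible copy of the job's checkpoint directory (on HDFS, the same idea would go through Hadoop's FileSystem API), and that _metadata only appears once a checkpoint has completed. It picks the highest-numbered chk-<n> directory that contains a _metadata file. The number is compared numerically, since chk-9 sorts after chk-10 as a string. `LatestCheckpointSketch` and `latestCompleted` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

final class LatestCheckpointSketch {

    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    /** Highest-numbered chk-<n> directory under jobCheckpointDir that contains _metadata. */
    static Optional<Path> latestCompleted(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> CHK_DIR.matcher(p.getFileName().toString()).matches())
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(LatestCheckpointSketch::checkpointId));
        }
    }

    /** Numeric n of a chk-<n> directory. */
    private static long checkpointId(Path dir) {
        Matcher m = CHK_DIR.matcher(dir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a checkpoint directory: " + dir);
        }
        return Long.parseLong(m.group(1));
    }
}
```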

Re: Statefun with RabbitMQ consumes message but does not run statefun

2021-01-11 Thread Stephan Pelikan
I found the reason: there is a class incompatibility because I changed from Statefun 2.2.1 + Flink 1.11.1 to Statefun 2.2.1 + Flink 1.12.0. But even the newest version, Statefun 2.2.2, refers to Flink 1.11.3. Is there a way to use the newest version of Flink in combination with t

Pushing Down Filters

2021-01-11 Thread Satyam Shekhar
Hello, I am using Flink 1.11.2 as the execution engine for an alerting application. Our application builds atop Flink's SQL API to run streaming and batch jobs on a proprietary storage engine. We have a custom StreamTableSource implementation that connects to our storage engine. The connector curr
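In the legacy 1.11 table source stack, the hook for this is FilterableTableSource. As I read its Javadoc, applyPredicate(List<Expression>) receives the conjunctive predicates, and the source removes from that list the ones it will evaluate itself, leaving the rest for Flink. The stdlib sketch below models only that take-or-leave contract. The Predicate type and the set of supported columns are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class FilterPushDownSketch {

    /** Hypothetical predicate model: column <op> literal. */
    static final class Predicate {
        final String column;
        final String op;
        final Object literal;

        Predicate(String column, String op, Object literal) {
            this.column = column;
            this.op = op;
            this.literal = literal;
        }

        @Override
        public String toString() {
            return column + " " + op + " " + literal;
        }
    }

    // Hypothetical capabilities of the storage engine.
    static final Set<String> INDEXED_COLUMNS = Set.of("tenant_id", "event_time");
    static final Set<String> SUPPORTED_OPS = Set.of("=", "<", "<=", ">", ">=");

    /** Removes the conjuncts the engine can evaluate from the list and returns them. */
    static List<Predicate> takeSupported(List<Predicate> conjuncts) {
        List<Predicate> pushed = new ArrayList<>();
        Iterator<Predicate> it = conjuncts.iterator();
        while (it.hasNext()) {
            Predicate p = it.next();
            if (INDEXED_COLUMNS.contains(p.column) && SUPPORTED_OPS.contains(p.op)) {
                pushed.add(p); // the source will apply this one itself
                it.remove();   // what stays in the list is left for Flink to evaluate
            }
        }
        return pushed;
    }
}
```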