Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-07 Thread Yun Gao
if the combination of DataStream API / Table API is sufficient for all the batch users. Any suggestions are warmly welcome. Best, Yun Gao [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams https://nightlies.

Re: Status of File Sink Common (flink-file-sink-common)

2022-05-30 Thread Yun Gao
Hi Jun, I think the release notes should only include issues that cause changes visible to users. Also, by design, flink-file-sink-common should not be used directly by users; it only serves as a module shared by the legacy StreamingFileSink and the new FileSink. Best, Yun --

Re: Flink 1.14.4 -> 1.15.0 Upgrade Problem

2022-05-30 Thread Yun Gao
Hi Clayton, Could you also provide the topology of the job? Also, if convenient, could you have a look at the back-pressure status of each node? We could then locate which node is slow and might be causing the lag. Best, Yun

[ANNOUNCE] Apache Flink 1.15.0 released

2022-05-04 Thread Yun Gao
://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350442 We would like to thank all contributors of the Apache Flink community who made this release possible! Regards, Joe, Till, Yun Gao

Re: DBT-flink profile?

2022-03-24 Thread Yun Gao
Hi Georg, To double-check regarding the dbt integration: do you currently want to use it for batch jobs or streaming jobs? Best, Yun Gao -- Sender:Georg Heiler Date:2022/03/25 01:27:26 Recipient:user Theme:DBT-flink

Re: Flink UI - Operator Chaining - broken with "Records Sent"

2022-03-20 Thread Yun Gao
Hi Dan, This seems to be a known issue [1], which is now tracked in [2]. In short, "Records Sent" currently only shows the records sent by the "last" operator in the chain, so if there are chained side outputs, the number of records in the main output would be overridden. Best, Yun [1] https://iss

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Yun Gao
the time of the registered processing-time timer and the time of the callbacks? Could you also share this part of the result? Best, Yun Gao -- From:Binil Benjamin Send Time:2022 Mar. 18 (Fri.) 16:07 To:"yu'an huang" Cc:user Subj

Re: Clarifying ProcessFunction.onTimer and watermark behavior

2022-03-18 Thread Yun Gao
Hi Dan, The default implementation in Flink [1] first fires all the due timers before emitting the watermark downstream. Thus the watermark should come after the records emitted while processing the timers. Best, Yun Gao [1] https://github.com/apache/flink/blob/ab08b52030a9612571896c579d85e000134ad0f1/flink
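The ordering described above can be illustrated with a small plain-Java simulation. This is a hypothetical sketch, not Flink's actual operator code: the class `TimerThenWatermarkOperator` and its methods are invented for illustration. When a watermark arrives, every event-time timer with a timestamp at or below it fires first, and only then is the watermark forwarded, so records emitted from the timer callbacks precede the watermark in the output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical simulation of an operator's watermark handling; not Flink's actual classes. */
class TimerThenWatermarkOperator {
    // Pending event-time timers, ordered by timestamp.
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    // Everything the operator emits downstream, in order.
    private final List<String> output = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    /** Fires every timer that is due at this watermark first, then forwards the watermark. */
    void processWatermark(long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            long timerTs = eventTimeTimers.poll();
            // Stand-in for onTimer(): a record emitted from the timer callback.
            output.add("record@" + timerTs);
        }
        output.add("watermark@" + watermark);
    }

    List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        TimerThenWatermarkOperator op = new TimerThenWatermarkOperator();
        op.registerEventTimeTimer(100L);
        op.registerEventTimeTimer(200L);
        op.registerEventTimeTimer(300L);
        op.processWatermark(250L);
        System.out.println(op.getOutput()); // [record@100, record@200, watermark@250]
    }
}
```

The timer at 300 stays pending because the watermark has not reached it yet.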

Re: Flink 1.15 deduplication view and lookup join

2022-02-17 Thread Yun Gao
Hi Francis, I think requiring a primary key for the versioned table [1] used in a temporal join [2] is expected. To double-check, which table serves as the versioned table in this case? Is it the streaming table from RabbitMQ or the joined data? Best, Yun [1] https://n

Re: java.io.IOException: Failed to deserialize consumer record due to/ How to serialize table output for KafkaSink

2022-02-17 Thread Yun Gao
Hi, I tried a simplified version of the attached code, and the detailed exception is: Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant at org$apache$flink$api$java$tuple$Tuple4$1$Converter.toInternal(Unknown Source) at org.apache.flink.

Re: Task manager errors with Flink ZooKeeper High Availability

2022-02-17 Thread Yun Gao
Hi Koffman, From the TM side, the only possible usage that comes to mind is components like BlobCache, which is used to transfer jars or large task information between the JM and TM. But specifically for the BlobService, if it fails to find the file, it would fall back to the JM via an HTTP connection. If convenient, could

Re: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-16 Thread Yun Gao
AT_LEAST_ONCE sink produced messages. If OSS doesn’t support exactly-once semantics, it seems impossible for me to handle it on the Flink code side. Thanks, Fuyao From:Fuyao Li Date: Thursday, February 10, 2022 at 15:48 To: Yun Gao , user Subject: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or

Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-09 Thread Yun Gao
Hi Fuyao, Logically, if a system wants to support end-to-end exactly-once, it should support transactions: 1. The transaction holds the records; it gets pre-committed when the state is snapshotted and committed when the checkpoint succeeds. 2. The transaction should still be able to be aborted after being pre-committed. 3
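The transaction requirements listed above map onto a two-phase commit. The following plain-Java sketch is hypothetical: it is not Flink's `TwoPhaseCommitSinkFunction`, and `InMemoryTransactionalStore` is an invented stand-in for an external system. Records are buffered in an open transaction, pre-committed when a checkpoint snapshot is taken, made visible only once that checkpoint completes, and a pre-committed transaction can still be aborted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical external store with transactions: records become visible only on commit. */
class InMemoryTransactionalStore {
    private final List<String> visible = new ArrayList<>();
    private final Map<Long, List<String>> preCommitted = new HashMap<>();

    void preCommit(long checkpointId, List<String> records) {
        preCommitted.put(checkpointId, new ArrayList<>(records));
    }

    void commit(long checkpointId) {
        List<String> records = preCommitted.remove(checkpointId);
        if (records != null) { // unknown or already-committed ids are ignored, so commit is idempotent
            visible.addAll(records);
        }
    }

    void abort(long checkpointId) {
        preCommitted.remove(checkpointId);
    }

    List<String> visibleRecords() {
        return visible;
    }
}

/** Hypothetical sink driving the store through the checkpoint lifecycle. */
class TwoPhaseCommitSketch {
    private final InMemoryTransactionalStore store;
    private List<String> currentTransaction = new ArrayList<>();

    TwoPhaseCommitSketch(InMemoryTransactionalStore store) {
        this.store = store;
    }

    void invoke(String record) {
        currentTransaction.add(record); // the open transaction holds the records
    }

    void snapshotState(long checkpointId) {
        store.preCommit(checkpointId, currentTransaction); // pre-commit when state is snapshotted
        currentTransaction = new ArrayList<>();            // later records go to a new transaction
    }

    void notifyCheckpointComplete(long checkpointId) {
        store.commit(checkpointId); // commit only after the checkpoint succeeds
    }

    void notifyCheckpointAborted(long checkpointId) {
        store.abort(checkpointId); // a pre-committed transaction can still be aborted
    }

    public static void main(String[] args) {
        InMemoryTransactionalStore store = new InMemoryTransactionalStore();
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch(store);
        sink.invoke("a");
        sink.snapshotState(1L);
        sink.invoke("b");
        sink.snapshotState(2L);
        sink.notifyCheckpointComplete(1L);
        sink.notifyCheckpointAborted(2L);
        System.out.println(store.visibleRecords()); // [a]
    }
}
```

Making `commit` tolerate repeated or unknown ids is a deliberate choice in this sketch, since a commit may be retried after recovery.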

Re: Flink rest api to start a job

2022-01-07 Thread Yun Gao
Hi Qihua, Sorry, may I double-check whether the entry class exists in both testA and testB? If testA.jar is included on startup, it would be loaded by the parent classloader, which is the parent of the user classloader that loads testB. Thus at least if the entry-class is ex

Re: Skewed Data when joining tables using Flink SQL

2022-01-07 Thread Yun Gao
Hi Anne, For one thing, regarding the DataStream and broadcast-state approach, may I double-check whether you are using BATCH execution mode? I think with [1], in BATCH mode Flink should be able to first process the broadcast side before the non-broadcast side. Best, Yun [1] https://issues.

Re: Serving Machine Learning models

2022-01-07 Thread Yun Gao
Hi Sonia, Sorry, I do not have statistics on the two methods you provided, but as additional input I could suggest another method: there is currently an ecosystem project, dl-on-flink, that supports running DL frameworks on top of Flink, and it handles the data exchange between Java and Python p

Re: Exactly Once Semantics

2022-01-07 Thread Yun Gao
Hi Siddhesh, I answered on Stack Overflow and also copied the answer here for reference: For the consumer side, the Flink Kafka Consumer keeps track of the current offset in the distributed checkpoint, and if the consumer task fails, it will be restarted from the latest checkpoint and re-e
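The consumer-side mechanism described above can be sketched as a plain-Java simulation. It is hypothetical and does not use the Kafka connector: the "partition" is just a list, and each checkpoint stores the next offset to read together with the operator state (here a running sum). After a simulated failure, both are restored from the last completed checkpoint and the records after that offset are re-read, so some records are consumed twice while the restored state still counts each record exactly once.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical simulation of offsets stored in checkpoints; not the Flink Kafka connector. */
class OffsetCheckpointSketch {
    /** The source offset and the downstream state are snapshotted together. */
    static final class Checkpoint {
        final int nextOffset;
        final long sum;

        Checkpoint(int nextOffset, long sum) {
            this.nextOffset = nextOffset;
            this.sum = sum;
        }
    }

    /**
     * Sums a "partition", checkpointing after every {@code checkpointEvery} records and
     * failing once, right after reading the record at {@code failAtOffset}.
     */
    static long run(List<Integer> partition, int checkpointEvery, int failAtOffset) {
        Checkpoint last = new Checkpoint(0, 0L);
        int offset = last.nextOffset;
        long sum = last.sum;
        boolean failed = false;
        while (offset < partition.size()) {
            sum += partition.get(offset);
            offset++;
            if (!failed && offset - 1 == failAtOffset) {
                failed = true;
                // Failover: restore offset and state from the last completed checkpoint, then re-read.
                offset = last.nextOffset;
                sum = last.sum;
                continue;
            }
            if (offset % checkpointEvery == 0) {
                last = new Checkpoint(offset, sum);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // Offsets 3..5 are read twice, yet the restored sum counts every record once.
        System.out.println(run(partition, 3, 5)); // 28
    }
}
```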

[ANNOUNCE] Apache Flink ML 2.0.0 released

2022-01-07 Thread Yun Gao
The Apache Flink community is very happy to announce the release of Apache Flink ML 2.0.0. Apache Flink ML provides API and infrastructure that simplifies implementing distributed ML algorithms, and it also provides a library of off-the-shelf ML algorithms. Please check out the release blog po

Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-05 Thread Yun Gao
cases (though it may not be the best), and for other cases, users may need to tune the memory settings. > Can you share the TPC-DS results for different configs? Although we change the def

Re: Re: [DISCUSS] Drop Gelly

2022-01-04 Thread Yun Gao
Hi, Many thanks for initiating the discussion! Also +1 to dropping the current DataSet-based Gelly library so that we can finally drop the legacy DataSet API. As for whether to keep the graph computing ability, from my side graph query / graph computing and chaining them with the preprocessing pi

Re: Re: Operator state in New Source API

2021-12-23 Thread Yun Gao
only option on a non-keyed operator would be List State, my bad. Yun Gao, I'm wondering where you got the information that "Flink only support in-memory operator state"; can you point me to the documentation that says that? I cannot find any mention in the documentation abo

Re: Re: Re:Re: Window Aggregation and Window Join ability not work properly

2021-12-23 Thread Yun Gao
CC:'user@flink.apache.org' Subject:Re:Re: Window Aggregation and Window Join ability not work properly I changed it to watermark for `datatime` as `datatime` - interval '1' second, or watermark for `datatime` as `datatime`, but it still does not work. At 2021-12-23 15:16:20, "

Re: Re:Re: Window Aggregation and Window Join ability not work properly

2021-12-23 Thread Yun Gao
Recipients:Yun Gao CC:'user@flink.apache.org' Subject:Re:Re: Window Aggregation and Window Join ability not work properly I changed it to watermark for `datatime` as `datatime` - interval '1' second, or watermark for `datatime` as `datatime`, but it still does not work. At 2021-12-2

Re: Window Aggregation and Window Join ability not work properly

2021-12-22 Thread Yun Gao
Hi Caiyi, The window needs to be finalized by the watermark [1]. I noticed that the watermark is defined as `datatime` - INTERVAL '5' MINUTE, which means the emitted watermark would be the maximum timestamp observed so far minus 5 minutes [1]. Therefore, if we want to trigger the window of 16:00 ~ 16:
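The watermark arithmetic can be made concrete with a small Java sketch. It assumes, as described above, that the watermark equals the maximum observed `datatime` minus 5 minutes, and it further assumes that a window [start, end) fires once the watermark reaches end - 1 ms (the convention of Flink's DataStream event-time windows; treat it as an approximation for SQL windows). The window ending at 16:05 and the helper names are hypothetical, since the original example is cut off.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Hypothetical helper showing when a watermark delay lets an event-time window fire. */
class WindowFiringSketch {
    static final Duration WATERMARK_DELAY = Duration.ofMinutes(5);

    /** Watermark = maximum observed event time minus the delay (`datatime` - INTERVAL '5' MINUTE). */
    static LocalTime watermark(LocalTime maxObservedTime) {
        return maxObservedTime.minus(WATERMARK_DELAY);
    }

    /** Assumed firing rule: window [start, end) fires once watermark >= end - 1 ms. */
    static boolean fires(LocalTime windowEnd, LocalTime maxObservedTime) {
        LocalTime lastTimestampInWindow = windowEnd.minus(Duration.ofMillis(1));
        return !watermark(maxObservedTime).isBefore(lastTimestampInWindow);
    }

    public static void main(String[] args) {
        LocalTime windowEnd = LocalTime.of(16, 5); // hypothetical window 16:00 ~ 16:05
        System.out.println(fires(windowEnd, LocalTime.of(16, 6))); // false: watermark is only 16:01
        System.out.println(fires(windowEnd, LocalTime.of(16, 9, 59, 999_000_000))); // true
    }
}
```

In other words, under these assumptions a record at roughly 16:10 must arrive before the 16:00 ~ 16:05 window is emitted.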

Re: Operator state in New Source API

2021-12-22 Thread Yun Gao
Hi Krzysztof, If I understand correctly, managed operator state might not help here since Flink currently only supports in-memory operator state. Would it be possible, for now, to have a customized SplitEnumerator that skips the processed files in some other way? For example, if these files hav

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-14 Thread Yun Gao
--- From:Till Rohrmann Send Time:2021 Dec. 14 (Tue.) 18:57 To:Jingsong Li Cc:Yingjie Cao ; 刘建刚 ; dev ; Yun Gao ; user ; user-zh Subject:Re: [DISCUSS] Change some default config values of blocking shuffle As part of this FLIP, does it make sense to also extend the documentation fo

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 Thread Yun Gao
Hi Yingjie, Many thanks for drafting the FLIP and initiating the discussion! May I double-check regarding taskmanager.network.sort-shuffle.min-parallelism: since other frameworks like Spark use sort-based shuffle in all cases, does our current circumstance still have dif

Re: Re: Re: Re: how to run streaming process after batch process is completed?

2021-12-08 Thread Yun Gao
Hi Joern, Many thanks for sharing the detailed scenarios! They are very inspiring. If I understand correctly, could it be summarized as follows? 1. There is a batch job that first initializes the state; the state is used in streaming mode, and the streaming pipeline is different from the batch job. 2.

Re: Re: Re: how to run streaming process after batch process is completed?

2021-12-06 Thread Yun Gao
store (as flink could have done). This is one of my most wanted Flink features these days. Regards, Jörn On Thu, Dec 2, 2021 at 9:24 AM Yun Gao wrote: Hi Vtygoss, Many thanks for sharing the scenarios! Currently checkpoints are not supported in batch mode, thus it could not create a snapshot

Re: Re: how to run streaming process after batch process is completed?

2021-12-02 Thread Yun Gao
Hi Vtygoss, Many thanks for sharing the scenarios! Currently checkpoints are not supported in batch mode, so a snapshot cannot be created after the job finishes. However, there might be some alternative solutions: 1. Hybrid source [1] aims to allow first reading from a bounded source, the

Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yun Gao
Many thanks for all the warm responses! We warmly welcome more use cases and collaboration on Flink Remote Shuffle and batch processing with Flink. Best, Yun -- From:Yingjie Cao Send Time:2021 Dec. 1 (Wed.) 11:16 To:dev Subject:Re

Re: Does Flink ever delete any sink S3 files?

2021-11-30 Thread Yun Gao
Hi Dan, The file sink first writes records to temporary files, namely .part-*, and commits them when a checkpoint succeeds by renaming them to final files, namely part-*. Best, Yun -- From:Dan Hill Send Time:2021 Dec. 1 (Wed.)
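The commit-by-rename pattern described above can be sketched on a local filesystem. This is a hypothetical analogue, not Flink's FileSink: the class name and the exact `.part-N` / `part-N` naming are simplifications for illustration. Records are written to hidden files that are only renamed to their final names when the checkpoint completes, so readers never see uncommitted data under a final name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical local-filesystem analogue of "write .part-*, rename to part-* on commit". */
class RenameOnCommitSketch {
    private final Path dir;
    private final List<Path> pending = new ArrayList<>();
    private int counter = 0;

    RenameOnCommitSketch(Path dir) {
        this.dir = dir;
    }

    /** Writes records into a hidden ".part-N" file that downstream readers should ignore. */
    void writePart(List<String> records) throws IOException {
        Path tmp = dir.resolve(".part-" + counter++);
        Files.write(tmp, records, StandardCharsets.UTF_8);
        pending.add(tmp);
    }

    /** On checkpoint success: publish each pending file by renaming ".part-N" to "part-N". */
    void onCheckpointComplete() throws IOException {
        for (Path tmp : pending) {
            Path finalFile = dir.resolve(tmp.getFileName().toString().substring(1)); // drop leading '.'
            Files.move(tmp, finalFile, StandardCopyOption.ATOMIC_MOVE);
        }
        pending.clear();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("sink-demo");
        RenameOnCommitSketch sink = new RenameOnCommitSketch(dir);
        sink.writePart(Arrays.asList("a", "b"));
        System.out.println(Files.exists(dir.resolve("part-0"))); // false: not yet committed
        sink.onCheckpointComplete();
        System.out.println(Files.exists(dir.resolve("part-0"))); // true
    }
}
```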

Re: Re: Checkpoints aborted - Job is not in state RUNNING but FINISHED

2021-11-29 Thread Yun Gao
tasks parallelism > 1, but only one of them is RUNNING (the others in FINISHED state)? Would there be a problem if, say, we have two tasks consuming events from a Kinesis source but the stream has only 1 shard? On Fri, 26 Nov 2021 at 03:14, Yun Gao wrote: Hi Jonas, Previously Flink indeed did not supp

Re: Checkpoints aborted - Job is not in state RUNNING but FINISHED

2021-11-25 Thread Yun Gao
Hi Jonas, Previously, Flink indeed did not support checkpoints after some tasks finished. In 1.14 we implemented a first version of this feature (namely https://issues.apache.org/jira/browse/FLINK-2491), and it can be enabled by setting execution.checkpointing.checkpoints-after-tasks-finish.enable

Re: Unsubscribe

2021-11-23 Thread Yun Gao
Hi, to unsubscribe, please send an email to user-unsubscr...@flink.apache.org Best, Yun -- From:rimin515 Send Time:2021 Nov. 23 (Tue.) 20:54 To:user Subject:Unsubscribe Unsubscribe

Re: Random checkpoint failures with timeouts

2021-11-23 Thread Yun Gao
Hi Dineth, The Flink web UI has pages showing checkpoint details [1]. Could you have a look at them to see which part of the checkpoint took a long time? Best, Yun [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/

Re: Broadcasting feature not working

2021-11-23 Thread Yun Gao
Hi Sudhansu, Besides Caizhi's suggestion, could you also check whether the controlStream emits records normally, by adding some kind of sink or logging? Best, Yun -- From:Caizhi Weng Send Time:2021 Nov. 23 (Tue.) 10

Re: Checkpoint failures without exceptions

2021-10-27 Thread Yun Gao
Hi Patrick, Could you also have a look at the thread stacks of the tasks of the second function to see what the main thread and the Netty threads are doing during the checkpoint period? Best, Yun --Original Mail -- Sender: Send Date:Wed Oct 27 22:05:40 2021 Recipients:Fli

Re: RE: Duplicate Calls to Cep Filter

2021-10-27 Thread Yun Gao
Hi Puneet, Sorry, I'm not an expert on CEP, but the underlying implementation of CEP is based on an NFA, and according to the API documentation, `followedBy` does not require the two patterns to be adjacent (namely, the given pattern also accepts ['a', 'c', 'b']). Thus when 'a' is received, I th

Re: Re: Flink support for Kafka versions

2021-10-27 Thread Yun Gao
Hi Prasanna, I think the two issues would not be backported to 1.12.x since they are both large, user-visible modifications and thus not suitable for a bugfix release. Which Scala version are you going to use? Flink currently provides both versions with Scala

Re: Flink - dealing with missing events in keyBy window

2021-10-01 Thread Yun Gao
Hi Sahar, I think the second method should work using a keyed process function and timers [1]. It might be implemented roughly as follows: 1. When first receiving data for a sensor id, register a processing-time timer at now() + 5 seconds. 2. On receiving a record for a sensor id, increm
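The timer-based approach above can be simulated per sensor in plain Java. This is a hypothetical sketch, not a Flink `KeyedProcessFunction`: maps stand in for keyed state and registered timers, and the processing-time clock is advanced manually. Because the rest of the steps is cut off in this excerpt, the sketch assumes that when a timer fires with a zero count it records an alert, then resets the count and re-registers the timer 5 seconds later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-sensor simulation of the timer approach; not Flink's KeyedProcessFunction. */
class MissingEventDetectorSketch {
    static final long INTERVAL_MS = 5_000L;

    private final Map<String, Long> countPerSensor = new HashMap<>(); // stands in for keyed ValueState
    private final Map<String, Long> timerPerSensor = new HashMap<>(); // next processing-time timer per key
    private final List<String> alerts = new ArrayList<>();

    void processElement(String sensorId, long now) {
        if (!timerPerSensor.containsKey(sensorId)) {
            timerPerSensor.put(sensorId, now + INTERVAL_MS); // first record: timer at now + 5 s
        }
        countPerSensor.merge(sensorId, 1L, Long::sum); // count records seen in the current interval
    }

    /** Advances the processing-time clock and fires every timer due by {@code now}. */
    void advanceTime(long now) {
        for (Map.Entry<String, Long> timer : timerPerSensor.entrySet()) {
            String sensorId = timer.getKey();
            while (timer.getValue() <= now) {
                // onTimer(): no records since the last timer means the sensor went silent (assumption).
                if (countPerSensor.getOrDefault(sensorId, 0L) == 0L) {
                    alerts.add(sensorId + "@" + timer.getValue());
                }
                countPerSensor.put(sensorId, 0L);                // reset for the next interval
                timer.setValue(timer.getValue() + INTERVAL_MS); // re-register the timer
            }
        }
    }

    List<String> getAlerts() {
        return alerts;
    }

    public static void main(String[] args) {
        MissingEventDetectorSketch detector = new MissingEventDetectorSketch();
        detector.processElement("sensor-1", 0L);
        detector.processElement("sensor-1", 1_000L);
        detector.advanceTime(5_000L);  // 2 records in the first interval: no alert
        detector.advanceTime(10_000L); // nothing arrived in the second interval: alert
        System.out.println(detector.getAlerts()); // [sensor-1@10000]
    }
}
```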

Re: Flink S3A failed to connect to service endpoint from IntelliJ IDE

2021-09-17 Thread Yun Gao
Hi James, For one thing, it looks to me that we should not configure the credentials in pom.xml; instead, we might introduce a core-site.xml on the classpath that configures fs.s3a.access.key and fs.s3a.secret.key. I tried with the abov

Re: Re: Savepoints with bootstraping a datastream function

2021-09-17 Thread Yun Gao
Hi Rakshit, I think FLIP-147 might still not be able to support this case: for bounded jobs, it supports each task exiting after a checkpoint to commit the remaining data, but it cannot ensure that all the tasks exit after the same checkpoint; for savepoints, it does not support taking a sav

Re: FLINK-14316 happens on version 1.13.2

2021-09-02 Thread Yun Gao
Hi Xiangyu, There might be different reasons for the "Job Leader... lost leadership" problem. Do you see the errors in the TM log? If so, the root cause might be that the connection between the TM and ZK was lost or timed out. Have you checked the GC status on the TM side? If the GC is OK, could

Re: StreamFileSink not closing file

2021-08-11 Thread Yun Gao
Hi Matthias, Sorry for the late reply. This is a known issue: Flink may lose the last piece of data for a bounded dataset with a 2PC sink. However, we expect to fix this issue in the upcoming 1.14 release [1]. Best, Yun [1] https://issues.apache.org/jira/browse/FLINK-2491

Re: State Processor API with EmbeddedRocksDBStateBackend

2021-08-10 Thread Yun Gao
Hi Xianwen, Could you also attach the full stack trace of the exception? Many thanks -- Sender:xianwen jin Date:2021/08/10 21:03:44 Recipient: Theme:State Processor API with EmbeddedRocksDBStateBackend Hi Flink Community, I have an iss

Re: Re: Flink 1.12.5: The heartbeat of JobManager/TaskManager with id xxx timed out

2021-08-10 Thread Yun Gao
Hi Chenyu, Typical reasons for the heartbeat timeout include: 1. Long GC pauses in the TM / JM; 2. Network instability. Could the GC logs or network monitoring metrics give some hints? Best, Yun -- Sender:Chenyu Zheng Date:

Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Yun Gao
Yun -- From:Taimoor Bhatti Send Time:2021 Jul. 19 (Mon.) 23:03 To:user@flink.apache.org ; Yun Gao Subject:Re: Apache Flink Kafka Connector not found Error Hello Yun, Many thanks for the reply... For some reason I'm not able t

Re: Set job specific resources in one StreamTableEnvironment

2021-07-19 Thread Yun Gao
Hi Paul, Parallelism can be set with `table.exec.resource.default-parallelism` [1], and an example of setting the parameter is in the first few paragraphs. But regarding the total process memory, I think it can only be set at the cluster level since it is per-cluster

Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Yun Gao
Hi Taimoor, I think you are right regarding the provided dependencies; we need to include them manually in the classpath via the IDEA options. And regarding the FlinkKafkaConsumer issue, I tried locally and it seems to work after adding the import? Namely import org.apache.flink.stre

Re: Re: Re: Upgrade job topology in checkpoint

2021-06-16 Thread Yun Gao
was a code bug, but I was trying to understand if this makes sense given the implementation) re-including mailing list, excluded by accident Padarn On Wed, Jun 16, 2021 at 10:59 AM Yun Gao wrote: Hi Padarn, Sorry, I might not fully understand what you mean by "new topology was ignored". Do you mean the

Re: Upgrade job topology in checkpoint

2021-06-15 Thread Yun Gao
Hi Padarn, By default, checkpoints are disposed of when the job finishes or fails; they are retained only when explicitly configured [1]. From the implementation perspective, I think users are able to change the topology when restoring from an externalized checkpoint, but I think Flink would not

Re: Checkpoint is timing out - inspecting state

2021-06-14 Thread Yun Gao
Hi Dan, Flink already integrates a tool in the web UI to monitor detailed checkpoint statistics [1]. It shows the time consumed in each phase and by each task, so it can be used to debug checkpoint timeouts. Best, Yun [1] https://ci.apache.org/projects/flink/fli

Re: should i expect POJO serialization warnings when dealing w/ kryo protobuf serialization?

2021-06-14 Thread Yun Gao
Hi Jin, The warning is given whenever parsing the type as a POJO fails and Flink falls back to the Kryo serializer. The registered ProtobufSerializer acts as a plugin inside the Kryo serializer, so the warning can be ignored. When serializing, it would first turn to the k

Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-09 Thread Yun Gao
-- From:Chirag Dewan Send Time:2021 Jun. 9 (Wed.) 15:15 To:User ; Yun Gao Subject:Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB Thanks for the reply Yun. The key is an Integer type. Do you think there can be hash collisions for Integers

Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-09 Thread Yun Gao
Many thanks, Kezhu, for the catch; it also looks to me like the same issue as FLINK-21028. -- From:Piotr Nowojski Send Time:2021 Jun. 9 (Wed.) 22:12 To:Kezhu Wang Cc:Thomas Wang ; Yun Gao ; user Subject:Re: Re: Re: Re: Failed to

Re: Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-08 Thread Yun Gao
think you might first check the key type used; the key type should have a stable hashCode method. Best, Yun --Original Mail -- Sender:Chirag Dewan Send Date:Tue Jun 8 18:06:07 2021 Recipients:User , Yun Gao Subject:Re: Multiple Exceptions during Load Test
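The advice about a stable hashCode can be illustrated in plain Java. Keyed state is assigned to key groups based on the key's `hashCode()`, so a key class that keeps `Object`'s identity-based `hashCode()` can map the same logical key to different key groups (for example, after a restore in a new JVM). The classes below are hypothetical, and `keyGroupFor` is a simplified stand-in for Flink's key-group assignment, which, to my knowledge, additionally applies a murmur hash to `hashCode()`.

```java
import java.util.Objects;

/** Hypothetical keys contrasting identity-based and value-based hashCode(). */
class StableKeySketch {
    /** Key WITHOUT equals/hashCode: inherits the identity hashCode, which is not stable across JVMs. */
    static final class UnstableKey {
        final String sensorId;

        UnstableKey(String sensorId) {
            this.sensorId = sensorId;
        }
    }

    /** Key WITH value-based equals/hashCode: equal keys always hash the same. */
    static final class StableKey {
        final String sensorId;

        StableKey(String sensorId) {
            this.sensorId = sensorId;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof StableKey && ((StableKey) o).sensorId.equals(sensorId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(sensorId);
        }
    }

    /** Simplified key-group assignment (assumption: Flink also murmur-hashes the hashCode). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // Two logically equal stable keys always land in the same key group.
        System.out.println(keyGroupFor(new StableKey("s1"), maxParallelism)
                == keyGroupFor(new StableKey("s1"), maxParallelism)); // true
        // Two logically equal unstable keys usually get different identity hash codes.
        System.out.println(new UnstableKey("s1").hashCode() == new UnstableKey("s1").hashCode());
    }
}
```

Note that `Integer` already has a value-based `hashCode()`, so this concern applies mainly to custom key classes.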

Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-08 Thread Yun Gao
CC:user Subject:Re: Re: Re: Failed to cancel a job using the STOP rest API This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink w/ Parquet format. I'm only using Flink's API and nothing custom. Thomas On Sun, Jun 6, 2021 at 6:43 PM Yun

Re: Re: Add control mode for flink

2021-06-07 Thread Yun Gao
ground Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features is allowing dynamically changing operator behaviors via the REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. Af

Re: Re: Is it possible to use OperatorState, when NOT implementing a source or sink function?

2021-06-06 Thread Yun Gao
Hi Marco, It seems to me that the imbalance problem and the state are independent in this issue: the data distribution is decided only by the KeySelector used. The only limitation for state is that keyed state is bound to the KeySelector used across the tasks. If the imbalance is the root p

Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-06 Thread Yun Gao
ava:136) at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) ... 25 more ``` On Sat, Jun 5, 2021 at 6:24 AM Yun

Re: Re:Re: open checkpoint, send message to kafka OutOfOrderSequenceException

2021-06-05 Thread Yun Gao
over still OutOfOrderSequenceException. When I disable checkpointing, the Kafka broker still returns OutOfOrderSequenceException to me. At 2021-06-04 17:52:22, "Yun Gao" wrote: Hi, Have you checked whether the error occurs during normal execution, or right after

Re: Re: Failed to cancel a job using the STOP rest API

2021-06-05 Thread Yun Gao
y one source that is using FlinkKafkaConsumer which I assume has the correct cancel() method implemented. Also could you suggest how I could use the "request-id" to get the savepoint location? Thanks. Thomas On Fri, Jun 4, 2021 at 2:31 AM Yun Gao wrote: Hi Thomas, I think you are

Re: Re: Is it possible to use OperatorState, when NOT implementing a source or sink function?

2021-06-05 Thread Yun Gao
Hi Marco, I think yes, operator state can be used in batch mode. Since there are no checkpoints in batch mode, the operator state serves as a kind of ordinary in-memory storage. Best, Yun -- Sender:Marco Villalobos Date:20

Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-05 Thread Yun Gao
Hi Chirag, If you are able to reproduce the exception, could you first add some logging to print the values of valueState, valueState.value(), inEvent and inEvent.getPriceDelta()? I think any of these being null would cause a NullPointerException here. For the second exception, I found a similar issue [1],

Re: open checkpoint, send message to kafka OutOfOrderSequenceException

2021-06-04 Thread Yun Gao
Hi, Have you checked whether the error occurs during normal execution or right after a failover? Best, Yun -- From:SmileSmile Send Time:2021 Jun. 4 (Fri.) 11:07 To:user Subject:open checkpoint, send message to kafka OutOfOrderSequenceExceptio

Re: Failed to cancel a job using the STOP rest API

2021-06-04 Thread Yun Gao
Hi Thomas, I think you are right that the CLI uses the same REST API underneath, and since the REST API response is OK and the savepoint is triggered successfully, I reckon the problem is not in the REST API processing, and we might first focus on the stop-with-savepoint p

Re: Flink sql regular join not working as expect.

2021-06-04 Thread Yun Gao
Hi, I'm not an expert on Table/SQL, but it seems to me that for regular joins, Flink would not re-read the dimension table after it has read it fully for the first time. If you want to always join the records with the latest version of the dimension table, you may need to use the temporal j

Re: Issue reading from S3

2021-05-20 Thread Yun Gao
Hi Angelo, I tried the failing case you provided with a similar one: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build(); TableEnvironment

Re: Guidance for Integration Tests with External Technologies

2021-05-19 Thread Yun Gao
Hi Rion, Do you mean you are running the tests directly in an IDE like IDEA for "multiple tests run in sequence"? If a test succeeds when run separately but fails when run in sequence, then it seems some other tests are still affecting the failing tests. For the

Re: Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Yun Gao
bos Send Date:Thu May 20 01:16:39 2021 Recipients:Yun Gao CC:user Subject:Re: Questions Flink DataStream in BATCH execution mode scalability advice > On May 19, 2021, at 7:26 AM, Yun Gao wrote: > > Hi Marco, > > For the remaining issues, > > 1. For the aggregation, th

Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Yun Gao
Hi Marco, For the remaining issues, 1. For the aggregation, the 500GB of files do not need to fit into memory. Roughly speaking, for keyBy().window().reduce(), the input records are first sorted by key (time_series.name) via an external sort, which only consumes a fix
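The sort-then-reduce idea can be sketched in plain Java to show why memory stays bounded per key. This is a hypothetical illustration, not Flink's runtime code: the in-memory `List.sort` stands in for the external sort (which would spill sorted runs to disk), the window dimension is ignored, and `Point` is an invented record type. Once input is sorted by key, each group is reduced with a single accumulator and emitted as soon as the key changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of sort-then-reduce grouping; not Flink's runtime code. */
class SortedGroupReduceSketch {
    static final class Point {
        final String name;
        final double value;

        Point(String name, double value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Sorts by key, then reduces each run of equal keys while holding a single accumulator. */
    static Map<String, Double> sumByName(List<Point> input) {
        List<Point> sorted = new ArrayList<>(input);
        // In-memory stand-in for the external sort, which would spill sorted runs to disk.
        sorted.sort(Comparator.comparing((Point p) -> p.name));
        Map<String, Double> emitted = new LinkedHashMap<>();
        String currentKey = null;
        double acc = 0.0;
        for (Point p : sorted) {
            if (!p.name.equals(currentKey)) {
                if (currentKey != null) {
                    emitted.put(currentKey, acc); // key changed: the previous group is complete
                }
                currentKey = p.name;
                acc = 0.0;
            }
            acc += p.value;
        }
        if (currentKey != null) {
            emitted.put(currentKey, acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Point> input = Arrays.asList(
                new Point("cpu", 1.0), new Point("mem", 2.0), new Point("cpu", 3.0));
        System.out.println(sumByName(input)); // {cpu=4.0, mem=2.0}
    }
}
```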

Re: DataStream Batch Execution Mode and large files.

2021-05-18 Thread Yun Gao
Hi Marco, In BATCH mode, all ALL_TO_ALL edges are marked as blocking and use intermediate files to transfer data. Flink now supports hash shuffle and sort shuffle for blocking edges [1]; both store the intermediate files in the directories configured by io.tmp.dirs [2].

Re: DataStream API Batch Execution Mode restarting...

2021-05-18 Thread Yun Gao
Hi Marco, Have you configured the restart strategy? If the restart-strategy [1] is set to some strategy other than none, Flink should be able to restart the job automatically on failover. The restart strategy can also be configured via StreamExecutionEnvironment#setRestartSt

Re: Re: Handling "Global" Updating State

2021-05-18 Thread Yun Gao
h. Thanks! Rion On May 17, 2021, at 1:36 AM, Yun Gao wrote: Hi Rion, I think FLIP-150 [1] should be able to solve this scenario. Since FLIP-150 is still under discussion, for now a temporary workaround that comes to mind might be: 1. Write a first job to read from Kafka and update the broadcast sta

Re: Re: Handling "Global" Updating State

2021-05-16 Thread Yun Gao
Hi Rion, I think FLIP-150 [1] should be able to solve this scenario. Since FLIP-150 is still under discussion, for now a temporary workaround that comes to mind might be: 1. Write a first job to read from Kafka and update the broadcast state of some operator. The job would keep the source alive after all the

Re: Define rowtime on intermediate table field

2021-05-05 Thread Yun Gao
Hi Sumeet, I think you might first convert the table back to a DataStream [1], then define the timestamps and watermarks with `assignTimestampsAndWatermarks(...)`, and then convert it back to a table [2]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/

Re: Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

2021-05-05 Thread Yun Gao
Hi Ragini, How did you submit your job? This exception is mostly caused by `flink-clients` not being included in the classpath on the client side. If the job is submitted via the Flink CLI, namely `flink run -c xx.jar`, it should be included by default, and if the job is submitted programmatically,

Re: Re: pojo warning when using auto generated protobuf class

2021-04-26 Thread Yun Gao
(FlinkKafkaConsumer(Config.TOPIC_SPANS, ApiTraceSchema(), props)) On Sat, Apr 24, 2021 at 8:48 AM, Yun Gao wrote: Hi Prashant, I think the warning is given when calling return TypeInformation.of(Trace.APITrace::class.java). Currently Flink does not have native support for the protobuf

Re: Too many checkpoint folders kept for externalized retention.

2021-04-25 Thread Yun Gao
Hi John, Logically, the maximum number of retained checkpoints is configured by state.checkpoints.num-retained [1]. Have you configured this option? Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#state-checkpoints-num-retained
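The effect of a retention limit such as `state.checkpoints.num-retained` (which defaults to 1) can be modeled in a few lines of plain Java. This is a hypothetical model of the bookkeeping, not Flink's checkpoint store: each newly completed checkpoint is appended, and the oldest ones beyond the limit are discarded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of keeping only the N most recent completed checkpoints. */
class RetainedCheckpointsSketch {
    private final int numRetained; // plays the role of state.checkpoints.num-retained
    private final Deque<Long> retained = new ArrayDeque<>();
    private final List<Long> discarded = new ArrayList<>();

    RetainedCheckpointsSketch(int numRetained) {
        if (numRetained < 1) {
            throw new IllegalArgumentException("numRetained must be >= 1");
        }
        this.numRetained = numRetained;
    }

    /** Registers a completed checkpoint and discards the oldest ones beyond the limit. */
    void onCheckpointCompleted(long checkpointId) {
        retained.addLast(checkpointId);
        while (retained.size() > numRetained) {
            discarded.add(retained.removeFirst()); // in this model, its folder would be deleted here
        }
    }

    List<Long> retainedIds() {
        return new ArrayList<>(retained);
    }

    List<Long> discardedIds() {
        return discarded;
    }

    public static void main(String[] args) {
        RetainedCheckpointsSketch store = new RetainedCheckpointsSketch(2);
        for (long id = 1; id <= 4; id++) {
            store.onCheckpointCompleted(id);
        }
        System.out.println(store.retainedIds() + " " + store.discardedIds()); // [3, 4] [1, 2]
    }
}
```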

Re: pojo warning when using auto generated protobuf class

2021-04-24 Thread Yun Gao
Hi Prashant, I think the warning is given when calling return TypeInformation.of(Trace.APITrace::class.java). Currently Flink does not yet have native support for protobuf types [1], so it uses a generic serializer created by Kryo. This should not affect the correctness of the progra

Re: Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
generated clients (if APIs are exposed via Swagger or OpenApi). If that's not an option, writing a REST client from scratch is something I try to avoid as much as I can. Best, Flavio On Fri, Apr 23, 2021 at 9:55 AM Yun Gao wrote: Hi Flavio, Many thanks for the explanation, maybe another o

Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
/RestClusterClientExtended.java On Fri, Apr 23, 2021 at 5:10 AM Yun Gao wrote: Hi gaurav, Logically, the Flink client is wrapped inside the StreamExecutionEnvironment, and users can use the StreamExecutionEnvironment to execute their jobs. Could you share more about why you want to directly use the

Re: Official flink java client

2021-04-22 Thread Yun Gao
Hi gaurav, Logically, the Flink client is wrapped inside the StreamExecutionEnvironment, and users can use the StreamExecutionEnvironment to execute their jobs. Could you share more about why you want to directly use the client? Best, Yun --Original Mail -- Sende

Re: Union of more then two streams

2021-04-04 Thread Yun Gao
Hi, With a.connect(b).coprocess(xx).connect(c).coprocess(xx), two operators would be created: the first operator would union a and b and output the enriched data, and then .connect(c).coprocess(xx) would pass through the already enriched data and enrich the records from c. Since the two oper

Re: Questions about checkpointAlignmentTime in unaligned checkpoint

2021-04-04 Thread Yun Gao
Hi Kai, Under unaligned checkpoint settings, there is still an alignment process. Although the task can snapshot the operator state upon receiving the first barrier and emit barriers to the downstream tasks, it still needs to wait until all the barriers are received before finalizing the check
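The two stages described above can be modeled for a single task in plain Java. This is a hypothetical sketch, not Flink's barrier handler, and it does not model in-flight data. It assumes that the "alignment" duration is the time between the first and the last barrier; whether this matches exactly how Flink computes the `checkpointAlignmentTime` metric is an assumption. The snapshot is taken on the first barrier, but the task can only finalize once barriers from all input channels have arrived.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of one task's barrier handling under unaligned checkpoints; not Flink code. */
class UnalignedBarrierSketch {
    private final int numInputChannels;
    private final Set<Integer> channelsWithBarrier = new HashSet<>();
    private long firstBarrierTimeMs = -1L;
    private boolean snapshotTaken = false;
    private long alignmentDurationMs = -1L;

    UnalignedBarrierSketch(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    /** Handles a barrier; returns true once the task can finalize its part of the checkpoint. */
    boolean onBarrier(int channel, long timeMs) {
        if (channelsWithBarrier.isEmpty()) {
            firstBarrierTimeMs = timeMs;
            snapshotTaken = true; // on the FIRST barrier: snapshot state and forward the barrier
        }
        channelsWithBarrier.add(channel);
        if (channelsWithBarrier.size() == numInputChannels) {
            alignmentDurationMs = timeMs - firstBarrierTimeMs; // finalizing still waits for the LAST one
            return true;
        }
        return false;
    }

    boolean isSnapshotTaken() {
        return snapshotTaken;
    }

    long getAlignmentDurationMs() {
        return alignmentDurationMs;
    }

    public static void main(String[] args) {
        UnalignedBarrierSketch task = new UnalignedBarrierSketch(3);
        task.onBarrier(0, 100L); // snapshot happens here
        task.onBarrier(1, 150L);
        task.onBarrier(2, 400L); // checkpoint can be finalized only now
        System.out.println(task.getAlignmentDurationMs()); // 300
    }
}
```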

Re: Re: Meaning of checkpointStartDelayNanos

2021-04-04 Thread Yun Gao
Hi Kai, Yes, you are basically right; one minor point is that the start time is taken as the time the checkpoint is initiated on the JM side. Best, Yun --Original Mail -- Sender:Kai Fu Send Date:Mon Apr 5 09:31:58 2021 Recipients:user Subject:Re: Meanin

Re: Re: Gradually increasing checkpoint size

2021-03-08 Thread Yun Gao
rmarks stop appearing in the UI. .assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1))); Thanks Yun! On Mon, Mar 8, 2021 at 7:27 AM Yun Gao wrote: Hi Dan, Have you used a too-large upperBound or lowerBound? I

Re: Re: How to check checkpointing mode

2021-03-08 Thread Yun Gao
07:25:31 2021 Recipients:Flink User Mail List, Yun Gao Subject:Re: How to check checkpointing mode Hi Yun, Thank you for looking. The job creation is quite big, so I've truncated the helper methods dealing with command-line parameters etc.; below are the two major methods: @Override public Void call() t

Re: New settings are not honored unless checkpoint is cleared.

2021-03-08 Thread Yun Gao
Hi Yordan, What are the settings that are changed during the tests? Best, Yun -- From:Yordan Pavlov Send Time:2021 Mar. 5 (Fri.) 23:36 To:user Subject:New settings are not honored unless checkpoint is cleared. Hello there, I am

Re: questions about broadcasts

2021-03-08 Thread Yun Gao
Hi Marco, (a) It is possible for an operator to receive two different kinds of broadcasts: DataStream ints = ...; DataStream strs = ...; ints.broadcast().connect(strs.broadcast()).process(new CoProcessFunction(){...}); (b) A traditional Flink operator cannot accept three different inputs.
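A plain-Java model of what broadcast() means for the two connected streams in (a): every parallel subtask of the downstream operator receives every record of both inputs. `Partitioning` is a hypothetical helper, not Flink's partitioner API. The round-robin `rebalance` shown for contrast starts at subtask 0 for determinism, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how records are distributed to parallel subtasks (not Flink code).
final class Partitioning {
    private Partitioning() {}

    // Broadcast: each of the `parallelism` subtasks receives a full copy of the records.
    static <T> List<List<T>> broadcast(List<T> records, int parallelism) {
        List<List<T>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>(records));
        }
        return subtasks;
    }

    // Round-robin: each record reaches exactly one subtask.
    static <T> List<List<T>> rebalance(List<T> records, int parallelism) {
        List<List<T>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % parallelism).add(records.get(i));
        }
        return subtasks;
    }
}
```

With both inputs broadcast to a parallelism of 2, each CoProcessFunction instance would see all integers and all strings.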

Re: Re: Re: Checkpoint Error

2021-03-08 Thread Yun Gao
:46 2021 Recipients:Yun Gao CC:user Subject:Re: Re: Checkpoint Error Hi Yun, Thanks for the response. I checked the mounts and only the JM's and TM's are mounted with this EFS. Not sure how to debug this. Thanks On Sun, Mar 7, 2021 at 8:29 PM Yun Gao wrote: Hi Navneeth, It seem

Re: Gradually increasing checkpoint size

2021-03-08 Thread Yun Gao
Hi Dan, Have you used too large an upperBound or lowerBound? If not, could you also check the watermark strategy? The interval join operator relies on event-time timers for state cleanup, and event-time timers are triggered by the advancing watermark. Best, Yun
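Why a stalled watermark or very wide bounds lets state grow can be sketched in plain Java. `IntervalJoinModel` is a hypothetical class, not Flink's IntervalJoinOperator. It assumes a left element at t matches right elements in [t + lowerBound, t + upperBound], and that each buffered element is dropped once the watermark reaches its cleanup time: t + max(upperBound, 0) on the left side and t + max(-lowerBound, 0) on the right side.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of interval-join buffering with watermark-driven cleanup (not Flink code).
final class IntervalJoinModel {
    private final long lowerBound;
    private final long upperBound;
    private final List<Long> left = new ArrayList<>();
    private final List<Long> right = new ArrayList<>();
    private final List<String> joined = new ArrayList<>();

    IntervalJoinModel(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void onLeft(long ts) {
        for (long r : right) {
            if (r >= ts + lowerBound && r <= ts + upperBound) joined.add(ts + "," + r);
        }
        left.add(ts); // kept in state until its cleanup time passes
    }

    void onRight(long ts) {
        for (long l : left) {
            if (ts >= l + lowerBound && ts <= l + upperBound) joined.add(l + "," + ts);
        }
        right.add(ts);
    }

    // Plays the role of the event-time cleanup timers: only a watermark removes state.
    void onWatermark(long watermark) {
        left.removeIf(l -> l + Math.max(upperBound, 0L) <= watermark);
        right.removeIf(r -> r + Math.max(-lowerBound, 0L) <= watermark);
    }

    int bufferedElements() { return left.size() + right.size(); }

    List<String> joined() { return joined; }
}
```

If the watermark never advances, `onWatermark` is never effective and the buffers only grow, which would show up as a steadily increasing checkpoint size; larger bounds delay cleanup in the same way.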

Re: Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Yun Gao
Hi Rainie, From the code, it seems the current program does not use time-related functionality such as windows or timers? If so, the problem would be independent of the time characteristic used. Also, it is unlikely to be caused by rebalance(), since the network layer checks sequence numbers. If there are

Re: How do I call an algorithm written in C++ in Flink?

2021-03-08 Thread Yun Gao
Hi Suxi, Do you mean you want to call an algorithm written in C++? If so, I think you could do it the same way as you wrapped it in the SpringBoot project, via JNI. You do not need to add a new operator; you could use the existing Flink API, load your library in open(), and call the algori

Re: java options to generate heap dump in EMR not working

2021-03-07 Thread Yun Gao
Hi, I tried with a standalone session (sorry, I do not have a YARN cluster at hand) and the Flink cluster seems to start up normally. Could you check the NodeManager log to see the detailed reason why the container does not get launched? Also, have you checked whether there are some spell

Re: Re: Checkpoint Error

2021-03-07 Thread Yun Gao
Hi Navneeth, From the stack trace, it seems the exception is caused by an underlying EFS problem? Have you checked whether there are errors reported for EFS, or whether the same EFS might be mounted more than once and someone else has deleted the directory? Best, Yun

Re: Debugging long Flink checkpoint durations

2021-03-02 Thread Yun Gao
Hi Dan, I think you can see the details of the checkpoints via the checkpoint UI [1]. Also, if you see that some tasks in the pending checkpoints have not taken a snapshot, you might check whether those tasks are backpressuring the previous tasks [2]. Best, Yun [1] https://ci.apache.org/projects/

Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-09 Thread Yun Gao
Hi, Could you try adding the jar explicitly via the Python configuration? You might refer to [1]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program

Re: Any plans to make Flink configurable with pure data?

2021-02-09 Thread Yun Gao
Hi Pilgrim, Currently the Table API indeed cannot use low-level APIs such as timers. Would a mixture of SQL and DataStream satisfy the requirements? A job might be created from multiple SQL statements connected via DataStream operations. Best, Yun
