Re: Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Yun Gao
+1 for removing the methods that have been deprecated for a while and have alternatives. One specific point: if we remove DataStream#split, should we consider enabling side outputs in more operators in the future? Currently they should only be available in ProcessFunctions, but not availabl

Re: Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread Yun Gao
Hi, Many thanks for bringing up this discussion! One more question: do the BATCH and STREAMING modes also decide the shuffle types and operators? I'm asking because even in blocking mode, a job should benefit from keeping some edges pipelined if the resources

Re: SDK vs Connectors

2020-08-22 Thread Yun Gao
Hi Prasanna, 1) Semantically both a) and b) would be OK. If the custom sink can be chained with the map operator (I assume the map operator is the "Processing" in the graph), there should also be little difference physically; if they cannot be chained, then writing a custom sink would caus

Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Yun Gao
Congratulations Dian ! Best Yun -- Sender:Marta Paes Moreira Date:2020/08/27 17:42:34 Recipient:Yuan Mei Cc:Xingbo Huang; jincheng sun; dev; Dian Fu; user; user-zh Theme:Re: [ANNOUNCE] New PMC member: Dian Fu Congrats, Dian! On

Re: Implementation of setBufferTimeout(timeoutMillis)

2020-08-30 Thread Yun Gao
Hi Pankaj, I think it should be in org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher. Best, Yun -- Sender:Pankaj Chand Date:2020/08/31 02:40:15 Recipient:user Theme:Implementation of setBufferTimeout(

Re: Exception on s3 committer

2020-08-30 Thread Yun Gao
Hi Ivan, I think there are some points to check: 1. Is the job restored from the latest successful checkpoint after restart? 2. Have you ever changed the timeout settings for incomplete multipart uploads? 3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-26

Re: Re: Exception on s3 committer

2020-09-01 Thread Yun Gao
ed to commit the multipart file, it crashed and restarted. The file was committed on S3 successfully, but the acknowledgement was not recorded on the Flink side. In between, the batch job consumed the file. I don’t know if that’s possible. Thanks Ivan On Aug 30, 2020, at 11:10 PM, Yun Gao wrote: Hi Ivan,

Re: Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Thread Yun Gao
Great! Many thanks to @ZhuZhu for driving this, and thanks to everyone who contributed to the release! Best, Yun --Original Mail -- Sender:Jingsong Li Send Date:Thu Sep 17 13:31:41 2020 Recipients:user-zh CC:dev , user , Apache Announce List Subject:Re: [ANNOUNCE] Apac

[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-08 Thread Yun Gao
Hi, devs & users As discussed in FLIP-131 [1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that currently Flink does not support checkpoints after some tas

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-08 Thread Yun Gao
Hi, devs & users Very sorry for the broken formatting; I am resending the discussion as follows. As discussed in FLIP-131[1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming m

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Yun Gao
I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Yun Gao
completely ignore the problem on how to store and restore output buffers of a completed task (also important for the next point). 5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Yun Gao
hrmann Send Time:2020 Oct. 13 (Tue.) 17:25 To:Yun Gao Cc:Arvid Heise ; Flink Dev ; User-Flink Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Thanks for starting this discussion Yun Gao, I have three comments/questions: 1) When restarting all tasks independent of the st

Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Yun Gao
Hi Alexander, The signature of createRemoteEnvironment is public static StreamExecutionEnvironment createRemoteEnvironment( String host, int port, String... jarFiles); which can also ship the jars to be executed to the remote cluster. Could you try also passing the jar files to the r

Re: Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Yun Gao
om my project running in Eclipse Alex On Wed, Oct 28, 2020 at 8:21 PM Yun Gao wrote: Hi Alexander, The signature of the createRemoteEnvironment is public static StreamExecutionEnvironment createRemoteEnvironment( String host, int port, String... jarFiles); Which could also ship the ja

Re: Unify error handler and late window record output for SQL api

2020-10-29 Thread Yun Gao
Hi Yi, Sorry, I might not be an expert on SQL. On the whole, since SQL is a high-level API, users have less control over their jobs: 1. Unfortunately we do not have an API to catch all the errors. I think even with DataStream, we do not provide an API to catch the run

Re: Native kubernetes setup failed to start job

2020-10-29 Thread Yun Gao
Hi Liangde, I'm pulling in Yang Wang, who is the expert on Flink on K8s. Best, Yun --Original Mail -- Sender:Chen Liangde Send Date:Fri Oct 30 05:30:40 2020 Recipients:Flink ML Subject:Native kubernetes setup failed to start job I created a flink cluster in k

Re: Re: Re: How to deploy dynamically generated flink jobs?

2020-10-29 Thread Yun Gao
Hi Alexander, Sorry, I might not fully understand the issue: do you mean the "flink" jar is the same jar as the Spring app fat jar, or are they different jars? On the whole, I think the value we need for the jarFiles parameter is the absolute path of the jar file. We might need some logic

Re: Duplicate operators generated by plan

2020-12-03 Thread Yun Gao
Hi Rex, Could you also attach an example of these SQL statements / tables? One more thing to confirm: do the operators with the same names also have the same inputs? Best, Yun --Original Mail -- Sender:Rex Fenley Send Date:Fri Dec 4 02:55:41 2020

Re: How to parse list values in csv file

2020-12-03 Thread Yun Gao
Hi, The CSV format only supports the types listed in [1], and fields must use one of these types. For other types some kind of workaround is needed, such as first parsing the field as a string and parsing it again later in the program. Best, Yun [1] https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bb
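
The workaround in this reply (read the column as a string, then parse it again in the program) might look roughly like the sketch below. It is plain Java with no Flink dependency; the class name `ListFieldParser` and the `;` separator inside the column are assumptions made for illustration, not something stated in the thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Second parsing step: the CSV reader yields a String column, user code turns it into a list. */
class ListFieldParser {

    /**
     * Parses a delimited string such as "1;2;3" into a list of integers.
     * The ';' separator is an assumption; it only has to differ from the CSV field delimiter.
     */
    static List<Integer> parseIntList(String raw) {
        List<Integer> values = new ArrayList<>();
        if (raw == null || raw.trim().isEmpty()) {
            return values; // an empty column becomes an empty list
        }
        for (String token : raw.split(";")) {
            values.add(Integer.parseInt(token.trim()));
        }
        return values;
    }

    public static void main(String[] args) {
        // In a job, this call would typically sit in a map step applied after the CSV source.
        System.out.println(parseIntList("1; 2;3"));
    }
}
```

Keeping the second parse in its own function makes it easy to decide separately how malformed entries should be handled (here a bad token throws `NumberFormatException`).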

Re: Re: Duplicate operators generated by plan

2020-12-06 Thread Yun Gao
lesFunc extends ScalarFunction { def eval(roles: Array[String], id: java.lang.Long): Row = { val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue) val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue) val isStudent: java.lang.Boolean = roles.contains(Student.rawValue) val isPare

Re: How to parse list values in csv file

2020-12-06 Thread Yun Gao
plain text file and then processed to objects. It solved my use case. On Fri, Dec 4, 2020 at 12:24 PM Yun Gao wrote: Hi, The CSV only supports the types listed in [1] and must use the types in this list, thus for other types some kind of workaround is needed, like first parsed as string

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-14 Thread Yun Gao
Hi all, I would like to resume this discussion on supporting checkpoints after tasks finished :) Based on the previous discussion, we have implemented a PoC [1] to try out the idea. During the PoC we also ran into some possible issues: 1. To include EndOfPartition into considerat

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-15 Thread Yun Gao
ettek Send Time:2020 Dec. 15 (Tue.) 18:11 To:dev Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Thanks for the thorough update! I'll answer inline. On 14.12.20 16:33, Yun Gao wrote: > 1. To include EndOfPartition into consideration for barrier alignment

Re: See lag end-to-end

2020-12-20 Thread Yun Gao
Hi Rex, I think Latency Marker is what you need [1]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#latency-tracking -- Sender:Rex Fenley Date:2020/12/21 04:57:59 Recipient:user Cc:Brad

Re: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
Hi Edward, For the second issue, have you also set the state backend type? I'm asking because, except for the default heap state backend, other state backends should throw an exception if state.checkpoints.dir is not set. Since the heap state backend stores all the snapshots in the JM's memory,

Re: [Help Required:]-Unable to submit job from REST API

2020-12-21 Thread Yun Gao
Hi Puneet, From the docs it seems that submitting a job via the REST API requires sending a POST request to /jars/:jarid/run [1]. The response "Not Found" should mean that the REST API server does not recognize the request type. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops
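
A minimal sketch of the request described in this reply, using the JDK 11+ `java.net.http` API. It only builds the POST request and does not send it. The base URL `http://localhost:8081`, the jar id, and the empty JSON body are assumptions for illustration; the jar id has to be the one the cluster reports for an already uploaded jar.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds (but does not send) a POST request to the /jars/:jarid/run endpoint. */
class JarRunRequest {

    static HttpRequest build(String restBaseUrl, String jarId) {
        // The ":jarid" path parameter is replaced by the id of a previously uploaded jar.
        URI uri = URI.create(restBaseUrl + "/jars/" + jarId + "/run");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                // Empty JSON object as body: an assumption, run options would go here.
                .POST(HttpRequest.BodyPublishers.ofString("{}"))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical base URL and jar id, for illustration only.
        HttpRequest request = build("http://localhost:8081", "example-id_job.jar");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with `HttpClient.newHttpClient().send(...)` and printing the status code is a quick way to tell a wrong path or HTTP method apart from a jar id the server does not know.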

Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
Hi Nick, Are you using EXACTLY_ONCE semantics? If so, the sink would use transactions and only commit a transaction when the checkpoint completes, to ensure end-to-end exactly-once. A detailed description can be found in [1]. Best, Yun [1] https://flink.apache.org/features/2018/03/01/end-t
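
Why this leads to delayed consumption can be seen in a toy model of a transactional sink. This is a sketch in plain Java, not Flink's actual sink implementation; the class and method names are hypothetical. Records written between two checkpoints stay in an open transaction and only become visible to a read-committed consumer once the checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: records become visible downstream only after the checkpoint completes. */
class TransactionalSinkModel {
    private final List<String> pending = new ArrayList<>();   // written inside the open transaction
    private final List<String> committed = new ArrayList<>(); // visible to read-committed consumers

    /** Called for every record; the record joins the currently open transaction. */
    void write(String record) {
        pending.add(record);
    }

    /** Called when a checkpoint has completed; commits the transaction and opens a new one. */
    void onCheckpointComplete() {
        committed.addAll(pending);
        pending.clear();
    }

    /** What a consumer that reads only committed data would see. */
    List<String> visibleRecords() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        TransactionalSinkModel sink = new TransactionalSinkModel();
        sink.write("a");
        System.out.println(sink.visibleRecords()); // nothing committed yet
        sink.onCheckpointComplete();
        System.out.println(sink.visibleRecords()); // "a" is visible now
    }
}
```

In this model the delay a consumer sees is bounded by the checkpoint interval, which matches the behaviour described in the reply.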

Re: RE: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
FS mounted directory. We do monitor backpressure through rest api periodically and we do not see any. From: Yun Gao Sent: Monday, December 21, 2020 10:40 AM To: Colletta, Edward ; user@flink.apache.org Subject: Re: checkpointing seems to be throttled. This email is from an external source -exerci

Re: Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
Hi Nick, Sorry, I initially thought that the data was also written into Kafka with Flink. So it can be confirmed that there is no delay on the write side, right? Does the delay on the read side persist? Best, Yun --Original Mail -- Sender:nick toker

Re: StreamingFileSink closed file exception

2020-12-24 Thread Yun Gao
Hi Billy, StreamingFileSink does not expect the Encoder to close the stream passed to the encode method. However, ObjectMapper closes it at the end of the write method. Thus I think you could disable the close action for ObjectMapper, or change the encode implementation to objectMapper
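
One generic way to keep a serializer from closing the sink's stream is to hand it a wrapper whose `close()` only flushes. The sketch below uses only the JDK; `NonClosingOutputStream` and `CloseShieldDemo` are hypothetical names, and `writeAndClose` merely stands in for a library call that closes its target when done. Jackson also has a `JsonGenerator.Feature.AUTO_CLOSE_TARGET` setting intended for this purpose, which is worth checking against the Jackson version in use.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Wrapper whose close() flushes but leaves the wrapped stream open. */
class NonClosingOutputStream extends FilterOutputStream {
    NonClosingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // delegate bulk writes directly instead of byte by byte
    }

    @Override
    public void close() throws IOException {
        flush(); // deliberately do not close the underlying stream
    }
}

class CloseShieldDemo {
    /** Test double that records whether close() was called on it. */
    static class TrackingStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Stand-in for a serializer that closes its target after writing. */
    static void writeAndClose(OutputStream target, String text) throws IOException {
        try (OutputStream o = target) {
            o.write(text.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Returns true if the data arrived and the sink's stream is still open. */
    static boolean sinkStreamStaysOpen() {
        TrackingStream sinkStream = new TrackingStream();
        try {
            writeAndClose(new NonClosingOutputStream(sinkStream), "{\"id\":1}\n");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return !sinkStream.closed && sinkStream.size() > 0;
    }

    public static void main(String[] args) {
        System.out.println("sink stream still open: " + sinkStreamStaysOpen());
    }
}
```

Inside an `encode(element, stream)` implementation, the same wrapper would be placed around `stream` before passing it to the serializer.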

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-24 Thread Yun Gao
47: Support Checkpoints After Tasks Finished Thanks for the thorough update! I'll answer inline. On 14.12.20 16:33, Yun Gao wrote: > 1. To include EndOfPartition into consideration for barrier alignment at > the TM side, we now tend to decouple the logic for EndOfPartit

Re: Tumbling Time Window

2021-01-03 Thread Yun Gao
Hi Navneeth, I think you may start with the window function; an example of a custom window function can be found in [1]. From the description I think it should be a standard tumbling window; if implemented with a customized process function, it would end up having a sim

Re: Facing issues on kafka while running a job that was built with 1.11.2-scala-2.11 version onto flink version 1.11.2-scala-2.12

2021-01-03 Thread Yun Gao
Hi Narasimha, Since the Kafka connector itself is implemented purely in Java, I guess that with high probability it is not an issue with the Scala version. I think you may first check the Kafka cluster's status? Best, Yun --

Re: Is chk-$id/_metadata created regardless of enabling externalized checkpoints?

2021-01-04 Thread Yun Gao
Hi Dongwon, Happy new year! One meta file would be stored on top of HDFS even if external-checkpoint is not enabled. If external checkpoint is not enabled, flink would delete all the checkpoints on exit, and if external checkpoint is enabled, the checkpoints would be kept on cancel or fail c

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Yun Gao
g we want to have it more fine-grained on OperatorSubtaskState. Maybe we can store the flag inside managed or raw state without changing the format? On Fri, Dec 25, 2020 at 8:39 AM Yun Gao wrote: Hi all, I tested the previous PoC with the current tests and I found some new issues that mi

Re: StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues

2021-01-06 Thread Yun Gao
Hi Mahendra, Sorry for the late reply. I noticed that in your code you implement a bucket assigner that switches to a new bucket every minute; is it related to the current problems? Since different buckets would use different directories and files, when switching buckets new

Re: Re: Implementing a TarInputFormat based on FileInputFormat

2021-01-07 Thread Yun Gao
Hi Billy, I checked the provided example and found it should be a problem of ContinuousFileReader, and I created an issue for it [1]. To work around the issue temporarily, I think you may disable chaining of the ContinuousFileReaderOperator with the following operators: android.disableC

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Yun Gao
Hi Roman, Many thanks for the feedback! I'll try to answer the issues inline: > 1. Option 1 is said to be not preferable because it wastes resources and adds > complexity (new event). > However, the resources would be wasted for a relatively short time until the > job finishes completely.

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
Hi Roman, Many thanks for the feedback! > Probably it would be simpler to just decline the RPC-triggered checkpoint > if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY). > But I wonder how significantly this waiting f

Re: Re: Use Flink to process request with list of queries and aggregate

2021-01-11 Thread Yun Gao
Hi Li, From my point of view, it would not be easy to use a countWindow if you have a different number of records for each key (namely the user in this case). I think you may need to use the low-level KeyedProcessFunction [1] and keep some state yourself. For example, each request might also carri

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
Hi Roman, Many thanks for the feedback and suggestions! > I think UC will be the common case with multiple sources each with DoP > 1. > IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishes. Yes, waiting for Eo

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-13 Thread Yun Gao
Hi all, I updated the FLIP [1] to reflect the major points discussed in the ML thread: 1) For the "new" root tasks that finished before receiving the trigger message, we previously proposed to let the JM re-compute and re-trigger the descendant tasks, but after the discussion we realized that it might

Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread Yun Gao
Hi Sagar, I think the problem is that the legacy sources implemented by extending SourceFunction are all defined as CONTINUOUS_UNBOUNDED when added via env.addSource(). Although there is a hacky way to add legacy sources as BOUNDED sources [1], I think you may first try the new version of

Re: StreamingFileSink with ParquetAvroWriters

2021-01-13 Thread Yun Gao
Hi Jan, Could you try adding this dependency? org.apache.parquet parquet-avro 1.11.1 Best, Yun --Original Mail -- Sender:Jan Oelschlegel Send Date:Thu Jan 14 00:49:30 2021 Recipients:user@flink.apache.org Subject:StreamingFileSi

Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-14 Thread Yun Gao
Hi Sagar, I rechecked and found that the new Kafka source is not formally published yet, so I think a stable approach would be to try adding the FlinkKafkaConsumer as a BOUNDED source first. Sorry for the inconvenience. Best, Yun -- Send

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-14 Thread Yun Gao
Hi all, We had an offline discussion together with @Arvid, @Roman and @Aljoscha, and I'd like to post some of the points we discussed: 1) For the problem that the "new" root task coincidentally finished before getting triggered successfully, we have listed two options in FLIP-147 [1], for the fi

Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Thread Yun Gao
Hi, Many thanks to Jingsong for bringing up this discussion! Enhancing the FileSystem connector in Table should largely improve usability. I have the same question as Piotr. From my side, I think it would be better to be able to reuse the existing StreamingFileSink.

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-16 Thread Yun Gao
Hi Kaan, For the first issue, I think the two implementations should differ and the first should be slower, but which one to use should depend on whether your algorithm can compute incrementally with only the changed edges. However, as far as I know, most graph algo

Re: Updating Closure Variables

2020-04-27 Thread Yun Gao
Hi Senthil, I think you are right that you cannot update closure variables directly and expect them to show up at the workers. If the variable values are read from S3 files, I think currently you will need to define a source explicitly to read the latest value of the file. Wh

Re: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Yun Gao
Hi Peter, Sorry for missing the question and responding late. I'm currently working together with Jingsong on the issue to support "global committing" (like writing a _SUCCESS file or adding partitions to the Hive store) after buckets are terminated. In 1.11 we may first support watermark/time relate

Re: changing the output files names in Streamfilesink from part-00 to something else

2020-05-12 Thread Yun Gao
Hi Dhurandar: Currently StreamingFileSink should be able to change the prefix and suffix of the filename[1], it could be changed to something like -0-0. Could this solve your problem ? Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_si

Re: Performance issue when writing to HDFS

2020-05-21 Thread Yun Gao
Hi Kong, Sorry that I'm not an expert on Hadoop, but from the logs and Google, it seems more likely to be a problem on the HDFS side [1], such as a long GC pause in the DataNode. I also found a similar issue in the mailing list history [2], and the conclusion should be similar. Best, Yun [1]

Re: onTimer method in CoProcessFunction in flink

2020-05-23 Thread Yun Gao
Hi Jaswin, If I understand correctly, you could add the logic in the onTimer callback. In this callback, OnTimerContext.output(outputTag, xx) can be used to emit data to a specific side output. Besides, you would need a new state to store the elements to output in the onTimer callba

Re: Re: onTimer method in CoProcessFunction in flink

2020-05-23 Thread Yun Gao
Hi Jaswin, I think the event-time timers and processing-time timers in Flink are fully decoupled: event-time timers are triggered by the watermarks received, and processing-time timers are triggered by the physical clock. You may think of them as two separate timelines with no guarantee on their

Re: Re: Flink Window with multiple trigger condition

2020-05-24 Thread Yun Gao
Hi, First, sorry that I'm not an expert on windows, and please correct me if I'm wrong, but from my side it seems the assigner might also be a problem in addition to the trigger: currently Flink window assigners should all be based on time (processing time or event time), and it might be hard

Re: Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread Yun Gao
() + 1); SearchSummaryCalculation(record, currentTuple); } sessionSummary.put(search_hex9, currentTuple); } timeState.update(currentTimeState); } On Sun, May 24, 2020 at 10:57 PM Yun Gao wrote: Hi, First sorry that I'm not expert o

Re: Question on stream joins

2020-05-28 Thread Yun Gao
Hi Sudan, As far as I know, both join and cogroup require keys (namely partitioning), so for the non-keyed scenario you may have to use the low-level connect operator. In my opinion it should be something like leftSource.connect(rightSource) .process(new TagCoprocessFu

Re: Re: Question on stream joins

2020-05-29 Thread Yun Gao
" and the right source with "1" .window(xx) .process(new XX()) In this when will the window be applied ? since the window operator is after process(new TagCoprocessFunction()). On Fri, May 29, 2020 at 11:35 AM Yun Gao wrote: Hi Sudan, As far as I know, both join and c

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-05-31 Thread Yun Gao
Hi Yu, I think when the serializer returns null, the following operator should still receive a null record. One possible approach is for the following operator to count the number of null records received and use a metric to publish the value to a monitoring system, and the monitoring system prom

Re: Reading files from multiple subdirectories

2020-06-11 Thread Yun Gao
Hi Lorenzo, Based on a previous thread [1] and the source code, I think you may set inputFormat.setNestedFileEnumeration(true) to also scan the nested files. Best, Yun [1] https://lists.apache.org/thread.html/86a23b4c44d92c3adeb9ff4a708365fe4099796fb32deb6319e0e17f%40%3Cuser.flink.apache

Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

2020-06-12 Thread Yun Gao
Hi Felipe, I tested the basic RideCleansingExercise[1] jobs that uses the TaxiRide type locally and it seems to be able to startup normally. Could you also share your current executing code and the full stacktrace of the exception ? Best, Yun [1] https://github.com/ververica/flink-tr

Re: Shared state between two process functions

2020-06-14 Thread Yun Gao
Hi Jaswin, Currently state belongs to a single operator, so it should not be possible to share state between different operators. Could you also share the original problem you want to solve by sharing state? Best, Yun --Original Mail -- Sender:Jaswin S

Re: adding s3 object metadata while using StreamFileSink

2020-06-21 Thread Yun Gao
Hi Dhurandar, As I understand it, what you need is to get notified when a file is written successfully (committed) on the S3 FileSystem. However, currently there is no public API for such a listener, and there is an issue tracking it [1]. With the current version, one possible method co

Re: Problems with type erasure

2020-06-22 Thread Yun Gao
Hi Vincenzo, Could you also attach the code before line 72, namely how `delays` is defined? The exception says the return type of "Custom Source" could not be determined, and I think it refers to `delays`; the exception is thrown when an operator is called on `delays` and Fli

Re: TypeInformation not found

2020-06-22 Thread Yun Gao
Hi yu, Have you added "import org.apache.flink.api.scala._"? It should be OK if the import has been added in the program: import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object Test { def main(args: Array[String]): Unit = {

Re: Re: TypeInformation not found

2020-06-24 Thread Yun Gao
Jun 23 15:51:27 2020 Recipients:Yun Gao CC:User Subject:Re: TypeInformation not found thanks Yun Gao!have added "import org.apache.flink.api.scala._", I just to run wordcount in idea . On Tue, Jun 23, 2020 at 11:16 AM Yun Gao wrote: Hi yu, Have you add "import org.apache.

Re: ElasticSearch_Sink

2020-07-15 Thread Yun Gao
Hi Dinesh, As far as I know, to implement the two-phase commit protocol for an external system, the external system is required to provide some kind of transaction that can survive across sessions. With such a transaction mechanism we could first start a new transaction and write

Re: hybrid state backends

2021-02-07 Thread Yun Gao
Hi Marco, Sorry, currently the state backend is a global configuration and cannot be configured differently for different operators. One possible alternative for this requirement might be to set RocksDB as the default state backend, and for those operators that want to put state in memory,

Re: UUID in part files

2021-02-07 Thread Yun Gao
Hi Dan, SQL adds the UUID by default for the case where users want to execute multiple bounded SQL jobs and append to the same directory (Hive table), so a UUID is attached to avoid overwriting the previous output. DataStream can be viewed as providing the low-level API, and thus it does not ad

Re: Re: flink kryo exception

2021-02-07 Thread Yun Gao
Hi yidan, One more thing to confirm: are you creating the savepoint and stopping the job together with the bin/flink cancel -s [:targetDirectory] :jobId command? Best, Yun --Original Mail -- Sender:赵一旦 Send Date:Sun Feb 7 16:13:57 2021 Recipients:Till Rohrmann

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-07 Thread Yun Gao
Hi Jan, From my point of view, a window in Flink should be regarded as a "high-level" operation for some kind of aggregation, and if it cannot satisfy the requirements, we can at least fall back to the "low-level" API by using KeyedProcessFunction [1]. In this case, we could use a ValueState

Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread Yun Gao
Hi, Have you also included the kafka-connector related jar in the classpath? Best, Yun --Original Mail -- Sender:joris.vanagtmaal Send Date:Tue Feb 9 03:16:52 2021 Recipients:User-Flink Subject:Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector T

Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Yun Gao
05:31, Yun Gao wrote: Hi Jan, From my view, I think in Flink Window should be as a "high-level" operation for some kind of aggregation operation and if it could not satisfy the requirements, we could at least turn to using the "low-level" api by using KeyedProcessFunction[1

Re: Any plans to make Flink configurable with pure data?

2021-02-09 Thread Yun Gao
Hi Pilgrim, Currently Table indeed cannot use low-level APIs like timers. Would a mixture of SQL and DataStream satisfy the requirements? A job might be created via multiple SQL statements, connected via DataStream operations. Best, Yun ---

Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-09 Thread Yun Gao
Hi, Could you try adding the jar via the Python configuration explicitly? You may refer to [1]. Best, Yun [1]https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program -

Re: Debugging long Flink checkpoint durations

2021-03-02 Thread Yun Gao
Hi Dan, I think you could see the detail of the checkpoints via the checkpoint UI[1]. Also, if you see in the pending checkpoints some tasks do not take snapshot, you might have a look whether this task is backpressuring the previous tasks [2]. Best, Yun [1] https://ci.apache.org/projects/

Re: Re: Checkpoint Error

2021-03-07 Thread Yun Gao
Hi Navneeth, It seems from the stack that the exception is caused by the underlying EFS problems ? Have you checked if there are errors reported for EFS, or if there might be duplicate mounting for the same EFS and others have ever deleted the directory? Best, Yun --Original

Re: java options to generate heap dump in EMR not working

2021-03-07 Thread Yun Gao
Hi, I tried with a standalone session (sorry, I do not have a YARN cluster at hand) and the Flink cluster started up normally. Could you check the NodeManager log to see the detailed reason why the container is not launched? Also, have you checked whether there are some spell

Re: How do I call an algorithm written in C++ in Flink?

2021-03-08 Thread Yun Gao
Hi Suxi, Do you mean you want to call the algorithm in C++? If so, I think you could do it the same way as you wrap it in the SpringBoot project, via JNI. I think you do not need to add a new operator; you could use the existing Flink API, load your library in open() and call the algori

Re: Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Yun Gao
Hi Rainie, From the code it seems the current job does not use time-related functionality like windows or timers? If so, the problem would be independent of the time type used. Also, it is unlikely to be due to rebalance(), since the network layer checks sequence numbers. If there are

Re: Gradually increasing checkpoint size

2021-03-08 Thread Yun Gao
Hi Dan, Have you used a very large upperBound or lowerBound? If not, could you also check the watermark strategy? The interval join operator depends on event-time timers for cleanup, and event-time timers are triggered by watermarks. Best, Yun --Original Mail -

Re: Re: Re: Checkpoint Error

2021-03-08 Thread Yun Gao
:46 2021 Recipients:Yun Gao CC:user Subject:Re: Re: Checkpoint Error Hi Yun, Thanks for the response. I checked the mounts and only the JM's and TM's are mounted with this EFS. Not sure how to debug this. Thanks On Sun, Mar 7, 2021 at 8:29 PM Yun Gao wrote: Hi Navneeth, It seem

Re: questions about broadcasts

2021-03-08 Thread Yun Gao
Hi Marco, (a) It is possible for an operator to receive two different kinds of broadcasts, DataStream ints = DataStream strs = ... ints.broadcast().connect(strs.broadcast()) .process(new CoProcessFunction(){...}); (b) A traditional Flink operator cannot accept three different inputs.

Re: New settings are not honored unless checkpoint is cleared.

2021-03-08 Thread Yun Gao
Hi Yordan, What are the settings that are changed during the tests? Best, Yun -- From:Yordan Pavlov Send Time:2021 Mar. 5 (Fri.) 23:36 To:user Subject:New settings are not honored unless checkpoint is cleared. Hello there, I am

Re: Re: How to check checkpointing mode

2021-03-08 Thread Yun Gao
07:25:31 2021 Recipients:Flink User Mail List , Yun Gao Subject:Re: How to check checkpointing mode Hi Yun, Thank you for looking, job creation is quite big, I've truncated helper methods dealing with command line parameters etc, below two major methods: @Override public Void call() t

Re: Re: Gradually increasing checkpoint size

2021-03-08 Thread Yun Gao
rmarks stop appearing in the UI. .assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1))); Thanks Yun! On Mon, Mar 8, 2021 at 7:27 AM Yun Gao wrote: Hi Dan, Have you use a too large upperBound or lowerBound? I

Re: Re: Meaning of checkpointStartDelayNanos

2021-04-04 Thread Yun Gao
Hi Kai, Yes, you are basically right. One minor point is that the start time is taken as the time at which the checkpoint is initiated on the JM side. Best, Yun --Original Mail -- Sender:Kai Fu Send Date:Mon Apr 5 09:31:58 2021 Recipients:user Subject:Re: Meanin

Re: Questions about checkpointAlignmentTime in unaligned checkpoint

2021-04-04 Thread Yun Gao
Hi Kai, Under unaligned checkpoint settings there is still an alignment process. Although the task can snapshot the state of the operators on receiving the first barrier and emit barriers to the following tasks, it still needs to wait until all the barriers are received before finalizing the check

Re: Union of more then two streams

2021-04-04 Thread Yun Gao
Hi, With a.connect(b).coprocess(xx).connect(c).coprocess(xx), two operators would be created: the first operator would union a and b and output the enriched data, and then .connect(c).coprocess(xx) would pass through the already enriched data and enrich the records from c. Since the two oper

Re: Official flink java client

2021-04-22 Thread Yun Gao
Hi gaurav, Logically, the Flink client resides inside the StreamExecutionEnvironment, and users can use the StreamExecutionEnvironment to execute their jobs. Could you share more about why you want to directly use the client? Best, Yun --Original Mail -- Sende

Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
/RestClusterClientExtended.java On Fri, Apr 23, 2021 at 5:10 AM Yun Gao wrote: Hi gaurav, Logically, the Flink client resides inside the StreamExecutionEnvironment, and users can use the StreamExecutionEnvironment to execute their jobs. Could you share more about why you want to directly use the

Re: Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
generated clients (if APIs are exposed via Swagger or OpenApi). If that's not an option, writing a REST client from scratch is something I try to avoid as much as I can. Best, Flavio On Fri, Apr 23, 2021 at 9:55 AM Yun Gao wrote: Hi Flavio, Many thanks for the explanation, maybe another o

Re: pojo warning when using auto generated protobuf class

2021-04-24 Thread Yun Gao
Hi Prashant, I think the warning is emitted when calling return TypeInformation.of(Trace.APITrace::class.java). Currently Flink does not have native support for protobuf types yet [1], thus it would use a generic serializer created by Kryo. This should not affect the correctness of the progra

Re: Too many checkpoint folders kept for externalized retention.

2021-04-25 Thread Yun Gao
Hi John, Logically, the maximum number of retained checkpoints is configured by state.checkpoints.num-retained [1]. Have you configured this option? Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#state-checkpoints-num-retained
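As a minimal sketch of the option mentioned above, it could be set in flink-conf.yaml as follows. The value 3 is an arbitrary example chosen for illustration, not a recommendation from the thread.

```yaml
# flink-conf.yaml (config fragment)
# Keep at most this many completed checkpoints; older ones are discarded.
# The value 3 is an illustrative assumption.
state.checkpoints.num-retained: 3
```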

Re: Re: pojo warning when using auto generated protobuf class

2021-04-26 Thread Yun Gao
(FlinkKafkaConsumer(Config.TOPIC_SPANS, ApiTraceSchema(), props)) Sent via Superhuman On Sat, Apr 24, 2021 at 8:48 AM, Yun Gao wrote: Hi Prashant, I think the warning is emitted when calling return TypeInformation.of(Trace.APITrace::class.java). Currently Flink does not have native support for protobuf

Re: Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

2021-05-05 Thread Yun Gao
Hi Ragini, How did you submit your job? The exception here is most likely caused by `flink-client` not being included in the classpath on the client side. If the job is submitted via the flink cli, namely `flink run -c xx.jar`, it should be included by default, and if it is submitted programmatically,
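For a job that is launched from application code or an IDE and built with Maven, the client classes can be put on the classpath with a dependency along these lines. This is a sketch under assumptions: the Maven artifact is named flink-clients with a Scala-version suffix in the Flink releases of that period, and `flink.version` and `scala.binary.version` are placeholder properties assumed to be defined elsewhere in the pom.

```xml
<!-- pom.xml (config fragment); the two properties are assumed to be defined in <properties> -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```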

Re: Define rowtime on intermediate table field

2021-05-05 Thread Yun Gao
Hi Sumeet, I think you might first convert the table back to the DataStream [1], then define the timestamp and watermark with `assignTimestampsAndWatermarks(...)`, and then convert it back to table[2]. Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/

Re: Re: Handling "Global" Updating State

2021-05-16 Thread Yun Gao
Hi Rion, I think FLIP-150 [1] should be able to solve this scenario. Since FLIP-150 is still under discussion, for now a temporary workaround that comes to mind might be: 1. Write a first job to read from Kafka and update the broadcast state of some operator. The job would keep the source alive after all the

Re: Re: Handling "Global" Updating State

2021-05-18 Thread Yun Gao
h. Thanks! Rion On May 17, 2021, at 1:36 AM, Yun Gao wrote: Hi Rion, I think FLIP-150 [1] should be able to solve this scenario. Since FLIP-150 is still under discussion, for now a temporary workaround that comes to mind might be: 1. Write a first job to read from Kafka and update the broadcast sta
