Re: question on ValueState

2021-02-10 Thread Roman Khachatryan
Right, in this case FileSystemStateBackend is the right choice. The state size is limited by TM memory as you said. Regards, Roman On Tue, Feb 9, 2021 at 8:54 AM yidan zhao wrote: > What I am interested in is whether I should use rocksDB to replace > fileBackend. > RocksDB's performance is not

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-10 Thread Roman Khachatryan
Hi Dongwon, With State Processor API you should be able to create a new snapshot that doesn't reference the unused classes. Regards, Roman On Tue, Feb 9, 2021 at 3:39 AM Dongwon Kim wrote: > Hi Khachatryan, > > Thanks for the explanation and the input! > > 1. Use the State Processor API to cr

Re: Object and Integer size in RocksDB ValueState

2021-02-23 Thread Roman Khachatryan
Hi Maciej, If I understand correctly, you're asking whether ValueState parameterized with Object has the same size as the one with Integer (given that the actual stored objects (integers) are the same). With RocksDB, any state object is serialized first and only then it is stored in MemTable or in
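The point that RocksDB stores serialized bytes, not heap objects, can be illustrated with the JDK alone. The sketch below encodes the same integer with two generic encodings. It does not reproduce Flink's serializers. As far as I know, ValueState<Integer> uses Flink's IntSerializer (a fixed 4 bytes), while ValueState<Object> falls back to a generic serializer that may also record type information. The exact byte counts in that case are not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Illustration only: the same integer value encoded by two different
// serializers. Neither encoding is Flink's own; the point is that RocksDB
// stores whatever bytes the state serializer produces, not the JVM object.
class SerializedSizeSketch {

    // Fixed-width encoding via writeInt: always 4 bytes.
    static int fixedWidthSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Generic Java serialization of a boxed value: also writes a stream
    // header and class metadata, so the result is considerably larger.
    static int javaSerializationSize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("fixed width: " + fixedWidthSize(42) + " bytes"); // 4 bytes
        System.out.println("java serialization: " + javaSerializationSize(42) + " bytes");
    }
}
```

The in-memory size of a boxed Integer on the heap plays no role in either number.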

Re: Install/Run Streaming Anomaly Detection R package in Flink

2021-02-23 Thread Roman Khachatryan
Hi, I'm pulling in Wei Zhong and Xingbo Huang who know PyFlink better. Regards, Roman On Mon, Feb 22, 2021 at 3:01 PM Robert Cullen wrote: > My customer wants us to install this package in our Flink Cluster: > > https://github.com/twitter/AnomalyDetection > > One of our engineers developed a

Re: WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-23 Thread Roman Khachatryan
Hi, You can use a watermark strategy with bounded out-of-orderness in DDL; please refer to [1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark Regards, Roman On Tue, Feb 23, 2021 at 12:48 PM joris.vanagtmaal < joris.vanagtm...@wartsila.com> w

Re: Flink custom trigger use case

2021-02-23 Thread Roman Khachatryan
Hi, I've noticed that you are using an event-time window, but the trigger fires based on processing time. You should also register an event-time timer (for the window end), so that trigger.onEventTime() will be called. And it's safer to check whether the state (firstSeen) value is true, not just whether it exists

Re: Datastream Lag Windowing function

2021-02-23 Thread Roman Khachatryan
Hi, I can't see either the wrong or the expected output in your message; can you re-attach it? Could you provide the code of your pipeline, including the view creation? Are you using the Blink planner (it can be chosen with useBlinkPlanner [1])? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/d

Re: WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-23 Thread Roman Khachatryan
The watermark resolution in Flink is one millisecond [1], so the 1st form essentially doesn't allow out-of-orderness (though the elements with the same timestamp are not considered late in this case). [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html
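A minimal sketch of a bounded-out-of-orderness generator makes the one-millisecond resolution concrete. It assumes the DataStream formula used by Flink's BoundedOutOfOrdernessWatermarks (watermark = max timestamp - out-of-orderness - 1). It also simplifies lateness to "timestamp <= current watermark" with no allowed lateness. This is not Flink code.

```java
// Simplified sketch of a bounded-out-of-orderness watermark generator.
// Assumption: it mirrors the formula maxTimestamp - outOfOrderness - 1
// used by Flink's DataStream BoundedOutOfOrdernessWatermarks (milliseconds).
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the first watermark does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Simplification (no allowed lateness): an element is late once the
    // watermark has reached its timestamp.
    boolean isLate(long timestamp) {
        return timestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch noDelay = new BoundedOutOfOrdernessSketch(0);
        noDelay.onEvent(1000);
        System.out.println(noDelay.currentWatermark()); // 999
        System.out.println(noDelay.isLate(1000));       // false: same timestamp as the max
        System.out.println(noDelay.isLate(999));        // true: 1 ms out of order
    }
}
```

With zero out-of-orderness, an element carrying the current maximum timestamp is still on time. One that is even 1 ms older is late.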

Re: Object and Integer size in RocksDB ValueState

2021-02-24 Thread Roman Khachatryan
ue? Is it the 16 bytes that Java requires in-memory? If > I'll change my ValueState to integer, and provide additional value > there, will it require more storage space? Also, to respond to your > point about compression, we're using incremental checkpoints, so I > don't th

Re: Flink custom trigger use case

2021-02-25 Thread Roman Khachatryan
Hi, Yes, you have an Iterable with window elements as the ProcessWindowFunction input. You can then emit them individually. Regards, Roman On Thu, Feb 25, 2021 at 7:22 AM Diwakar Jha wrote: > Hello, > > I tried using *processWindowFunction* since it gives access to > *globalstate* through *co

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-03-02 Thread Roman Khachatryan
Hi Jan, Thanks for sharing your solution. You probably also want to remove previously created timer(s) in processElement, so that you don't end up with one timer per element. For that, you can store the previous timer's time (in function state). Regards, Roman On Fri, Feb 26, 2021 at 10:29 PM Jan Brusch
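The timer suggestion can be sketched without Flink. FakeTimerService and DedupTimerSketch are hypothetical names. In a KeyedProcessFunction the calls would map to ctx.timerService().registerEventTimeTimer(...) and deleteEventTimeTimer(...), and the previous timer would live in a ValueState<Long>. The timer time (element timestamp + window size) is an assumption, since the original solution is not shown.

```java
import java.util.TreeSet;

// Sketch: keep at most one pending timer per key by deleting the
// previously registered timer before registering a new one.
class DedupTimerSketch {
    private final FakeTimerService timerService = new FakeTimerService();
    private final long windowSizeMillis;
    private Long previousTimer; // stand-in for a ValueState<Long>

    DedupTimerSketch(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    void processElement(long timestamp) {
        // Delete the timer registered for the previous element of this key...
        if (previousTimer != null) {
            timerService.delete(previousTimer);
        }
        // ...then register a new one and remember its time.
        long next = timestamp + windowSizeMillis; // assumption: fire one window after the element
        timerService.register(next);
        previousTimer = next;
    }

    int pendingTimers() {
        return timerService.timers.size();
    }

    Long nextTimer() {
        return timerService.timers.isEmpty() ? null : timerService.timers.first();
    }

    public static void main(String[] args) {
        DedupTimerSketch fn = new DedupTimerSketch(60_000);
        fn.processElement(1_000);
        fn.processElement(2_000);
        fn.processElement(3_000);
        System.out.println(fn.pendingTimers()); // 1
        System.out.println(fn.nextTimer());     // 63000
    }
}

// Hypothetical stand-in for a per-key timer service.
class FakeTimerService {
    final TreeSet<Long> timers = new TreeSet<>();

    void register(long time) { timers.add(time); }

    void delete(long time) { timers.remove(time); }
}
```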

[ANNOUNCE] Apache Flink 1.12.2 released

2021-03-05 Thread Roman Khachatryan
The Apache Flink community is very happy to announce the release of Apache Flink 1.12.2, which is the second bugfix release for the Apache Flink 1.12 series. Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming

Re: Checkpoint fail due to timeout

2021-03-11 Thread Roman Khachatryan
Hello, This can have several causes, such as back-pressure, large snapshots or bugs. Could you please share: - the stats of the previous (successful) checkpoints - back-pressure metrics for sources - which Flink version you use? Regards, Roman On Thu, Mar 11, 2021 at 7:03 AM Alexey

Re: No saving data using rocksdb

2021-03-12 Thread Roman Khachatryan
Hi Yuri, The state that you access with getRuntimeContext().getState(...) is scoped to the key (so for every new key this state will be null). What key do you use? Regards, Roman On Fri, Mar 12, 2021 at 7:22 AM Maminspapin wrote: > > I have following piece of configuration in flink.yaml: > > Ke
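A toy model of keyed ValueState shows why the state reads as null for a new key. It is not Flink's implementation. It only assumes that each key has its own slot and that the runtime selects the current key before each element is processed.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed ValueState: one slot per key, selected by the
// "current key". Illustrates why value() is null for a key seen for the first time.
class KeyedValueStateSketch<K, V> {
    private final Map<K, V> perKey = new HashMap<>();
    private K currentKey;

    // In Flink the runtime sets the current key before each element.
    void setCurrentKey(K key) { currentKey = key; }

    V value() { return perKey.get(currentKey); }

    void update(V v) { perKey.put(currentKey, v); }

    public static void main(String[] args) {
        KeyedValueStateSketch<String, Integer> state = new KeyedValueStateSketch<>();
        state.setCurrentKey("user-1");
        state.update(5);
        state.setCurrentKey("user-2");
        System.out.println(state.value()); // null: nothing stored for this key yet
        state.setCurrentKey("user-1");
        System.out.println(state.value()); // 5
    }
}
```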

Re: No saving data using rocksdb

2021-03-12 Thread Roman Khachatryan
Are you starting the job from a savepoint [1] when submitting it again? If not, it is considered a new job and will not pick up the old state. [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#starting-a-job-from-a-savepoint Regards, Roman On Fri, Mar 12, 2021 at 1

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
Hi Vishal, There is no leak in the code you provided (except that the number of keys can grow). But as you figured out, the state is scoped to the key, not to window+key. Could you explain what you are trying to achieve and why you need to combine sliding windows with state scoped to window+key? R

Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Roman Khachatryan
Hi, Do I understand correctly that: 1. The workload varies across the jobs but stays the same for the same job 2. With a small number of slots per TM you are concerned about uneven resource utilization when running low- and high-intensive jobs on the same cluster simultaneously? If so, wouldn't r

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
count of 1. >> >> >> (except that the number of keys can grow). >> >> Want to confirm that the keys are GCed ( along with state ) once the >> (windows close + lateness ) ? >> >> >> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan wro

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Roman Khachatryan
Hi Sebastian, Did you try setting debezium-json-map-null-key-mode to DROP [1]? I'm also pulling in Timo who might know better. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#debezium-json-map-null-key-mode Regards, Roman On Fri, Mar 12,

Re: [Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-03-12 Thread Roman Khachatryan
Hi Alexis, This looks like a bug, I've created a Jira ticket to address it [1]. Please feel free to provide any additional information. In particular, whether you are able to reproduce it in any of the subsequent releases. [1] https://issues.apache.org/jira/browse/FLINK-21752 Regards, Roman O

Re: Checkpoint fail due to timeout

2021-03-15 Thread Roman Khachatryan
2.2 with same results > > Thanks, > Alexey > ________ > From: Roman Khachatryan > Sent: Thursday, March 11, 2021 11:49 PM > To: Alexey Trenikhun > Cc: Flink User Mail List > Subject: Re: Checkpoint fail due to timeout > > Hello, > >

Re: Questions with State Processor Api

2021-03-15 Thread Roman Khachatryan
Hi Yuri, I think you can achieve this by using "normal" Flink operators and sinks. One thing that immediately comes to mind is timers [1]. It should be simpler to implement and set up than with the State Processor API (though it seems doable via this API too). [1] https://ci.apache.org/

Re: Checkpoint fail due to timeout

2021-03-22 Thread Roman Khachatryan
Thanks for sharing the thread dump. It shows that the source thread is indeed back-pressured (checkpoint lock is held by a thread which is trying to emit but unable to acquire any free buffers). The lock is per task, so there can be several locks per TM. @ChangZhuo Chen (陳昌倬) , in the thread you

Re: Capturing Statement Execution / Results within JdbcSink

2021-03-22 Thread Roman Khachatryan
Hey Rion, Regarding > Accessing Statement Execution / Results, unfortunately there is currently no way to get the update count from the database. As for the > Batching Mechanisms (withBatchIntervalMs & withBatchSize), these parameters should have "OR" semantics: the database should be updated w
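The "OR" semantics of withBatchSize and withBatchIntervalMs can be shown with a minimal sketch that uses an explicit clock. BatchingSketch and tick() are hypothetical names. As far as I know, the real JDBC sink flushes on the interval from a background scheduler, which this model replaces with explicit tick() calls.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "OR" flush semantics: the buffer is flushed when it reaches
// batchSize OR when batchIntervalMs has elapsed, whichever comes first.
class BatchingSketch {
    private final int batchSize;
    private final long batchIntervalMs;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> flushed = new ArrayList<>(); // stand-in for DB writes
    private long lastFlushMs;

    BatchingSketch(int batchSize, long batchIntervalMs, long startMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.lastFlushMs = startMs;
    }

    // Size condition: checked on every added record.
    void add(String record, long nowMs) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush(nowMs);
        }
    }

    // Interval condition: in a real sink this would run on a timer thread.
    void tick(long nowMs) {
        if (!buffer.isEmpty() && nowMs - lastFlushMs >= batchIntervalMs) {
            flush(nowMs);
        }
    }

    private void flush(long nowMs) {
        flushed.add(new ArrayList<>(buffer));
        buffer.clear();
        lastFlushMs = nowMs;
    }

    public static void main(String[] args) {
        BatchingSketch sink = new BatchingSketch(3, 1_000, 0);
        sink.add("a", 0);
        sink.add("b", 1);
        sink.add("c", 2);   // size limit reached: flushes [a, b, c]
        sink.add("d", 10);
        sink.tick(500);     // interval not yet elapsed: nothing flushed
        sink.tick(1_100);   // interval elapsed: flushes [d]
        System.out.println(sink.flushed); // [[a, b, c], [d]]
    }
}
```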

Re: Checkpoint fail due to timeout

2021-03-23 Thread Roman Khachatryan
at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) >

Re: Source Operators Stuck in the requestBufferBuilderBlocking

2021-04-06 Thread Roman Khachatryan
Hi Sihan, Unfortunately, we have not been able to reproduce the issue so far. Could you please describe the job graph in more detail, in particular what the downstream operators are and whether there is any chaining? Do I understand correctly that Flink returned to normal at around 8:00; worked fin

Re: clear() in a ProcessWindowFunction

2021-04-09 Thread Roman Khachatryan
ove, As in this state is 1 degree removed from what > ever flink does internally with it's merges given that the state is scoped to > the key. > > > > > > > > On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi > wrote: >> >> Yep, makes s

Re: Does it support rate-limiting in flink 1.12?

2021-04-12 Thread Roman Khachatryan
Hi, I'm not sure that I fully understand your question. Is the intention to prioritize some jobs over the others in the same Flink cluster? Currently, it is not possible (FLIP-156 and further work aim to address this [1]). At the moment, you can either - deploy the jobs in separate clusters (per-j

Re: Re: Does it support rate-limiting in flink 1.12?

2021-04-12 Thread Roman Khachatryan
f problems (such as tm lost and connection time > out). So I want wo limit the speed of processing data on batch job. > > > > > > > > At 2021-04-12 15:49:31, "Roman Khachatryan" wrote: > >Hi, > > > >I'm not sure that I fully understand your

Re: how to convert DataStream to Table

2021-04-12 Thread Roman Khachatryan
Hi, I'm pulling in Timo and Jark as they know Table API better. Regards, Roman On Sun, Apr 11, 2021 at 3:36 PM vtygoss wrote: > > Hi All, > > > there is a scenario where I need to process OGG Log data in kafka using Flink > Sql. I can convert the OGG Log Stream to DataStream and each event >

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Roman Khachatryan
Hi, Could you please explain what you mean by internal restarts? If you commit offsets or timestamps from sink after emitting records to the external system then there should be no data loss. Otherwise (if you commit offsets earlier), you have to persist in-flight records to avoid data loss (i.e.
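The ordering argument (emit to the external system first, then commit the offset) can be simulated in a few lines. CommitAfterWriteSketch and the in-memory "external system" list are hypothetical. A crash between the write and the commit leads to a replay on restart, so records can be duplicated but not lost, which is at-least-once.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation: write each record to the external system first and only
// then commit its offset. A crash between the two steps causes a
// duplicate after restart, never a loss (at-least-once).
class CommitAfterWriteSketch {
    final List<String> externalSystem = new ArrayList<>(); // hypothetical external store
    long committedOffset = 0; // next offset to read after a restart

    // crashAfterWriteAt: offset at which a crash is simulated after the
    // write but before the commit (-1 means no crash).
    void run(List<String> log, long crashAfterWriteAt) {
        for (long offset = committedOffset; offset < log.size(); offset++) {
            externalSystem.add(log.get((int) offset)); // 1. emit
            if (offset == crashAfterWriteAt) {
                return; // simulated crash: offset is not committed
            }
            committedOffset = offset + 1; // 2. commit
        }
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c");
        CommitAfterWriteSketch sink = new CommitAfterWriteSketch();
        sink.run(log, 1);  // crashes right after writing "b"
        sink.run(log, -1); // restarts from the last committed offset
        System.out.println(sink.externalSystem); // [a, b, b, c]
    }
}
```

Committing before the write would instead make a crash lose the record, which is why the in-flight records then have to be persisted.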

Re: Flink Metric isBackPressured not available

2021-04-12 Thread Roman Khachatryan
Hi, The metric is registered upon task deployment and reported periodically. Which Flink version are you using? The metric was added in 1.10. Are you checking it in the UI? Regards, Roman On Fri, Apr 9, 2021 at 8:50 PM Claude M wrote: > > Hello, > > The documentation here > https://ci.apache.

Re: Flink 1.11.4?

2021-04-12 Thread Roman Khachatryan
Hi Maciek, There are no specific plans for 1.11.4 yet as far as I know. The official policy is to support the current and previous minor release [1]. So 1.12 and 1.13 will be officially supported once 1.13 is released. However, it's likely that 1.11.4 will still be released. [1] https://flink.apa

Re: Query regarding flink metric types

2021-04-12 Thread Roman Khachatryan
Hi Suchithra, You are right, those metrics can only grow, at least until failover. isBackPressured is reported as a boolean on subtask level. These samples are then aggregated and a ratio of (times-back-pressured / number-of-samples) is reported to the JobManager. Regards, Roman On Fri, Apr 9,
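The aggregation of boolean isBackPressured samples into a ratio can be sketched as follows. The sample values are made up. Treating an empty sample set as 0.0 is an assumption of this sketch.

```java
// Sketch: reduce boolean isBackPressured samples to the ratio
// (times back-pressured / number of samples).
class BackPressureRatioSketch {
    static double ratio(boolean[] samples) {
        if (samples.length == 0) {
            return 0.0; // assumption: no samples means no observed back-pressure
        }
        int backPressured = 0;
        for (boolean sample : samples) {
            if (sample) {
                backPressured++;
            }
        }
        return (double) backPressured / samples.length;
    }

    public static void main(String[] args) {
        System.out.println(ratio(new boolean[] {true, false, true, true})); // 0.75
    }
}
```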

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Roman Khachatryan
any other ways to guarantee > "at least once" processing without enabling checkpointing? > > Thanks, > Rahul > > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan wrote: >> >> Hi, >> >> Could you please explain what you mean by internal rest

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Roman Khachatryan
re any workaround to get "at least once" semantics with Flink Automatic > restarts in this case? > > Regards, > Rahul > > On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan wrote: >> >> Hi, >> >> Thanks for the clarification. >> >> >

Re: Flink 1.11.4?

2021-04-14 Thread Roman Khachatryan
Hi Yuval, I'd expect 1.13 to be available in 2-3 weeks (there are no exact estimates). Regards, Roman On Tue, Apr 13, 2021 at 12:08 PM Yuval Itzchakov wrote: > > Roman, is there an ETA on 1.13? > > On Mon, Apr 12, 2021, 16:17 Roman Khachatryan wrote: >> >>

Re: Flink Checkpoint for Stateless Operators

2021-04-29 Thread Roman Khachatryan
Hi Raghavendar, In Flink, checkpoints are global, meaning that a checkpoint is successful only if all operators acknowledge it. So the offset will be stored in state and then committed to Kafka [1] only after all the tasks acknowledge that checkpoint. At that moment, the element must be either emi

Re: Flink Checkpoint for Stateless Operators

2021-04-29 Thread Roman Khachatryan
> On Thu, Apr 29, 2021 at 6:13 PM Roman Khachatryan > wrote: > >> Hi Raghavendar, >> >> I

Re: Flink Checkpoint for Stateless Operators

2021-04-29 Thread Roman Khachatryan
> On Thu, Apr 29, 2021 at 9:10 PM Roman Khachatryan > wrote: > >> Flink uses checkpoint barriers that are sent along the same >> channels as data. Events are included in the checkpoint if they precede >> the c
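A single-channel toy model shows why a checkpoint covers exactly the records that precede its barrier. BarrierSketch and the BARRIER marker are hypothetical. With several input channels, Flink additionally aligns barriers (or, with unaligned checkpoints, persists in-flight data), and this sketch omits both.

```java
import java.util.ArrayList;
import java.util.List;

// Single-channel toy model of barrier-based snapshots: the barrier travels
// in-line with the records, so the snapshot taken when it arrives reflects
// exactly the records that came before it.
class BarrierSketch {
    static final String BARRIER = "|BARRIER|"; // hypothetical marker

    // Returns, for each barrier, the number of records processed before it,
    // i.e. the source offset that checkpoint would store.
    static List<Integer> snapshotOffsets(List<String> stream) {
        List<Integer> snapshots = new ArrayList<>();
        int processed = 0;
        for (String item : stream) {
            if (BARRIER.equals(item)) {
                snapshots.add(processed);
            } else {
                processed++; // record handled before any later barrier
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("r1", "r2", BARRIER, "r3", BARRIER);
        System.out.println(snapshotOffsets(stream)); // [2, 3]
    }
}
```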

Re: Backpressure configuration

2021-04-29 Thread Roman Khachatryan
Hello Kurt, Assuming that your sink is blocking, I would first make sure that it is not chained with the preceding operators. Otherwise, the same thread will output data and perform windowing/triggering. You can add disableChaining after addSink to prevent this [1]. Besides that, you probably cou

Re: remote task manager netty exception

2021-05-03 Thread Roman Khachatryan
Hi, I see that JM and TM failures are different (from TM, it's actually a warning). Could you please share the ERROR message from TM? Have you tried increasing taskmanager.network.retries [1]? [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-r

Re: remote task manager netty exception

2021-05-05 Thread Roman Khachatryan
ist_2.12-1.12.2.jar:1.12.2] > > but this looks more like a consequence than cause of exception. > > Note this seems to be pretty consistent when one of our TMs went lost. Could > it be somehow partition info isn't up to date on TM when job is restarting? > > Also not

Re: Could watermark could be took into consideration after the channel become active from idle at once?

2021-05-20 Thread Roman Khachatryan
Hi, AFAIK, this behavior is not configurable. However, for this to happen the channel must consistently generate watermarks smaller than watermarks from ALL aligned channels (and its elements must have a smaller timestamp). I'm not sure how likely it is. Is it something you see in production? Reg
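A simplified model of how an operator combines watermarks from several input channels helps explain the answer. It is loosely modeled on Flink's StatusWatermarkValve as I understand it, and details may differ. The output is the minimum over "aligned" channels. A channel returning from idle becomes aligned again only once its watermark catches up with the last emitted watermark. It therefore only holds the output back if it keeps producing watermarks at or above that point.

```java
import java.util.Arrays;

// Simplified model of combining watermarks across input channels.
// Output = min over aligned channels; an idle channel is unaligned, and a
// reactivated channel realigns only once its watermark >= last output.
class WatermarkValveSketch {
    private final long[] watermarks;
    private final boolean[] active;
    private final boolean[] aligned;
    private long lastOutput = Long.MIN_VALUE;

    WatermarkValveSketch(int channels) {
        watermarks = new long[channels];
        active = new boolean[channels];
        aligned = new boolean[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        Arrays.fill(active, true);
        Arrays.fill(aligned, true);
    }

    void markIdle(int ch) {
        active[ch] = false;
        aligned[ch] = false;
    }

    void markActive(int ch) {
        active[ch] = true;
        // Realign immediately only if it has not fallen behind the output.
        aligned[ch] = watermarks[ch] >= lastOutput;
    }

    // Returns the combined (output) watermark after this input.
    long onWatermark(int ch, long watermark) {
        if (active[ch] && watermark > watermarks[ch]) {
            watermarks[ch] = watermark;
            if (!aligned[ch] && watermark >= lastOutput) {
                aligned[ch] = true; // caught up: counts again
            }
            long min = Long.MAX_VALUE;
            boolean anyAligned = false;
            for (int i = 0; i < aligned.length; i++) {
                if (aligned[i]) {
                    min = Math.min(min, watermarks[i]);
                    anyAligned = true;
                }
            }
            if (anyAligned && min > lastOutput) {
                lastOutput = min; // output watermarks never go backwards
            }
        }
        return lastOutput;
    }

    public static void main(String[] args) {
        WatermarkValveSketch valve = new WatermarkValveSketch(2);
        valve.onWatermark(0, 100);
        System.out.println(valve.onWatermark(1, 100)); // 100
        valve.markIdle(1);
        System.out.println(valve.onWatermark(0, 200)); // 200
        valve.markActive(1);
        System.out.println(valve.onWatermark(1, 150)); // 200: channel 1 not aligned yet
        System.out.println(valve.onWatermark(1, 250)); // 200: aligned, min(200, 250)
        System.out.println(valve.onWatermark(0, 300)); // 250
    }
}
```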

Re: Best practice for adding support for Kafka variants

2021-05-20 Thread Roman Khachatryan
Hi, Those classes will likely be deprecated in the future in favor of FLIP-27 [1][2] source and FLIP-143 [3] sink implementations and eventually removed (though it won't happen soon). You should probably take a look at these new APIs. Either way, there is no such recommendation AFAIK. Copie

Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
Hi, Could you please share the relevant parts of your flink-conf.yaml? Regards, Roman On Thu, May 20, 2021 at 9:13 PM Milind Vaidya wrote: > > Hi > > Need to forward a few env variables to Job and Task manager. > I am running jobs in Yarn cluster > I was referring to this : Forwarding > > I als

Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
ager.env.SERVICE_NAME: "test_service_name" > containerized.master.env.SERVICE_NAME: "test_service_name" > > > > > > On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan wrote: >> >> Hi, >> >> Could you please share the relevant parts of your flink-conf.yaml? >>

Re: Job recovery issues with state restoration

2021-05-20 Thread Roman Khachatryan
Hi Peter, Do you experience this issue if running without local recovery or incremental checkpoints enabled? Or have you perhaps compared the local (on TM) and remote (on DFS) SST files? Regards, Roman On Thu, May 20, 2021 at 5:54 PM Peter Westermann wrote: > > Hello, > > > > I’ve reported issues ar

Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
ion.GlobalConfiguration - Loading configuration > property: containerized.taskmanager.env.SERVICE_NAME, "hello-test" > 2021-05-20 13:34:13,743 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading configuration > property: containerized.master.env.SERVICE_NAME, "hello-test" >

Re: ES sink never receive error code

2021-05-20 Thread Roman Khachatryan
Hi, Have you tried to change bulk.flush.backoff.enable? According to the docs [1], the underlying ES BulkProcessor will retry (by default), so the provided failure handler might not be called. [1] https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#con

Re: Job recovery issues with state restoration

2021-05-25 Thread Roman Khachatryan
> checkpoints for our workload. > > The SST files are not the ones for task local recovery, those would be in a > different directory (we have configured io.tmp.dirs as /mnt/data/tmp). > > > > Thanks, > > Peter > > > > > > From: Roman Khachatryan &

Re: Is the description of taskmanager.memory.task.heap.size in the official document incorrect?

2021-06-10 Thread Roman Khachatryan
Hi Jason, I think you are right, taskmanager.memory.framework.off-heap.size is also subtracted, at least according to the source code [1]. Would you like to create a jira issue? [1] https://github.com/apache/flink/blob/4cfdc314e39974eaf089ffe0512893b01643ed2e/flink-runtime/src/main/java/org/apach

Re: How to know (in code) how many times the job restarted?

2021-06-10 Thread Roman Khachatryan
Hi Felipe, You can use getRuntimeContext().getAttemptNumber() [1] (but beware that depending on the configuration only a pipeline region can be restarted, not the whole job). But if all you want is to check whether it's a first attempt or not, you can also call context.isRestored() from initializ

Re: FileSource may cause akka.pattern.AskTimeoutException, and akka.ask.timeout not workable

2021-06-10 Thread Roman Khachatryan
Hi, I think you need to increase client.timeout [1]. Regarding the FileSource, it's difficult to say whether it is the reason. The logs you provided are from the client; JobManager logs would be helpful. [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#client-tim

Re: How to gracefully handle job recovery failures

2021-06-10 Thread Roman Khachatryan
Hi Li, The missing file is a serialized job graph and the job recovery can't proceed without it. Unfortunately, the cluster can't proceed if one of the jobs can't recover. Regards, Roman On Thu, Jun 10, 2021 at 6:02 AM Li Peng wrote: > > Hey folks, we have a cluster with HA mode enabled, and re

Re: NPE when restoring from savepoint in Flink 1.13.1 application

2021-06-10 Thread Roman Khachatryan
Hi ChangZhuo, Thanks for reporting, it looks like a bug. I've opened a ticket for that [1]. [1] https://issues.apache.org/jira/browse/FLINK-22966 Regards, Roman On Wed, Jun 9, 2021 at 4:07 PM ChangZhuo Chen (陳昌倬) wrote: > > Hi, > > We have NullPointerException when trying to restore from savep

Re: PyFlink: Upload resource files to Flink cluster

2021-06-10 Thread Roman Khachatryan
Hi, I think the second option is what you need. The documentation says only the zip format is supported. Alternatively, you could upload the files to S3 or another DFS, access them from the TMs, and re-upload them when needed. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/depende

Re: How to gracefully handle job recovery failures

2021-06-10 Thread Roman Khachatryan
emulates what I did, so I don't need to do > manual intervention if this happens again?? > > Thanks, > Li > > On Thu, Jun 10, 2021 at 9:50 AM Roman Khachatryan wrote: >> >> Hi Li, >> >> The missing file is a serialized job graph and the job recovery ca

Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Roman Khachatryan
ng that I'm not able to submit the job anymore. However, > if I don't use the --pyArchives option and manually transfer the schema file > to a location on the UDF node, the job gets submitted and works as expected. > > Any reason why this might happen? > > Thanks, >

Re: How to know (in code) how many times the job restarted?

2021-06-14 Thread Roman Khachatryan
you have a simple idea please tell me >> :). This was the way that I solved >> >> Thanks >> Felipe >> >> -- >> -- Felipe Gutierrez >> -- skype: felipe.o.gutierrez >> >> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan w

Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Roman Khachatryan
rely on the "isRestored()". Do you know what could be >>> wrong? I used the same implementation method of [1]. >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfu

Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Roman Khachatryan
escriptor("restarts", Long.class)); > > if (context.isRestored()) { > List restoreList = Lists.newArrayList(restartsState.get()); > if (restoreList == null || restoreList.isEmpty()) { > restartsState.add(1L); > LOG.

Re: How to know (in code) how many times the job restarted?

2021-06-18 Thread Roman Khachatryan
st == null || restoreList.isEmpty()) { >> restartsState.add(0L); >> LOG.info("restarts: 0"); >> } else { >> Long max = Collections.max(restoreList); >> LOG.info("restarts: &qu

Re: How to know (in code) how many times the job restarted?

2021-06-18 Thread Roman Khachatryan
ng getRuntimeContext().getAttemptNumber() would be simpler and more reliable. Regards, Roman On Fri, Jun 18, 2021 at 6:23 PM Felipe Gutierrez wrote: > > > > On Fri, Jun 18, 2021 at 5:40 PM Roman Khachatryan wrote: >> >> I tried to run the test t

Re: How to know (in code) how many times the job restarted?

2021-06-21 Thread Roman Khachatryan
d()". Now it is counting. > thanks! > Felipe > > > On Fri, Jun 18, 2021 at 10:17 PM Roman Khachatryan wrote: >> >> > do you mean inside the processElement() method? >> I used a simple mapper with Thread.sleep before ExceptionSimulatorProcess. >

Re: Flink State Processor API Example - Java

2021-07-02 Thread Roman Khachatryan
Hi Sandeep, Could you provide the error stack trace and Flink version you are using? Regards, Roman On Fri, Jul 2, 2021 at 6:42 PM Sandeep khanzode wrote: > > Hi Guowei, > > I followed the document, but somehow, I am unable to get a working Java > example for Avro state. > > So, I tried to sim

Re: Using Flink's Kubernetes API inside Java

2021-07-02 Thread Roman Khachatryan
Hi Alexis, Have you looked at flink-on-k8s-operator [1]? It seems to have the functionality you need: https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/controllers/flinkcluster_reconciler.go#L569 I couldn't find many Flink-specific classes

Re: Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-02 Thread Roman Khachatryan
Hi, Just to clarify, you are recovering from a savepoint, not a retained checkpoint, right? And how are you setting the operator IDs? You mentioned that with allowNonRestoredState set to false recovery fails. Does it succeed with this flag set to true? Answering your questions: Each operator stat

Re: Using Flink's Kubernetes API inside Java

2021-07-06 Thread Roman Khachatryan
templates to make it as similar as > possible to a native Deployment resource. > > Regards, > Alexis. > > From: Roman Khachatryan > Sent: Friday, July 2, 2021 9:19 PM > To: Alexis Sarda-Espinosa ; Yang Wang > > Cc: user@flink.apache.org &g

Re: Queryable State Lookup Failure

2021-07-26 Thread Roman Khachatryan
Hello, Could you check that the TMs didn't fail (and therefore unregister the KV states) and are still running at the time of the query? Perhaps after changing the memory settings, there is another error that is reported only after the state has been unregistered. Regards, Roman On Sat, Jul 24, 2021 at 12:50

Re: ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord

2021-07-26 Thread Roman Khachatryan
Hi, It is recommended to package your application with all the dependencies into a single file [1]. And according to the kafka-connector documentation [2]: if you are using the Kafka source, flink-connector-base is also required as a dependency: org.apache.flink flink-connector-base VERSI

Re: k8S HA mode

2021-08-30 Thread Roman Khachatryan
Hello, Do I understand correctly that you are using native Kubernetes deployment in application mode; and the issue *only* happens if you set kubernetes-jobmanager-replicas [1] to a value greater than 1? Does it happen during deployment or at some point while running the job? Could you share Fli

Re: KafkaFetcher [] - Committing offsets to Kafka failed.

2021-08-30 Thread Roman Khachatryan
Hi, I think the preceding message that the consumer is not a member of the group suggests that there is some connectivity issue. Perhaps heartbeats are timing out, in which case you might want to increase session.timeout.ms [1] and heartbeat.interval.ms. [1] https://docs.confluent.io/platform/cur
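A minimal sketch of passing larger timeouts to the Kafka consumer through java.util.Properties follows. The broker address, group id and timeout values are hypothetical and only illustrate the keys. The one-third guard reflects Kafka's documented advice that heartbeat.interval.ms typically be no higher than one third of session.timeout.ms. The session timeout must also fall within the broker's group.min/max.session.timeout.ms bounds.

```java
import java.util.Properties;

// Sketch: consumer properties with a raised session timeout and heartbeat
// interval. Values are illustrative, not recommendations.
class KafkaTimeoutsSketch {
    static Properties consumerProperties(long sessionTimeoutMs, long heartbeatIntervalMs) {
        // Guard chosen for this sketch, following Kafka's "no higher than 1/3" advice.
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            throw new IllegalArgumentException(
                    "heartbeat.interval.ms should be at most session.timeout.ms / 3");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "my-flink-job");             // hypothetical group
        props.setProperty("session.timeout.ms", Long.toString(sessionTimeoutMs));
        props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // These properties would then be handed to the Flink Kafka consumer/source.
        Properties props = consumerProperties(45_000, 15_000);
        System.out.println(props.getProperty("session.timeout.ms"));    // 45000
        System.out.println(props.getProperty("heartbeat.interval.ms")); // 15000
    }
}
```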

Re: Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

2021-08-30 Thread Roman Khachatryan
Hi, If I understand correctly, the problem is accessing local files from Flink running in docker. Have you tried mounting the local directory into the container, for example as a bind mount [1]? [1] https://docs.docker.com/storage/bind-mounts/ Regards, Roman On Mon, Aug 30, 2021 at 3:33 PM Sami

Re: checkpoints/.../shared cleanup

2021-09-06 Thread Roman Khachatryan
I tried to reproduce the issue and I see that the folder grows (because of the underlying FS) but the files under shared/ are removed. With large state, it takes quite some time though. Do you see any errors/warnings in the logs while stopping the job? Could you please share: - the commands or API

Re: Can't access Debezium metadata fields in Kafka table

2021-09-23 Thread Roman Khachatryan
Hi, could you please share the full error message? I think it should list the supported metadata columns. Do you see the same error with 'debezium-json' format instead of 'debezium-avro-confluent' ? Regards, Roman On Wed, Sep 22, 2021 at 5:12 PM Harshvardhan Shinde wrote: > > Hi, > I'm trying

Re: pyflink keyed stream checkpoint error

2021-09-23 Thread Roman Khachatryan
Hi, Is it possible that the Python process crashed or hung (probably while performing a snapshot)? Could you validate this by checking the OS logs for OOM killer messages or the process status? Regards, Roman On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter wrote: > > Hi, > I'm getting an error after ena

Re: Resource leak would happen if exception thrown when flink redisson

2021-09-23 Thread Roman Khachatryan
I'd suggest checking that shutdown() in close() always completes: @Override public void close() { this.redisson.shutdown(); log.info(String.format("Shut down redisson instance in close method, RedissonRxClient shutdown is %s", redisson.isShutdown())); } maybe by logging on open and then com

Re: Kafka Partition Discovery

2021-09-24 Thread Roman Khachatryan
Hi, It seems like a useful feature, but it's probably better to have it in the Kafka consumer. There is a related KIP in progress: https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients I'd like to pull Arvid into the discussion as he might be better fa

Re: How do I verify data is written to a JDBC sink?

2021-09-27 Thread Roman Khachatryan
Hi, Do I understand correctly that long checkpointing times are caused by slow queries to the database? If so, async querying might resolve the issue on the Flink side, but the unnecessary load on the DB will remain. Instead, maybe you can use CDC to stream DB changes and send messages to RabbitMQ when

Re: could not stop with a Savepoint.

2021-09-27 Thread Roman Khachatryan
Hi, The above exception may be caused by both savepoint timing out and job termination timing out. To distinguish between these two cases, could you please check the status of the savepoint and the tasks in the Flink Web UI? IIUC, after you get this exception on client, you still have the job runn

Re: How to add Flink a Flink connector to stateful functions

2021-09-27 Thread Roman Khachatryan
Hi, > Does that mean that I need to build the stateful functions java application > and afterwards the docker image? Yes, you have to rebuild the application after updating the pom, as well as its docker image. Is your concern related to synchronizing local docker images with the official repo?

Re: rpc invocation exceeds the maximum akka framesize

2021-09-28 Thread Roman Khachatryan
Hi, There could be many reasons for exceeding akka framesize, for example 1. "inlined" state that is stored in checkpoint .metadata file (rather than "data" files - see [1]) 2. broadcast state as you mentioned (though only the metadata is sent unless the data fits the above limits) 3. too many sta

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-28 Thread Roman Khachatryan
Hi, No additional ports need to be open as far as I know. Perhaps $HOSTNAME is substituted with something that is not resolvable on the TMs? Please also make sure that the following gets executed before mesos-appmaster.sh: export HADOOP_CLASSPATH=$(hadoop classpath) export MESOS_NATIVE_JAVA_LIBRARY=/path/t

Re: checkpoints/.../shared cleanup

2021-10-01 Thread Roman Khachatryan
x27;t clean whole folder (prefix) but instead delete > tracked files one by one, and maybe something bad happened during execution > (e.g. failed checkoint), which leaded to loosing track of some file(s), and > then during shutdown these files are not deleted, because Flink already

Re: Time different between checkpoint and savepoint restoration in GCS

2021-10-25 Thread Roman Khachatryan
Hi ChangZhuo, Yes, restoring from a savepoint is expected to be significantly slower than restoring from a checkpoint. Regards, Roman On Mon, Oct 25, 2021 at 9:45 AM ChangZhuo Chen (陳昌倬) wrote: > > Hi, > > We found that our application savepoint restoration time (~ 40 mins) is > much slower than checkpoint re

Re: Not cleanup Kubernetes Configmaps after execution success

2021-10-25 Thread Roman Khachatryan
Hi Hua, It looks like the ConfigMap is missing HA labels for some reason. Could you confirm that you are running in HA mode? Which deployment mode are you using? [1] I'm also pulling in Yan Wang who might know this area better. [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dep

Re: Getting Errors in Standby Jobmanager pod during installation & after restart on k8s

2021-10-25 Thread Roman Khachatryan
Hi Amit, AFAIK, these exceptions are normal in HA mode as different JM instances are trying to acquire the lease. Regards, Roman On Mon, Oct 25, 2021 at 1:45 PM Amit Bhatia wrote: > > Hi, > > We have deployed two jobmanagers in HA mode on kubernetes using k8s configmap > solution with deployme

Re: Not cleanup Kubernetes Configmaps after execution success

2021-10-26 Thread Roman Khachatryan
Thanks for sharing this. The sequence of events in the log seems strange to me: 2021-10-17 03:05:55,801 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection c1092812cfb2853a5576ffd78e346189: Stopping JobMaster for job 'rt-match_12.4.5_8d48b21a' (

Re:

2021-11-15 Thread Roman Khachatryan
Hi Uday, to unsubscribe, please send an email to user-unsubscr...@flink.apache.org Regards, Roman On Mon, Nov 15, 2021 at 12:19 PM Uday Garikipati wrote: > > Unsubscribe

Re:

2021-11-15 Thread Roman Khachatryan
Hi, to unsubscribe, please send an email to user-unsubscr...@flink.apache.org Regards, Roman On Mon, Nov 15, 2021 at 10:01 AM xm lian wrote: > > Unsubscribe

Re: PyFlink SQL window aggregation data not written out to file

2021-11-15 Thread Roman Khachatryan
Hi Guoqin, I think the problem might be related to watermarks and checkpointing:
- if the file is too small, the only watermark will be the one emitted after the file has been fully read
- in exactly-once mode, the sink waits for a checkpoint to complete before committing the files
Recently, there were some improve
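To make the watermark point concrete, here is a plain-Java model (not Flink or PyFlink code) of when an event-time tumbling window can fire. It assumes a hypothetical 10 s window and a 5 s out-of-orderness bound, and mirrors two Flink conventions: a bounded-out-of-orderness generator emits maxTimestamp - bound - 1, and a window [start, end) fires once the watermark reaches end - 1. If no intermediate watermark gets emitted before a small file is exhausted, nothing fires until the final end-of-input watermark arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Plain-Java model (not Flink code) of when an event-time tumbling window fires.
 * Window size and out-of-orderness bound are hypothetical values.
 */
public class WindowFiringSketch {

    static final long WINDOW_SIZE_MS = 10_000L;
    static final long OUT_OF_ORDERNESS_MS = 5_000L;

    /** Watermark a bounded-out-of-orderness generator derives: maxTimestamp - bound - 1. */
    static long boundedWatermark(long[] timestamps) {
        if (timestamps.length == 0) {
            return Long.MIN_VALUE; // no records seen, so no event-time progress
        }
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        return maxTs - OUT_OF_ORDERNESS_MS - 1;
    }

    /** Exclusive end of the tumbling window containing a non-negative timestamp. */
    static long windowEnd(long ts) {
        return ts - (ts % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
    }

    /** Ends of the windows whose event-time trigger fires, i.e. watermark >= end - 1. */
    static List<Long> firedWindowEnds(long[] timestamps, long watermark) {
        TreeSet<Long> fired = new TreeSet<>();
        for (long ts : timestamps) {
            long end = windowEnd(ts);
            if (watermark >= end - 1) {
                fired.add(end);
            }
        }
        return new ArrayList<>(fired);
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_000L, 12_000L, 19_000L};
        long wm = boundedWatermark(timestamps);
        // The last window [10000, 20000) stays open because 13999 < 19999.
        System.out.println("watermark " + wm + " fires " + firedWindowEnds(timestamps, wm));
        // A final Long.MAX_VALUE watermark (end of bounded input) closes every window.
        System.out.println("final watermark fires " + firedWindowEnds(timestamps, Long.MAX_VALUE));
    }
}
```

In this model the bounded watermark of 13999 only closes the first window; the second one needs the final watermark. The sketch says nothing about the second point above: even after a window fires, an exactly-once file sink still only commits its files when a checkpoint completes.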

Re: PyFlink SQL window aggregation data not written out to file

2021-11-17 Thread Roman Khachatryan
s the typical setup for such a local > test. ie. How do we inform the Flink to close the window if the input stream > hits the end. > > Thanks, > -Guoqin > > On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan wrote: >> >> Hi Guoqin, >> >> I think the pro

Re: PyFlink SQL window aggregation data not written out to file

2021-11-22 Thread Roman Khachatryan
1.13 and 1.14, but it still didn't work. > > I explicitly enabled the checkpoint with: `env.enable_checkpointing(10)`. Any > other configurations I need to set? > > Thanks, > -Guoqin > > On Wed, Nov 17, 2021 at 4:30 AM Roman Khachatryan wrote: >> >> Hi G

Re: stateSerializer(1.14.0) not compatible with previous stateSerializer(1.13.1)

2021-12-10 Thread Roman Khachatryan
Hi, Compatibility might depend on specific serializers, could you please share which serializers you use to access the state? Regards, Roman On Fri, Dec 10, 2021 at 3:41 AM 李诗君 wrote: > > I am trying to upgrade my flink cluster version from 1.13.1 to 1.14.0 , I did > like below steps: > > 1. s

Re: FileSource with Parquet Format - parallelism level

2021-12-10 Thread Roman Khachatryan
Hi, Yes, the file source does support DoP > 1. And in general, a single file can be read in parallel after FLIP-27. However, parallel reading of a single Parquet file is currently not supported AFAIK. Maybe Arvid or Fabian could shed more light here. Regards, Roman On Thu, Dec 9, 2021 at 12:03 PM K

Re: Hybrid Source with Parquet Files from GCS + KafkaSource

2021-12-10 Thread Roman Khachatryan
Hi, Have you tried constructing a Hybrid source from a File source created with FileSource.forBulkFileFormat [1] and "gs://bucket" scheme [2] directly? [1] https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/file/src/FileSource.html#forBulkFileFormat-org.apach

Re: broadcast() without arguments

2021-12-10 Thread Roman Khachatryan
Hello, The broadcast() without arguments can be used the same way as a regular data stream, i.e. regular transformations can be applied to it. The difference is that every element will be sent to all downstream subtasks and not just one. The difference with broadcast() with arguments is that the
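The routing difference can be illustrated with a small stdlib-only model (not Flink code; class and method names are hypothetical). In the DataStream API, broadcast() without arguments returns an ordinary DataStream, so e.g. stream.broadcast().map(...) works as usual; only the routing of records changes. The round-robin variant below is a simplified stand-in for rebalance(), starting at subtask 0 for determinism.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, stdlib-only model of how records reach downstream subtasks. */
public class PartitioningSketch {

    /** broadcast(): every record is delivered to every downstream subtask. */
    static List<List<String>> broadcast(List<String> records, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (String r : records) {
            for (List<String> subtask : subtasks) {
                subtask.add(r);
            }
        }
        return subtasks;
    }

    /** Simplified round robin: every record is delivered to exactly one subtask. */
    static List<List<String>> roundRobin(List<String> records, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % parallelism).add(records.get(i));
        }
        return subtasks;
    }

    /** One empty input buffer per downstream subtask. */
    static List<List<String>> emptySubtasks(int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "c");
        System.out.println("broadcast:   " + broadcast(records, 2));
        System.out.println("round robin: " + roundRobin(records, 2));
    }
}
```

With a downstream parallelism of 2, broadcast delivers [a, b, c] to both subtasks, while round robin splits the same records into [a, c] and [b].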

Re: Could not find any factory for identifier 'jdbc'

2022-01-12 Thread Roman Khachatryan
Hi, I think Chesnay's suggestion to double-check the bundle makes sense. Additionally, I'd try flink-connector-jdbc_2.12 instead of flink-connector-jdbc_2.11. Regards, Roman On Wed, Jan 12, 2022 at 12:23 PM Chesnay Schepler wrote: > > I would try double-checking whether the jdbc connector was t

Re: OutOfMemoryError: Java heap space while implmentating flink sql api

2022-01-12 Thread Roman Khachatryan
Hi Ronak, You shared a screenshot of the JM. Do you mean that the exception also happens on the JM? (I'd rather assume the TM.) Could you explain the join clause left join ccmversionsumapTable cvsm ON (cdr.version = cvsm.ccmversion)? "version" doesn't sound very selective, so maybe you end up with (almost) Carte
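Why a low-selectivity join key matters can be shown with a little arithmetic: in a left equi-join, each left row yields one output row per matching right row (or one NULL-padded row if nothing matches). The plain-Java sketch below is not Flink code, and the row counts and the version value "12.5" are hypothetical; it only illustrates how quickly the output grows when most rows share the same key.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of LEFT JOIN output size on an equality key. */
public class JoinCardinalitySketch {

    /** Each left row yields one row per matching right row, or one NULL-padded row if none match. */
    static long leftJoinRowCount(List<String> leftKeys, List<String> rightKeys) {
        Map<String, Long> rightCounts = new HashMap<>();
        for (String k : rightKeys) {
            rightCounts.merge(k, 1L, Long::sum);
        }
        long rows = 0;
        for (String k : leftKeys) {
            rows += Math.max(rightCounts.getOrDefault(k, 0L), 1L);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Hypothetical: 1,000 "cdr" rows and 500 mapping rows, all with the same version.
        List<String> cdrVersions = Collections.nCopies(1_000, "12.5");
        List<String> mappingVersions = Collections.nCopies(500, "12.5");
        System.out.println(leftJoinRowCount(cdrVersions, mappingVersions)); // 500000
    }
}
```

With only 1,500 input rows, a single shared version already produces 500,000 joined rows, whereas a key that matches at most one mapping row per left row would keep the output at 1,000 rows.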
