Re: Long checkpoint duration for Kafka source operators

2021-05-20 Thread Hubert Chen
For the poor soul that stumbles upon this in the future, just increase your JM resources. I thought for sure this must have been the TM experiencing some sort of backpressure. I tried everything from enabling universal compaction to unaligned checkpoints to profiling the TM. It wasn't until I enab

Re: PyFlink DataStream union type mismatch

2021-05-20 Thread Dian Fu
Hi Wouter, 1) The exception seems to be a bug. I have filed a ticket for it: https://issues.apache.org/jira/browse/FLINK-22733 2) Regarding your requirements, I guess you should do it as follows: ``` init_stream = (operator_stream .f

Re: Issues with forwarding environment variables

2021-05-20 Thread Yangze Guo
Hi Milind, could you provide the skeleton of your job code? Actually, if you implement a custom function, like Tokenizer in the WordCount example, its class members are initialized on the client side and serialized to the task manager. As a result, neither the system envs nor the sys
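The client-side initialization described above can be illustrated with a small stand-alone sketch. It is a Python stand-in for the Java job, not Flink code: `EagerFunction`, `LazyFunction`, and `run_on_worker` are hypothetical names, JSON stands in for Java serialization, and the `open()` method only mimics the idea of a hook that runs on the worker after deserialization.

```python
import json
import os


class EagerFunction:
    """Reads SERVICE_NAME when the object is constructed, i.e. on the client."""

    def __init__(self):
        self.service_name = os.environ.get("SERVICE_NAME")

    def open(self):
        # Nothing to do: the value was already captured on the client.
        pass


class LazyFunction:
    """Reads SERVICE_NAME in open(), i.e. on the worker after deserialization."""

    def __init__(self):
        self.service_name = None

    def open(self):
        self.service_name = os.environ.get("SERVICE_NAME")


def run_on_worker(function, worker_service_name):
    """Ship the function's field values to a simulated worker and open it there.

    Hypothetical helper: the worker is simulated by temporarily setting
    SERVICE_NAME in this process.
    """
    payload = json.dumps(vars(function))  # client side: only field values travel
    previous = os.environ.get("SERVICE_NAME")
    os.environ["SERVICE_NAME"] = worker_service_name
    try:
        restored = object.__new__(type(function))  # no constructor call on the worker
        restored.__dict__.update(json.loads(payload))
        restored.open()
        return restored.service_name
    finally:
        # Restore the client environment.
        if previous is None:
            del os.environ["SERVICE_NAME"]
        else:
            os.environ["SERVICE_NAME"] = previous
```

If the client has no `SERVICE_NAME` set, `run_on_worker(EagerFunction(), "hello-test")` returns `None`, because the empty client-side value was serialized, while `run_on_worker(LazyFunction(), "hello-test")` returns the worker's value.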

Re: Could watermark could be took into consideration after the channel become active from idle at once?

2021-05-20 Thread 刘建刚
We have run into the same problem in our company. One stream always has data. The other stream is much smaller and can be idle. Once the smaller one becomes active, its data may be dropped in this case. 张静 [via Apache Flink User Mailing List archive.] < ml+s2336050n43873...@n4.nabble.com> wrote on Fri, May 21, 2021 at 1

Re: Could watermark could be took into consideration after the channel become active from idle at once?

2021-05-20 Thread 张静
Hi Roman, thanks very much for the reply. In our case, we saw some data being dropped in the window operator. We found the root cause by adding a temporary metric for the number of aligned channels: an active channel resumed from idle was not taken into account for some time (not always, btw). We coul

Re: ES sink never receive error code

2021-05-20 Thread Yangze Guo
> So, ES BulkProcessor retried after bulk request was partially rejected. And > eventually that request was sent successfully? That is why failure handler > was not called? If the bulk request fails after the max number of retries (bulk.flush.backoff.retries), the failure handler will still be c
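The interaction between retries and the failure handler can be sketched as follows. This is a simplified model in plain Python, not the Elasticsearch BulkProcessor or the Flink connector: `send_with_retries` and `FlakySender` are hypothetical names, and the backoff delay between attempts is omitted.

```python
class FlakySender:
    """Hypothetical sender that rejects the first `failures` requests."""

    def __init__(self, failures):
        self.remaining_failures = failures

    def __call__(self, request):
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise RuntimeError("bulk request rejected")


def send_with_retries(send, request, max_retries, on_failure):
    """Try once, then retry up to max_retries times.

    on_failure is invoked only when the last retry has also failed.
    Returns the number of attempts made.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            send(request)
            return attempts  # success: the failure handler is never invoked
        except RuntimeError as error:
            if attempts > max_retries:
                on_failure(request, error)  # retries exhausted
                return attempts
```

With `max_retries=3`, a sender that fails twice succeeds on the third attempt and the handler stays silent; a sender that keeps failing triggers the handler after the fourth attempt.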

behavior differences when sinking side outputs to a custom KeyedCoProcessFunction

2021-05-20 Thread Jin Yi
hello, sorry for a long post, but this is a puzzling problem and i am enough of a flink non-expert to be unsure what details are important or not. background: i have a flink pipeline that is a series of custom "joins" for the purposes of user event "flattening" that i wrote a custom KeyedCoProces

Re: ES sink never receive error code

2021-05-20 Thread Qihua Yang
Thank you for the reply! Yes, we did config bulk.flush.backoff.enable. So, ES BulkProcessor retried after bulk request was partially rejected. And eventually that request was sent successfully? That is why failure handler was not called? Thanks, Qihua On Thu, May 20, 2021 at 2:22 PM Roman Khachat

Re: Issues with forwarding environment variables

2021-05-20 Thread Milind Vaidya
This is java code. I have a flink job running and it is trying to fetch this variable at run time itself. I see the properties getting reflected in the logs as already mentioned but not visible from the code. On Thu, May 20, 2021 at 1:53 PM Roman Khachatryan wrote: > > private String serviceName

Re: ES sink never receive error code

2021-05-20 Thread Roman Khachatryan
Hi, Have you tried to change bulk.flush.backoff.enable? According to the docs [1], the underlying ES BulkProcessor will retry (by default), so the provided failure handler might not be called. [1] https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#con

Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
> private String serviceName = System.getenv("SERVICE_NAME"); Is it a scala object? If so, it can be initialized before any properties are set. What happens if the variable/property is read later at run time? Regards, Roman On Thu, May 20, 2021 at 10:41 PM Milind Vaidya wrote: > > here are the e

Re: Job recovery issues with state restoration

2021-05-20 Thread Roman Khachatryan
Hi Peter, Do you experience this issue if running without local recovery or incremental checkpoints enabled? Or have you maybe compared local (on TM) and remote (on DFS) SST files? Regards, Roman On Thu, May 20, 2021 at 5:54 PM Peter Westermann wrote: > > Hello, > > > > I’ve reported issues ar

Re: Issues with forwarding environment variables

2021-05-20 Thread Milind Vaidya
here are the entries from taskmanager logs 2021-05-20 13:34:13,739 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.java.opts.taskmanager, "-DSERVICE_NAME=hello-test,-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" 2021-05-20 13:34:1

Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
Thanks, it should work. I've created a ticket to track the issue [1]. Could you please specify Flink and Yarn versions you are using? You can also use properties (which don't depend on Yarn integration), for example like this: In flink-conf.yaml: env.java.opts.taskmanager: -DSERVICE_NAME=... In th

ES sink never receive error code

2021-05-20 Thread Qihua Yang
Hello, We are using flink-connector-elasticsearch6_2.11 to ingest stream data to ES by using bulk requests. From ES metrics, we observed some bulk thread pool rejections. We contacted the AWS team; their explanation is that part of the bulk request was rejected. The response body should include a status for each item. F

Re: Issues with forwarding environment variables

2021-05-20 Thread Milind Vaidya
Hi Roman, I have added the following lines to conf/flink-conf.yaml: containerized.taskmanager.env.SERVICE_NAME: "test_service_name" containerized.master.env.SERVICE_NAME: "test_service_name" On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan wrote: > Hi, > > Could you please share the relevant

Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
Hi, Could you please share the relevant parts of your flink-conf.yaml? Regards, Roman On Thu, May 20, 2021 at 9:13 PM Milind Vaidya wrote: > > Hi > > Need to forward a few env variables to Job and Task manager. > I am running jobs in Yarn cluster > I was referring to this : Forwarding > > I als

Re: Best practice for adding support for Kafka variants

2021-05-20 Thread Roman Khachatryan
Hi, Those classes will likely be deprecated in the future in favor of FLIP-27 [1][2] source and FLIP-143 [3] sink implementations and eventually removed (though it won't happen soon). You probably should take a look at the above new APIs. Either way, there is no such recommendation AFAIK. Copie

Issues with forwarding environment variables

2021-05-20 Thread Milind Vaidya
Hi, I need to forward a few env variables to the Job and Task managers. I am running jobs in a Yarn cluster. I was referring to this: Forwarding. I also found Stack Overflow

Best practice for adding support for Kafka variants

2021-05-20 Thread deepthi Sridharan
Hi, We have an internal version of the open source Kafka consumer and producer that we use, and we are working on adding it as a source and sink for Flink. It seems like the easiest way to add the consumer as a source would be to override the FlinkKafkaConsumer class's createFetcher

Re: Could watermark could be took into consideration after the channel become active from idle at once?

2021-05-20 Thread Roman Khachatryan
Hi, AFAIK, this behavior is not configurable. However, for this to happen the channel must consistently generate watermarks smaller than watermarks from ALL aligned channels (and its elements must have a smaller timestamp). I'm not sure how likely it is. Is it something you see in production? Reg

Re: Parallelism in Production: Best Practices

2021-05-20 Thread Yaroslav Tkachenko
Hi Jan, thanks for sharing this! Just wanted to confirm: this approach works because of the task slot sharing feature in Flink, doesn't it? On Thu, May 20, 2021 at 1:12 AM Jan Brusch wrote: > > Hi Yaroslav, > > here's a fourth option that we usually use: We set the default > parallelism once wh

PyFlink DataStream union type mismatch

2021-05-20 Thread Wouter Zorgdrager
Dear all, I'm having trouble unifying two data streams using the union operator in PyFlink. My code basically looks like this: init_stream = (operator_stream .filter(lambda r: r[0] is None) .map(init_operator,Types.TUPLE([Types.STRING(),Types.PICKLED_BYTE_ARRAY()])) .key_by(lambda x: x[0

Job recovery issues with state restoration

2021-05-20 Thread Peter Westermann
Hello, I’ve reported issues around checkpoint recovery in case of a job failure due to zookeeper connection loss in the past. I am still seeing issues occasionally. This is for Flink 1.12.3 with zookeeper for HA, S3 as the state backend, incremental checkpoints, and task-local recovery enabled.

Re: [Statefun] Truncated Messages in Python workers

2021-05-20 Thread Stephan Ewen
Thanks for reporting this, it does indeed look like a potential bug. I filed this Jira for it: https://issues.apache.org/jira/browse/FLINK-22729 Could you share (here or in Jira) what the stack on the Python Worker side is (for example which HTTP server)? Do you know if the message truncation happens

Re: Savepoint/checkpoint confusion

2021-05-20 Thread Igor Basov
Hey Robert, Thanks for the answer! But then I guess the only difference between savepoints and checkpoints is that checkpoints are structurally state dependent and can be incremental, but otherwise they are functionally equivalent. So functionally savepoint can be considered a full checkpoint which

Could watermark could be took into consideration after the channel become active from idle at once?

2021-05-20 Thread 张静
Hi community, Currently, after a channel becomes active from idle, the watermark on this channel is not taken into account when aligning watermarks until it generates a watermark equal to or greater than the last emitted watermark. It makes sense because it could prevent the newly active task resumed from idl
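The alignment rule described in this message can be modeled with a short sketch. It is a simplified illustration under stated assumptions, not Flink's implementation: the `Channel` and `WatermarkValve` classes and their method names are hypothetical, and special cases such as all channels being idle are left out.

```python
class Channel:
    def __init__(self):
        self.watermark = float("-inf")
        self.idle = False
        self.aligned = True


class WatermarkValve:
    """Emits the minimum watermark across aligned channels (simplified model)."""

    def __init__(self, num_channels):
        self.channels = [Channel() for _ in range(num_channels)]
        self.last_emitted = float("-inf")

    def on_watermark(self, index, watermark):
        channel = self.channels[index]
        if channel.idle or watermark <= channel.watermark:
            return None  # ignore idle channels and non-increasing watermarks
        channel.watermark = watermark
        # A resumed channel rejoins alignment only once it has caught up.
        if not channel.aligned and watermark >= self.last_emitted:
            channel.aligned = True
        return self._advance()

    def mark_idle(self, index):
        channel = self.channels[index]
        channel.idle = True
        channel.aligned = False
        return self._advance()

    def mark_active(self, index):
        channel = self.channels[index]
        channel.idle = False
        if channel.watermark >= self.last_emitted:
            channel.aligned = True
        return None

    def _advance(self):
        aligned = [c.watermark for c in self.channels if c.aligned]
        if not aligned:
            return None
        candidate = min(aligned)
        if candidate > self.last_emitted:
            self.last_emitted = candidate
            return candidate  # new output watermark
        return None
```

In this model, if channel 1 goes idle while channel 0 advances the output watermark to 100, a later watermark of 50 from the reactivated channel 1 is ignored for alignment. Elements on channel 1 with timestamps below 100 are then behind the output watermark, which matches the dropped data reported in this thread.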

Issue with using siddhi extension function with flink

2021-05-20 Thread Dipanjan Mazumder
Hi, I am trying to integrate Siddhi with Flink. When I use a Siddhi extension function and deploy the job in a Flink cluster, it is not able to find those libraries at run time, so I had to explicitly put those libraries into the /opt/flink/lib folder for the jobmanager and taskmanager

Re: Unable to deserialize Avro data using Pyflink

2021-05-20 Thread Zerah J
Hi Dian, Thanks for your support. I could deserialize the ConfluentAvro data using ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord returned by ConfluentRegistryAvroDeserializationSchema is not supported in PyFlink currently, I am unable to proceed. I can print the datast

Kafka dynamic topic for Sink in SQL

2021-05-20 Thread Benoît Paris
Hi all! I'm looking for a way to write to different Kafka topics based on some column value in SQL. I think it's possible with Java, using KafkaSerializationSchema, and ProducerRecord(topic, ...), but I was wondering if I could somewhat access that feature in SQL. I'm also trying to evaluate the

Re: Side outputs PyFlink

2021-05-20 Thread Dian Fu
Hi Wouter, You are right that side output is still not supported in PyFlink. It’s definitely one of the features we want to support in the next release. For now, the workaround you mentioned is also what I had in mind. Personally I think if the performance of the filter is good enough, it wil

Re: Issue reading from S3

2021-05-20 Thread Yun Gao
Hi Angelo, I tried a case similar to the failing one you provided: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build(); TableEnvironment

Re: Savepoint/checkpoint confusion

2021-05-20 Thread Robert Metzger
Hey Igor, 1) yes, reactive mode indeed does the same. 2) No, HA mode is only storing some metadata in ZK about the leadership and latest checkpoints, but the checkpoints themselves are the same. They should be usable for a changed job graph (if the state matches the operators by setting the UUIDs [1]

Side outputs PyFlink

2021-05-20 Thread Wouter Zorgdrager
Dear Flink community, First of all, I'm very excited about the new 1.13 release. Among other features, I'm particularly excited about the support of stateful operations in Python. I think it will make the wonders of stream processing and the power of Flink accessible to more developers. I'm curre

Re: Prometheus Reporter Enhancement

2021-05-20 Thread Chesnay Schepler
There is no plan to generally exclude label keys from the metric identifier/logical scope. They ensure that the label set for a given identifier/scope is unique, i.e., you can't have 2 metrics called "numRecordsIn" with different label sets. Changing this would also break all existing setups, s

Re: Parallelism in Production: Best Practices

2021-05-20 Thread Jan Brusch
Hi Yaroslav, here's a fourth option that we usually use: We set the default parallelism once when we initially deploy the app (maybe change it a few times in the beginning). From that point on rescale by either resizing the TaskManager-Nodes or redistributing the parallelism over more / less