Unify error handler and late window record output for SQL api

2020-10-29 Thread Yi Tang
Hi,
I'm looking for a way to handle potential errors in a job submitted with the
SQL API, but unfortunately I found nothing.
Handling errors manually in the SQL API is hard, I think. Is there a way to
handle all errors and send them to a side output to avoid task failures?

Also, one can put late records into a side output in the streaming API, but
there seems to be no equivalent in the SQL API.

Thanks.


Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Xintong Song
Hi Ori,

It looks like Flink indeed uses more memory than expected. I assume the
first item with PID 20331 is the flink process, right?

It would be helpful if you can briefly introduce your workload.
- What kind of workload are you running? Streaming or batch?
- Do you use RocksDB state backend?
- Any UDFs or 3rd party dependencies that might allocate significant native
memory?

Moreover, if the metrics show only 20% heap usage, I would suggest
configuring a smaller `task.heap.size`, leaving more memory off-heap. The
reduced heap size does not necessarily all go to the managed memory. You
can also try increasing the `jvm-overhead`, simply to leave more native
memory in the container in case there are other significant native
memory usages.
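
For reference, a minimal sketch of those two knobs expressed through Flink's
Configuration API rather than flink-conf.yaml; the concrete sizes are
illustrative only, not recommendations for this particular job:

import org.apache.flink.configuration.Configuration;

public class MemoryTuningSketch {
    public static Configuration memoryOverrides() {
        Configuration conf = new Configuration();
        // Shrink the task heap if only ~20% of it is actually used (example value).
        conf.setString("taskmanager.memory.task.heap.size", "40g");
        // Leave extra native headroom in the container (example values).
        conf.setString("taskmanager.memory.jvm-overhead.max", "4g");
        conf.setString("taskmanager.memory.jvm-overhead.fraction", "0.2");
        return conf;
    }
}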

Thank you~

Xintong Song



On Wed, Oct 28, 2020 at 5:53 PM Ori Popowski  wrote:

> Hi Xintong,
>
> See here:
>
> # Top memory users
> ps auxwww --sort -rss | head -10
> USER   PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
> yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
> /etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
> root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
> hadoop5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 153:22
> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
> yarn 26857  0.3  0.2 4214784 341464 ?  Sl   Sep17 198:43
> /etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
> root  5519  0.0  0.2 5658624 269344 ?  Sl   Jul30  45:21
> /usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
> root  1781  0.0  0.0 172644  8096 ?Ss   Jul30   2:06
> /usr/lib/systemd/systemd-journald
> root  4801  0.0  0.0 2690260 4776 ?Ssl  Jul30   4:42
> /usr/bin/amazon-ssm-agent
> root  6566  0.0  0.0 164672  4116 ?R00:30   0:00 ps auxwww
> --sort -rss
> root  6532  0.0  0.0 183124  3592 ?S00:30   0:00
> /usr/sbin/CROND -n
>
> On Wed, Oct 28, 2020 at 11:34 AM Xintong Song 
> wrote:
>
>> Hi Ori,
>>
>> The error message suggests that there's not enough physical memory on the
>> machine to satisfy the allocation. This does not necessarily mean a managed
>> memory leak. Managed memory leak is only one of the possibilities. There
>> are other potential reasons, e.g., another process/container on the machine
>> used more memory than expected, Yarn NM is not configured with enough
>> memory reserved for the system processes, etc.
>>
>> I would suggest to first look into the machine memory usages, see whether
>> the Flink process indeed uses more memory than expected. This could be
>> achieved via:
>> - Run the `top` command
>> - Look into the `/proc/meminfo` file
>> - Any container memory usage metrics that are available to your Yarn
>> cluster
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski  wrote:
>>
>>> After the job is running for 10 days in production, TaskManagers start
>>> failing with:
>>>
>>> Connection unexpectedly closed by remote task manager
>>>
>>> Looking in the machine logs, I can see the following error:
>>>
>>> = Java processes for user hadoop =
>>> OpenJDK 64-Bit Server VM warning: INFO:
>>> os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error='Cannot
>>> allocate memory' (err
>>> #
>>> # There is insufficient memory for the Java Runtime Environment to
>>> continue.
>>> # Native memory allocation (mmap) failed to map 1006567424 bytes for
>>> committing reserved memory.
>>> # An error report file with more information is saved as:
>>> # /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
>>> === End java processes for user hadoop ===
>>>
>>> In addition, the metrics for the TaskManager show very low Heap memory
>>> consumption (20% of Xmx).
>>>
>>> Hence, I suspect there is a memory leak in the TaskManager's Managed
>>> Memory.
>>>
>>> This my TaskManager's memory detail:
>>> flink process 112g
>>> framework.heap.size 0.2g
>>> task.heap.size 50g
>>> managed.size 54g
>>> framework.off-heap.size 0.5g
>>> task.off-heap.size 1g
>>> network 2g
>>> XX:MaxMetaspaceSize 1g
>>>
>>> As you can see, the managed memory is 54g, so it's already high (my
>>> managed.fraction is set to 0.5).
>>>
>>> I'm running Flink 1.10. Full job details attached.
>>>
>>> Can someone advise what would cause a managed memory leak?
>>>
>>>
>>>


Re: Unify error handler and late window record output for SQL api

2020-10-29 Thread Yun Gao
Hi Yi,

  Sorry, I might not be an expert on SQL. As a whole, since SQL is a
high-level API, users have less control over their jobs:
  1. Unfortunately we do not have an API to catch all errors. I think that
even with DataStream, we do not provide an API to catch runtime exceptions
such as the ones related to the network. Could you also explain a bit more
about why this functionality is wanted?
  2. The SQL API currently also does not provide an API to side-output late
records. Since standard SQL does not provide the corresponding grammar, it
would be a bit complex to provide this functionality.
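
For comparison only, a rough DataStream-API sketch (not from the original
reply) of the late-record side output mentioned in (2); the source, key and
window choices are placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; assumes timestamps/watermarks are assigned upstream.
        DataStream<String> input = env.socketTextStream("localhost", 9999);

        final OutputTag<String> lateTag = new OutputTag<String>("late-records") {};

        SingleOutputStreamOperator<String> windowed = input
                .keyBy(value -> value)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sideOutputLateData(lateTag)   // late records go here instead of being dropped
                .reduce((a, b) -> a + "," + b);

        // The late records can be sent to their own sink for analysis.
        windowed.getSideOutput(lateTag).print();

        env.execute("late-data-side-output-sketch");
    }
}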

Best,
 Yun


 --Original Mail --
Sender:Yi Tang 
Send Date:Thu Oct 29 15:08:30 2020
Recipients:Flink ML 
Subject:Unify error handler and late window record output for SQL api

Hi,
I'm looking for a way to handle potential errors in a job submitted with the SQL API,
but unfortunately I found nothing.
Handling errors manually in the SQL API is hard, I think. Is there a way to handle
all errors and send them to a side output to avoid task failures?

Also, one can put late records into a side output in the streaming API, but there
seems to be no equivalent in the SQL API.

Thanks.

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
Hi,

PID 20331 is indeed the Flink process, specifically the TaskManager process.

- Workload is a streaming workload reading from Kafka and writing to S3
using a custom Sink
- RocksDB state backend is used with default settings
- My external dependencies are:
-- logback
-- jackson
-- flatbuffers
-- jaxb-api
-- scala-java8-compat
-- apache commons-io
-- apache commons-compress
-- software.amazon.awssdk s3
- What do you mean by UDFs? I've implemented several operators like
KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.

We use a SessionWindow with 30 minutes of gap, and a watermark with 10
minutes delay.

We did confirm we have some keys in our job which keep receiving records
indefinitely, but I'm not sure why it would cause a managed memory leak,
since this should be flushed to RocksDB and free the memory used. We have a
guard against this, where we keep the overall size of all the records for
each key, and when it reaches 300mb, we don't move the records downstream,
which causes them to create a session and go through the sink.

About what you suggested - I kind of did this by increasing the managed
memory fraction to 0.5. And it did postpone the occurrence of the problem
(meaning, the TMs started crashing after 10 days instead of 7 days). It
looks like anything I'll do on that front will only postpone the problem
but not solve it.

I am attaching the full job configuration.



On Thu, Oct 29, 2020 at 10:09 AM Xintong Song  wrote:

> Hi Ori,
>
> It looks like Flink indeed uses more memory than expected. I assume the
> first item with PID 20331 is the flink process, right?
>
> It would be helpful if you can briefly introduce your workload.
> - What kind of workload are you running? Streaming or batch?
> - Do you use RocksDB state backend?
> - Any UDFs or 3rd party dependencies that might allocate significant
> native memory?
>
> Moreover, if the metrics shows only 20% heap usages, I would suggest
> configuring less `task.heap.size`, leaving more memory to off-heap. The
> reduced heap size does not necessarily all go to the managed memory. You
> can also try increasing the `jvm-overhead`, simply to leave more native
> memory in the container in case there are other other significant native
> memory usages.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Oct 28, 2020 at 5:53 PM Ori Popowski  wrote:
>
>> Hi Xintong,
>>
>> See here:
>>
>> # Top memory users
>> ps auxwww --sort -rss | head -10
>> USER   PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
>> yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
>> /etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
>> root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>> hadoop5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 153:22
>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>> yarn 26857  0.3  0.2 4214784 341464 ?  Sl   Sep17 198:43
>> /etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
>> root  5519  0.0  0.2 5658624 269344 ?  Sl   Jul30  45:21
>> /usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
>> root  1781  0.0  0.0 172644  8096 ?Ss   Jul30   2:06
>> /usr/lib/systemd/systemd-journald
>> root  4801  0.0  0.0 2690260 4776 ?Ssl  Jul30   4:42
>> /usr/bin/amazon-ssm-agent
>> root  6566  0.0  0.0 164672  4116 ?R00:30   0:00 ps
>> auxwww --sort -rss
>> root  6532  0.0  0.0 183124  3592 ?S00:30   0:00
>> /usr/sbin/CROND -n
>>
>> On Wed, Oct 28, 2020 at 11:34 AM Xintong Song 
>> wrote:
>>
>>> Hi Ori,
>>>
>>> The error message suggests that there's not enough physical memory on
>>> the machine to satisfy the allocation. This does not necessarily mean a
>>> managed memory leak. Managed memory leak is only one of the possibilities.
>>> There are other potential reasons, e.g., another process/container on the
>>> machine used more memory than expected, Yarn NM is not configured with
>>> enough memory reserved for the system processes, etc.
>>>
>>> I would suggest to first look into the machine memory usages, see
>>> whether the Flink process indeed uses more memory than expected. This could
>>> be achieved via:
>>> - Run the `top` command
>>> - Look into the `/proc/meminfo` file
>>> - Any container memory usage metrics that are available to your Yarn
>>> cluster
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski  wrote:
>>>
 After the job is running for 10 days in production, TaskManagers start
 failing with:

 Connection unexpectedly closed by remote task manager

 Looking in the machine logs, I can see the following error:

 = Java processes for user hadoop =
 OpenJDK 64-Bit Server VM warning: INFO:
 os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error=

Re: Rich Function Thread Safety

2020-10-29 Thread Igal Shilman
Hi Lian,

Good to hear that you are learning about StateFun, and I'd be happy to
answer any of your questions while doing so :-)
Perhaps in the future it would be best if you start a new email thread, so
that it would be easier to spot your question.

The following is completely thread safe:

final int seen = count.getOrDefault(0);
count.set(seen + 1);

The simple reason is that functions are invoked one by one on a single
OS thread, and different OS threads do not share function instances
between them. In addition, each OS thread owns a chunk of keys that only
it can invoke.
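
As a small illustration of that model (a sketch based on the walkthrough,
with an illustrative function name), the counter stays lock-free exactly
because of this per-key, per-thread ownership:

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

public class CounterFn implements StatefulFunction {

    @Persisted
    private final PersistedValue<Integer> count = PersistedValue.of("count", Integer.class);

    @Override
    public void invoke(Context context, Object input) {
        // Safe without synchronization: invocations for a given key are serialized,
        // and each OS thread owns a disjoint set of keys.
        int seen = count.getOrDefault(0);
        count.set(seen + 1);
    }
}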

2. Is there any scenario that the developers need to worry about
> process/thread safety when using state?

Few things here:

* do not share mutable static variables without synchronization.

* try to minimize/avoid doing long blocking calls. Use asynchronous
API if applicable.


3. can I consider stateful functions as Flink operators so that all
> operator related theories can be applied to stateful functions?
>

Absolutely yes. StateFun is built on-top of the DataStream API + some
internal bits.


> 4. Similarly, can we apply all theories of DataStream state to stateFun's
> state?

I'm not sure what you mean by that, but by and large, yes. The main
difference is that we don't support state evolution with arbitrary
state types; we strictly require Protocol Buffers for that.


Good luck,
Igal.

On Sun, Oct 25, 2020 at 7:43 PM Lian Jiang  wrote:

> Hi,
>
> I am learning
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/getting-started/java_walkthrough.html
> and wondering if the invoke function is thread safe for:
>
> final int seen = count.getOrDefault(0);count.set(seen + 1);
>
> From 
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/logical.html
>
> "When an application starts, each parallel worker of the framework will 
> create one physical object per function type."
>
> It sounds like one function can be invoked by multiple workers at the same 
> time. The tutorial example
>
> indicates that the persistedValue can be process safe (cross multiple 
> workers) and thread safe (inside
>
> a worker, e.g. timer callback).
>
>
> Could you please add some clarification on the questions below?
>
> 1. What's the design (briefly) for persisted state process/thread safety?
> 2. Is there any scenario that the developers need to worry about 
> process/thread safety when using state?
>
> 3. can I consider stateful functions as Flink operators so that all operator 
> related theories can be applied to stateful functions?
>
> 4. Similarly, can we apply all theories of DataStream state to stateFun's 
> state?
>
> Appreciate very much!
>
>
> Thanks
>
> Lian
>
>
> On Sun, May 10, 2020 at 9:33 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> As others have mentioned already, it is true that method calls on
>> operators
>> (e.g. processing events and snapshotting state) will not concurrently
>> happen.
>>
>> As for your findings in reading through the documentation, that might be a
>> hint that we could add a bit more explanation mentioning this.
>> Could you suggest where you'd probably expect to see this being mentioned,
>> based on your read-through?
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
>
> Create your own email signature
> 
>


Re: Unify error handler and late window record output for SQL api

2020-10-29 Thread Yi Tang
Hi Yun,
Thanks for your quick reply.

To be clear, it's not essential to implement these features in the SQL
statement itself. And precisely because of the limitations of SQL, we want
these features to happen.

1.
Yeah, I think the streaming API also has no similar API. We want it because
sometimes we want to record the error and the record itself for analysis,
instead of letting it crash the task. Especially in a job defined by SQL, we
can hardly handle the errors that occur in the query.
2.
Almost the same reason, and it is also not essential to express the operation
in the statement.

Thanks again.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink checkpointing state

2020-10-29 Thread Yun Tang
Hi

Added Yang Wang who mainly develops this feature, I think he could provide more 
information.

Best
Yun Tang

From: Boris Lublinsky 
Sent: Tuesday, October 27, 2020 22:57
To: Yun Tang 
Cc: user 
Subject: Re: Flink checkpointing state

Thanks Yun,
This refers to Flip144 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
The FLIP contains two parts - leader election and HA information persistence -
and offers two options.
Can you tell us what exactly will be part of 1.12?
We would be happy with the second option for now, if it's faster to implement.


On Oct 27, 2020, at 1:11 AM, Yun Tang 
mailto:myas...@live.com>> wrote:

Hi Boris

Please refer to FLINK-12884[1] for current progress of native HA support of k8s 
which targets for release-1.12.

[1] https://issues.apache.org/jira/browse/FLINK-12884

Best
Yun Tang


From: Boris Lublinsky 
mailto:boris.lublin...@lightbend.com>>
Sent: Tuesday, October 27, 2020 2:56
To: user mailto:user@flink.apache.org>>
Subject: Flink checkpointing state

This is from Flink 1.8:

"Job Manager keeps some state related to checkpointing in it’s memory. This 
state would be lost on Job Manager crashes, which is why this state is 
persisted in ZooKeeper. This means that even though there is no real need for 
the leader election and -discovery part of Flink’s HA mode (as is this handled 
natively by Kubernetes), it still needs to be enabled just for storing the 
checkpoint state.”

Was this ever fixed in Flink 1.10 or 1.11? If running Flink on K8s without HA,
there is no ZooKeeper. And if the above is still the case, then checkpointing
will never pick up the right checkpoint.



Re: Running flink in a Local Execution Environment for Production Workloads

2020-10-29 Thread Matthias Pohl
Hi Joseph,
thanks for reaching out to us. There shouldn't be any downsides other than
the one you already mentioned as far as I know.
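
For what it's worth, a minimal sketch of the deployment Joseph describes: an
embedded local environment with checkpointing, packaged as a single Java
application. The S3 path, parallelism and interval below are placeholders:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedJob {
    public static void main(String[] args) throws Exception {
        // Everything runs inside this one JVM; scaling up means a bigger container.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4);
        env.enableCheckpointing(60_000); // checkpoint every minute
        env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints")); // placeholder bucket

        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .print();

        env.execute("embedded-local-job");
    }
}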

Best,
Matthias

On Fri, Oct 23, 2020 at 1:27 PM Joseph Lorenzini 
wrote:

> Hi all,
>
>
>
> I plan to run flink jobs as docker containers in a AWS Elastic Container
> Service. I will have checkpointing enabled where state is stored in a s3
> bucket. Each deployment will run in a per-job mode.  Are there any
> non-obvious downsides to running these jobs with a local execution
> environment so that the deployment turns into deploying a single java
> application?
>
>
>
> The obvious downside is that you don’t get any horizontal scalability.
> That’s a given and I’d have to scale up not out in this mode. I’d like to
> discover if there are any other negatives with this approach.
>
>
>
> Thanks,
> Joe
> Privileged/Confidential Information may be contained in this message. If
> you are not the addressee indicated in this message (or responsible for
> delivery of the message to such person), you may not copy or deliver this
> message to anyone. In such case, you should destroy this message and kindly
> notify the sender by reply email. Please advise immediately if you or your
> employer does not consent to Internet email for messages of this kind.
> Opinions, conclusions and other information in this message that do not
> relate to the official business of my firm shall be understood as neither
> given nor endorsed by it.
>


quoted identifiers in Table SQL give SQL parse failed.

2020-10-29 Thread Ruben Laguna
I made this question on [Stackoverflow][1] but I'm cross posting here.


Are double quoted identifiers allowed in Flink SQL? [Calcite
documentation says to use double quoted
identifiers](https://calcite.apache.org/docs/reference.html#identifiers)
but they don't seem to work (see below). On the other hand I just
found

> Identifiers follow SQL requirements which means that they can be escaped with 
> a backtick character (`).

on the [Table API & SQL > Concepts & Common API > Expanding Table
identifiers][2]

So I guess this means that [Flink SQL][3] is not really just Calcite
SQL plus extras, as I assumed.

---

When I tried the following in Flink I got an `SQL parse failed.
Encountered "\"" at line 1, column 21.` complaining about the `"`
(double quote)

   CREATE TABLE table1("ts" TIMESTAMP) WITH(...)

The full exception is

org.apache.flink.table.api.SqlParserException: SQL parse failed.
Encountered "\"" at line 1, column 21.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
 ...
 ...
 ...
 ...
 ...
 ...


at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:658)
at com.rubenlaguna.BatchJobTest.testBatchJob(BatchJobTest.java:22)
 .






[1]: https://stackoverflow.com/q/64588892/90580
[2]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#expanding-table-identifiers
[3]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/index.html
-- 
/Rubén


Re: ValidationException using DataTypeHint in Scalar Function

2020-10-29 Thread Dawid Wysakowicz
Sorry for a late reply.

Could you share a complete, reproducible example? I am mostly interested
in where you get the input RAW('java.util.Map', '...') type that you
are passing into your UDF.

Raw types are equal/equivalent only if both the class and the serializer
are equal.

A side note: Have you tried using the MAP type instead of
RAW('java.util.Map', '...')? Why did you decide to use a RAW type in
your case?

Best,

Dawid

On 28/10/2020 00:23, Steve Whelan wrote:
> Hi Dawid,
>
> I added `bridgedTo = Map.class` as you suggested and got a slightly
> different exception. I also tried passing a rawSerializer (an
> implementation similar to MapSerializer[1] with String type key and
> value) but got the same exception as without it. I am using Flink
> v1.11 for reference.
>
>
> @FunctionHint(
>       input = {
>               @DataTypeHint(value="RAW", bridgedTo=Map.class,
> rawSerializer=MyMapSerializer.class),
>               @DataTypeHint("STRING")
>       },
>       output = @DataTypeHint("STRING")
> )
> public static String eval(final Object map, final String key)
>
>
> *Exception:*
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> input arguments. Expected signatures are:
> MAP_VALUE(RAW('java.util.Map', '...'), STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> ... 49 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> argument type at position 0. Data type RAW('java.util.Map', '...')
> expected but RAW('java.util.Map', ?) passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> ... 50 more
>
>
>
> [1] 
> https://github.com/apache/flink/blob/release-1.11.0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
>
> On Tue, Oct 27, 2020 at 6:13 AM Dawid Wysakowicz
> mailto:dwysakow...@apache.org>> wrote:
>
> Hey Steve,
>
> You should be able to do via the bridgedTo parameter. You can
> additionally specify a serializer you want to use via
> rawSerializer parameter:
>
>         @FunctionHint(
>                 input = {
>                         @DataTypeHint(value = "RAW", bridgedTo =
> Map.class[, rawSerializer = ... ]),
>                         @DataTypeHint("STRING")},
>                 output = @DataTypeHint("STRING")
>         )
>         public static String eval(final Object map, final String key)
>
> Best,
>
> Dawid
>
> On 26/10/2020 16:10, Steve Whelan wrote:
>> Hi,
>>
>> I have a column of type *RAW('java.util.Map', ?)* that I want to
>> pass to a scalar function UDF. I'm using DataTypeHints but
>> hitting an exception. What would be the proper DataTypeHint and
>> data type param to achieve this?
>>
>>   @FunctionHint(
>>           input = {@DataTypeHint("RAW"), @DataTypeHint("STRING")},
>>           output = @DataTypeHint("STRING")
>>   )
>>   public static String eval(final Object map, final String key) {
>>     // business logic
>>   }
>>
>>
>> *Exception:*
>>
>> Caused by: org.apache.flink.table.api.ValidationException:
>> Invalid input arguments. Expected signatures are:
>> MAP_VALUE(RAW('java.lang.Object', '...'), STRING)
>> at
>> 
>> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
>> at
>> 
>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
>> at
>> 
>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
>> ... 50 more
>> Caused by: org.apache.flink.table.api.ValidationException:
>> Invalid argument type at position 0. Data type
>> RAW('java.lang.Object', '...') expected but RAW('java.util.Map',
>> ?) passed.
>> at
>> 
>> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
>> at
>> 
>> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
>> at
>> 
>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
>>  

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Xintong Song
Hi Ori,

RocksDB also uses managed memory. If the memory overuse indeed comes from
RocksDB, then increasing the managed memory fraction will not help. RocksDB
will try to use as much memory as the configured managed memory size.
Therefore increasing the managed memory fraction also makes RocksDB try to use
more memory. That is why I suggested increasing `jvm-overhead` instead.

Please also make sure the configuration option
`state.backend.rocksdb.memory.managed` is either not explicitly configured,
or configured to `true`.
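
If it helps, a tiny sketch of pinning that option explicitly (true is already
the default in 1.10); most setups would simply put the same key in
flink-conf.yaml:

import org.apache.flink.configuration.Configuration;

public class RocksDbManagedMemorySketch {
    public static Configuration settings() {
        Configuration conf = new Configuration();
        // Keep RocksDB's memory bounded by Flink's managed memory budget.
        conf.setBoolean("state.backend.rocksdb.memory.managed", true);
        return conf;
    }
}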

In addition, I noticed that you are using Flink 1.10.0. You might want to
upgrade to 1.10.2, to include the latest bug fixes on the 1.10 release.

Thank you~

Xintong Song



On Thu, Oct 29, 2020 at 4:41 PM Ori Popowski  wrote:

> Hi,
>
> PID 20331 is indeed the Flink process, specifically the TaskManager
> process.
>
> - Workload is a streaming workload reading from Kafka and writing to S3
> using a custom Sink
> - RockDB state backend is used with default settings
> - My external dependencies are:
> -- logback
> -- jackson
> -- flatbuffers
> -- jaxb-api
> -- scala-java8-compat
> -- apache commons-io
> -- apache commons-compress
> -- software.amazon.awssdk s3
> - What do you mean by UDFs? I've implemented several operators like
> KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.
>
> We use a SessionWindow with 30 minutes of gap, and a watermark with 10
> minutes delay.
>
> We did confirm we have some keys in our job which keep receiving records
> indefinitely, but I'm not sure why it would cause a managed memory leak,
> since this should be flushed to RocksDB and free the memory used. We have a
> guard against this, where we keep the overall size of all the records for
> each key, and when it reaches 300mb, we don't move the records downstream,
> which causes them to create a session and go through the sink.
>
> About what you suggested - I kind of did this by increasing the managed
> memory fraction to 0.5. And it did postpone the occurrence of the problem
> (meaning, the TMs started crashing after 10 days instead of 7 days). It
> looks like anything I'll do on that front will only postpone the problem
> but not solve it.
>
> I am attaching the full job configuration.
>
>
>
> On Thu, Oct 29, 2020 at 10:09 AM Xintong Song 
> wrote:
>
>> Hi Ori,
>>
>> It looks like Flink indeed uses more memory than expected. I assume the
>> first item with PID 20331 is the flink process, right?
>>
>> It would be helpful if you can briefly introduce your workload.
>> - What kind of workload are you running? Streaming or batch?
>> - Do you use RocksDB state backend?
>> - Any UDFs or 3rd party dependencies that might allocate significant
>> native memory?
>>
>> Moreover, if the metrics shows only 20% heap usages, I would suggest
>> configuring less `task.heap.size`, leaving more memory to off-heap. The
>> reduced heap size does not necessarily all go to the managed memory. You
>> can also try increasing the `jvm-overhead`, simply to leave more native
>> memory in the container in case there are other other significant native
>> memory usages.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Oct 28, 2020 at 5:53 PM Ori Popowski  wrote:
>>
>>> Hi Xintong,
>>>
>>> See here:
>>>
>>> # Top memory users
>>> ps auxwww --sort -rss | head -10
>>> USER   PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
>>> yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
>>> /etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
>>> root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
>>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>>> hadoop5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 153:22
>>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>>> yarn 26857  0.3  0.2 4214784 341464 ?  Sl   Sep17 198:43
>>> /etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
>>> root  5519  0.0  0.2 5658624 269344 ?  Sl   Jul30  45:21
>>> /usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
>>> root  1781  0.0  0.0 172644  8096 ?Ss   Jul30   2:06
>>> /usr/lib/systemd/systemd-journald
>>> root  4801  0.0  0.0 2690260 4776 ?Ssl  Jul30   4:42
>>> /usr/bin/amazon-ssm-agent
>>> root  6566  0.0  0.0 164672  4116 ?R00:30   0:00 ps
>>> auxwww --sort -rss
>>> root  6532  0.0  0.0 183124  3592 ?S00:30   0:00
>>> /usr/sbin/CROND -n
>>>
>>> On Wed, Oct 28, 2020 at 11:34 AM Xintong Song 
>>> wrote:
>>>
 Hi Ori,

 The error message suggests that there's not enough physical memory on
 the machine to satisfy the allocation. This does not necessarily mean a
 managed memory leak. Managed memory leak is only one of the possibilities.
 There are other potential reasons, e.g., another process/container on the
 machine used more memory than expected, Yarn NM is not configured with
 enough memory reserve

Re: quoted identifiers in Table SQL give SQL parse failed.

2020-10-29 Thread Danny Chan
Yes, Flink SQL uses the backtick (`) as the quote character. For your SQL,
it should be:

CREATE TABLE table1(`ts` TIMESTAMP) WITH(...)
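
For completeness, the same DDL issued from the Table API in Java (a sketch
only; the connector option is a placeholder, and backticks need no escaping
inside a Java string):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BacktickDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Registers the table with a backtick-quoted column name.
        tEnv.executeSql(
                "CREATE TABLE table1 (`ts` TIMESTAMP) "
                        + "WITH ('connector' = 'print')"); // placeholder connector options
    }
}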


Ruben Laguna wrote on Thu, Oct 29, 2020 at 6:32 PM:

> I made this question on [Stackoverflow][1] but I'm cross posting here.
>
>
> Are double quoted identifiers allowed in Flink SQL? [Calcite
> documentation says to use double quoted
> identifiers](https://calcite.apache.org/docs/reference.html#identifiers)
> but they don't seem to work (see below). On the other hand I just
> found
>
> > Identifiers follow SQL requirements which means that they can be escaped
> with a backtick character (`).
>
> on the [Table API & SQL > Concepts & Common API > Expanding Table
> identifiers][2]
>
> So I guess this means that [Flink SQL][3] is not really just Calcite
> SQL plus extras, as I assumed.
>
> ---
>
> When I tried the following in Flink I got an `SQL parse failed .
> Encountered "\"" at line 1 ,column 21.` complaining about the `"`
> (double quote)
>
>CREATE TABLE table1("ts" TIMESTAMP) WITH(...)
>
> The full exception is
>
> org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "\"" at line 1, column 21.
> Was expecting one of:
> "CONSTRAINT" ...
> "PRIMARY" ...
> "UNIQUE" ...
> "WATERMARK" ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:658)
> at com.rubenlaguna.BatchJobTest.testBatchJob(BatchJobTest.java:22)
>  .
>
>
>
>
>
>
> [1]: https://stackoverflow.com/q/64588892/90580
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#expanding-table-identifiers
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/index.html
> --
> /Rubén
>


Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
Hi Xintong,

Unfortunately I cannot upgrade to 1.10.2, because EMR has either 1.10.0 or
1.11.0.

About the overhead - turns out I already configured
taskmanager.memory.jvm-overhead.max to 2 gb instead of the default 1 gb.
Should I increase it further?

state.backend.rocksdb.memory.managed is already not explicitly configured.

Is there anything else I can do?



On Thu, Oct 29, 2020 at 1:24 PM Xintong Song  wrote:

> Hi Ori,
>
> RocksDB also uses managed memory. If the memory overuse indeed comes from
> RocksDB, then increasing managed memory fraction will not help. RocksDB
> will try to use as many memory as the configured managed memory size.
> Therefore increasing managed memory fraction also makes RocksDB try to use
> more memory. That is why I suggested increasing `jvm-overhead` instead.
>
> Please also make sure the configuration option
> `state.backend.rocksdb.memory.managed` is either not explicitly configured,
> or configured to `true`.
>
> In addition, I noticed that you are using Flink 1.10.0. You might want to
> upgrade to 1.10.2, to include the latest bug fixes on the 1.10 release.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Oct 29, 2020 at 4:41 PM Ori Popowski  wrote:
>
>> Hi,
>>
>> PID 20331 is indeed the Flink process, specifically the TaskManager
>> process.
>>
>> - Workload is a streaming workload reading from Kafka and writing to S3
>> using a custom Sink
>> - RockDB state backend is used with default settings
>> - My external dependencies are:
>> -- logback
>> -- jackson
>> -- flatbuffers
>> -- jaxb-api
>> -- scala-java8-compat
>> -- apache commons-io
>> -- apache commons-compress
>> -- software.amazon.awssdk s3
>> - What do you mean by UDFs? I've implemented several operators like
>> KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.
>>
>> We use a SessionWindow with 30 minutes of gap, and a watermark with 10
>> minutes delay.
>>
>> We did confirm we have some keys in our job which keep receiving records
>> indefinitely, but I'm not sure why it would cause a managed memory leak,
>> since this should be flushed to RocksDB and free the memory used. We have a
>> guard against this, where we keep the overall size of all the records for
>> each key, and when it reaches 300mb, we don't move the records downstream,
>> which causes them to create a session and go through the sink.
>>
>> About what you suggested - I kind of did this by increasing the managed
>> memory fraction to 0.5. And it did postpone the occurrence of the problem
>> (meaning, the TMs started crashing after 10 days instead of 7 days). It
>> looks like anything I'll do on that front will only postpone the problem
>> but not solve it.
>>
>> I am attaching the full job configuration.
>>
>>
>>
>> On Thu, Oct 29, 2020 at 10:09 AM Xintong Song 
>> wrote:
>>
>>> Hi Ori,
>>>
>>> It looks like Flink indeed uses more memory than expected. I assume the
>>> first item with PID 20331 is the flink process, right?
>>>
>>> It would be helpful if you can briefly introduce your workload.
>>> - What kind of workload are you running? Streaming or batch?
>>> - Do you use RocksDB state backend?
>>> - Any UDFs or 3rd party dependencies that might allocate significant
>>> native memory?
>>>
>>> Moreover, if the metrics shows only 20% heap usages, I would suggest
>>> configuring less `task.heap.size`, leaving more memory to off-heap. The
>>> reduced heap size does not necessarily all go to the managed memory. You
>>> can also try increasing the `jvm-overhead`, simply to leave more native
>>> memory in the container in case there are other other significant native
>>> memory usages.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Oct 28, 2020 at 5:53 PM Ori Popowski  wrote:
>>>
 Hi Xintong,

 See here:

 # Top memory users
 ps auxwww --sort -rss | head -10
 USER   PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
 yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
 /etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
 root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
 /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
 hadoop5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 153:22
 /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
 yarn 26857  0.3  0.2 4214784 341464 ?  Sl   Sep17 198:43
 /etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
 root  5519  0.0  0.2 5658624 269344 ?  Sl   Jul30  45:21
 /usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
 root  1781  0.0  0.0 172644  8096 ?Ss   Jul30   2:06
 /usr/lib/systemd/systemd-journald
 root  4801  0.0  0.0 2690260 4776 ?Ssl  Jul30   4:42
 /usr/bin/amazon-ssm-agent
 root  6566  0.0  0.0 164672  4116 ?R00:30   0:00 ps
 auxwww --sort -rss
 root  6532  0.0  0.0 183124  3592 ? 

Flink kafka - Message Prioritization

2020-10-29 Thread Vignesh Ramesh
Hi,

I have a Flink pipeline which reads from a Kafka topic, does a map
operation (builds an Elasticsearch model) and sinks it to Elasticsearch.

*Pipeline-1:*

Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism
8) -> Flink-Es-connector-Sink(es1) (parallelism 8)

Now I want some messages to be prioritized (processed quickly, not
necessarily in any order). I am okay with creating a new topic and placing
the priority messages in it, or with partition-based buckets (e.g.
https://github.com/riferrei/bucket-priority-pattern; I don't think that's
possible with the Flink Kafka connector since partition assignment happens
inside FlinkKafkaConsumerBase).

*I tried the below solution:*

I created another topic (topic2 in which i placed the priority messages)
and with it a new Flink pipeline

*Pipeline-2:*

Flink-Kafka-Connector-Consumer(topic2) (parallelism 8) -> Map (parallelism
8) -> Flink-Es-connector-Sink(es1) (parallelism 8)

But the problem is that I want to consume topic2 as soon as possible, and I
can accept delay/slowness in topic1 because of that. If there is no message
in topic2, then topic1 should be given more priority. But in the above setup
both pipelines are processed equally. Increasing the parallelism of
pipeline-2 to a big number doesn't help, because when there is no message in
topic2, topic1 is still very slow (the parallelism given to topic2 is
wasted).

How can I achieve this using the Flink Kafka connector? Is it possible to
achieve it in any other way?


Regards,

Vignesh


Re: What does Kafka Error sending fetch request mean for the Kafka source?

2020-10-29 Thread Becket Qin
Hi John,

The log message you saw from Kafka consumer simply means the consumer was
disconnected from the broker that FetchRequest was supposed to be sent to.
The disconnection can happen in many cases, such as broker down, network
glitches, etc. The KafkaConsumer will just reconnect and retry sending that
FetchRequest again. This won't cause duplicate messages in KafkaConsumer or
Flink. Have you enabled exactly-once semantic for your Kafka sink? If not,
the downstream might see duplicates in case of Flink failover or occasional
retry in the KafkaProducer of the Kafka sink.
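
In case it is useful, a minimal sketch of an exactly-once Kafka sink with the
universal connector (broker address and topic are placeholders); without
EXACTLY_ONCE, failover or producer retries can surface downstream as
duplicates unless the consumer side is idempotent:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSinkSketch {
    public static FlinkKafkaProducer<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        // Must not exceed the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

        return new FlinkKafkaProducer<>(
                "output-topic",                                   // placeholder default topic
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}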

Thanks,

Jiangjie (Becket) Qin

On Thu, Oct 22, 2020 at 11:38 PM John Smith  wrote:

> Any thoughts this doesn't seem to create duplicates all the time or maybe
> it's unrelated as we are still seeing the message and there is no
> duplicates...
>
> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, 
> wrote:
>
>> And yes my downstream is handling the duplicates in an idempotent way so
>> we are good on that point. But just curious what the behaviour is on the
>> source consumer when that error happens.
>>
>> On Wed, 21 Oct 2020 at 12:04, John Smith  wrote:
>>
>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-21
>>> 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [
>>> Consumer clientId=consumer-2, groupId=xx-import] Error sending
>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>> org.apache.kafka.common.errors.DisconnectException.
>>>
>>> Obviously it looks like the consumer is getting disconnected and from
>>> what it seems it's either a Kafka bug on the way it handles the EPOCH or
>>> possibly version mismatch between client and brokers. That's fine I can
>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>> what happens in terms of the source and the sink. It looks like we get
>>> duplicates on the sink and I'm guessing it's because the consumer is
>>> failing and at that point Flink stays on that checkpoint until it can
>>> reconnect and process that offset and hence the duplicates downstream?
>>>
>>


Re: Feature request: Removing state from operators

2020-10-29 Thread Congxian Qiu
Hi Peter,
 Can applyToAllKeys[1] in KeyedStateBackend help you here? Note, however,
that it is currently not exposed to users.

[1]
https://github.com/apache/flink/blob/fada6fb6ac9fd7f6510f1f2d77b6baa06563e222/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L65
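
For illustration, a rough sketch (all names made up) of what applyToAllKeys
enables internally - visiting an obsolete state for every key and clearing
it - keeping in mind that KeyedStateBackend is runtime API and not reachable
from user functions today:

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;

public class DropStateSketch {
    public static void clearObsoleteState(KeyedStateBackend<String> backend) throws Exception {
        ValueStateDescriptor<Long> obsolete = new ValueStateDescriptor<>("obsolete-value", Long.class);
        backend.applyToAllKeys(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                obsolete,
                (key, state) -> state.clear()); // visit every key's ValueState and clear it
    }
}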

Best,
Congxian


Robert Metzger wrote on Tue, Oct 27, 2020 at 5:51 PM:

> Hi Peter,
>
> I'm adding two committers to this thread who can help answering your
> question.
>
> On Mon, Oct 26, 2020 at 3:22 PM Peter Westermann <
> no.westerm...@genesys.com> wrote:
>
>> We use the feature for removing stateful operators via the
>> *allowNonRestoredState* relatively often and it works great. However,
>> there doesn’t seem to be anything like that for removing state from an
>> existing operator (that we want to keep).
>>
>> Say my operator defines a * MapState* and a *ValueState*. Later on, the
>> *ValueState* becomes obsolete. In this case, we can remove the actual
>> data for each key by clearing it out but the state itself is still
>> referenced in savepoints even if it’s not referenced in code anymore – that
>> e.g. means one cannot remove any class that was previously used in state.
>>
>> Would it be possible to add support for completely removing state from an
>> operator if it’s no longer referenced in code and *allowNonRestoredState*
>> is set? (Or to add an explicit “drop this state option” in KeyedStateStore
>> and OperatorStateStore?)
>>
>>
>>
>> Thanks,
>>
>> Peter
>>
>>
>>
>


Re: Feature request: Removing state from operators

2020-10-29 Thread Peter Westermann
Does that actually allow removing a state completely (vs. just modifying the 
values stored in state)?

Ideally, we would want to just interact with state via KeyedStateStore. Maybe 
it would be possible to add a couple methods there, e.g. like this:
// List all pre-existing states
 List<StateDescriptor<?, ?>> listStates();
// Completely remove a state
 void dropState(StateDescriptor<?, ?> stateDescriptor);


Thanks,
Peter



From: Congxian Qiu 
Date: Thursday, October 29, 2020 at 10:38 AM
To: Robert Metzger 
Cc: Peter Westermann , "user@flink.apache.org" 

Subject: Re: Feature request: Removing state from operators

Hi Peter
 Can applyToAllKeys[1] in KeyedStateBackend help you here? but currently, 
this is not exposed to users now.

[1] 
https://github.com/apache/flink/blob/fada6fb6ac9fd7f6510f1f2d77b6baa06563e222/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L65

Best,
Congxian


Robert Metzger  wrote on Tue, Oct 27, 2020 at 5:51 PM:
Hi Peter,

I'm adding two committers to this thread who can help answering your question.

On Mon, Oct 26, 2020 at 3:22 PM Peter Westermann 
mailto:no.westerm...@genesys.com>> wrote:
We use the feature for removing stateful operators via the 
allowNonRestoredState relatively often and it works great. However, there 
doesn’t seem to be anything like that for removing state from an existing 
operator (that we want to keep).
Say my operator defines a MapState and a ValueState. Later on, the ValueState 
becomes obsolete. In this case, we can remove the actual data for each key by 
clearing it out but the state itself is still referenced in savepoints even if 
it’s not referenced in code anymore – that e.g. means one cannot remove any 
class that was previously used in state.
Would it be possible to add support for completely removing state from an 
operator if it’s no longer referenced in code and allowNonRestoredState is set? 
(Or to add an explicit “drop this state option” in KeyedStateStore and 
OperatorStateStore?)

Thanks,

Peter



Table Print SQL Connector

2020-10-29 Thread Ruben Laguna
How can I use the Table [Print SQL connector][1]? I tried the
following (batch mode)  but it does not give any output:


EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);

Table transactions =
tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("account_id", DataTypes.BIGINT()),
DataTypes.FIELD("amount", DataTypes.BIGINT()),
DataTypes.FIELD("transaction_time",
DataTypes.TIMESTAMP(3))),
Row.of(1, 188, DATE_TIME.plusMinutes(12)),
Row.of(2, 374, DATE_TIME.plusMinutes(47)),
Row.of(3, 112, DATE_TIME.plusMinutes(36)),
Row.of(4, 478, DATE_TIME.plusMinutes(3)),
Row.of(5, 208, DATE_TIME.plusMinutes(8)),
Row.of(1, 379, DATE_TIME.plusMinutes(53)),
Row.of(2, 351, DATE_TIME.plusMinutes(32)),
Row.of(3, 320, DATE_TIME.plusMinutes(31)),
Row.of(4, 259, DATE_TIME.plusMinutes(19)),
Row.of(5, 273, DATE_TIME.plusMinutes(42)));
tEnv.executeSql("CREATE TABLE print_table(account_id BIGINT, amount
BIGINT, transaction_time TIMESTAMP) WITH ('connector' = 'print')");

transactions.executeInsert("print_table");


I can "materialize" the result manually and print them out with :

for (Row row : materialize(transactions.execute())) {
    System.out.println(row);
}

private static List<Row> materialize(TableResult results) {
    try (CloseableIterator<Row> resultIterator = results.collect()) {
        return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(resultIterator,
                        Spliterator.ORDERED), false)
                .collect(Collectors.toList());
    } catch (Exception e) {
        throw new RuntimeException("Failed to materialize results", e);
    }
}


But I would like to know why I can't just use the Print sink.

I've tried with `.inBatchMode()` and with `.inStreamingMode()`, so I
don't think it's that.

Does anybody know of any working example involving the print connector?



[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
-- 
/Rubén


Re: Table Print SQL Connector

2020-10-29 Thread Dawid Wysakowicz
You should be able to use the "print" sink. Remember though that the
"print" sink prints into the stdout/stderr of TaskManagers, not the
Client, where you submit the query. This is different from the
TableResult, which collects results in the client. BTW, for printing you
can use TableResult#print, which will nicely format your results.
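
A compact usage sketch of that client-side route (the table contents are
illustrative only):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class PrintInClientSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        Table t = tEnv.fromValues(1, 2, 3).as("n");
        // Collects the rows in the client and prints them as a formatted table.
        t.execute().print();
    }
}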

Best,

Dawid

On 29/10/2020 16:13, Ruben Laguna wrote:
> How can I use the Table [Print SQL connector][1]? I tried the
> following (batch mode)  but it does not give any output:
>
>
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
> final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
>
> Table transactions =
> tEnv.fromValues(
> DataTypes.ROW(
> DataTypes.FIELD("account_id", DataTypes.BIGINT()),
> DataTypes.FIELD("amount", DataTypes.BIGINT()),
> DataTypes.FIELD("transaction_time",
> DataTypes.TIMESTAMP(3))),
> Row.of(1, 188, DATE_TIME.plusMinutes(12)),
> Row.of(2, 374, DATE_TIME.plusMinutes(47)),
> Row.of(3, 112, DATE_TIME.plusMinutes(36)),
> Row.of(4, 478, DATE_TIME.plusMinutes(3)),
> Row.of(5, 208, DATE_TIME.plusMinutes(8)),
> Row.of(1, 379, DATE_TIME.plusMinutes(53)),
> Row.of(2, 351, DATE_TIME.plusMinutes(32)),
> Row.of(3, 320, DATE_TIME.plusMinutes(31)),
> Row.of(4, 259, DATE_TIME.plusMinutes(19)),
> Row.of(5, 273, DATE_TIME.plusMinutes(42)));
> tEnv.executeSql("CREATE TABLE print_table(account_id BIGINT, amount
> BIGINT, transaction_time TIMESTAMP) WITH ('connector' = 'print')");
>
> transactions.executeInsert("print_table");
>
>
> I can "materialize" the result manually and print them out with :
>
> for (Row row : materialize(transactions.execute())) {
> System.out.println(row);
> }
>
> private static List materialize(TableResult results) {
> try (CloseableIterator resultIterator = results.collect()) {
> return StreamSupport
>
> .stream(Spliterators.spliteratorUnknownSize(resultIterator,
> Spliterator.ORDERED), false)
> .collect(Collectors.toList());
> } catch (Exception e) {
> throw new RuntimeException("Failed to materialize results", e);
> }
> }
>
>
> But I would like to know why I can't just use the Print sink.
>
> I've tried with  `.inBatchMode()` and with `inStreamingMode()`, so I
> don't thinks it's that.
>
> Does anybody know of any working example involving the print connector?
>
>
>
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html





Re: Table Print SQL Connector

2020-10-29 Thread Ruben Laguna
Hi,

Using `mytable.execute().print()` is exactly what I wanted, thanks.

But I'm still curious. I'm just running this locally, in a JUnit test
case (not using a Flink cluster), just like in [flink-playgrounds
SpendReportTest][1], so in this scenario where does the TaskManager
output go (if there is a TaskManager)?

I just added src/test/resources/log4j.properties with

# Root logger option
log4j.rootLogger=INFO, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss}
%-5p %c{1}:%L - %m%n

and still I don't see anything from the print sink, and I even run it
with the debugger and I can see that although
PrintSink#getSinkRuntimeProvider is called , the
RowDataPrintFunction#invoke is never called.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.cpu.cores required for local
execution is not set, setting it to the maximal possible value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.task.heap.size required for
local execution is not set, setting it to the maximal possible value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.task.off-heap.size required
for local execution is not set, setting it to the maximal possible
value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.network.min required for local
execution is not set, setting it to its default value 64 mb.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.network.max required for local
execution is not set, setting it to its default value 64 mb.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.managed.size required for
local execution is not set, setting it to its default value 128 mb.
2020-10-29 18:18:32 INFO  MiniCluster:253 - Starting Flink Mini Cluster
2020-10-29 18:18:32 INFO  MiniCluster:262 - Starting Metrics Registry
2020-10-29 18:18:32 INFO  MetricRegistryImpl:122 - No metrics reporter
configured, no metrics will be exposed/reported.
2020-10-29 18:18:32 INFO  MiniCluster:266 - Starting RPC Service(s)
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:247 - Trying to start
local actor system
2020-10-29 18:18:32 INFO  Slf4jLogger:92 - Slf4jLogger started
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:278 - Actor system
started at akka://flink
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:247 - Trying to start
local actor system
2020-10-29 18:18:32 INFO  Slf4jLogger:92 - Slf4jLogger started
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:278 - Actor system
started at akka://flink-metrics
2020-10-29 18:18:32 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.metrics.dump.MetricQueryService at
akka://flink-metrics/user/rpc/MetricQueryService .
2020-10-29 18:18:32 INFO  MiniCluster:432 - Starting high-availability services
2020-10-29 18:18:32 INFO  BlobServer:143 - Created BLOB server storage
directory 
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd0gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860
2020-10-29 18:18:32 INFO  BlobServer:207 - Started BLOB server at
0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000
2020-10-29 18:18:32 INFO  PermanentBlobCache:107 - Created BLOB cache
storage directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd0gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07
2020-10-29 18:18:32 INFO  TransientBlobCache:107 - Created BLOB cache
storage directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd0gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30
2020-10-29 18:18:32 INFO  MiniCluster:519 - Starting 1 TaskManger(s)
2020-10-29 18:18:32 INFO  TaskManagerRunner:412 - Starting TaskManager
with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193
2020-10-29 18:18:32 INFO  TaskManagerServices:411 - Temporary file
directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd0gp/T': total
233 GB, usable 25 GB (10.73% usable)
2020-10-29 18:18:32 INFO  FileChannelManagerImpl:97 -
FileChannelManager uses directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd0gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
for spill files.
2020-10-29 18:18:32 INFO  FileChannelMan

Re: ValidationException using DataTypeHint in Scalar Function

2020-10-29 Thread Steve Whelan
For some background, I am upgrading from Flink v1.9 to v1.11. So what I am
about to describe is our implementation on v1.9, which worked. I am trying
to achieve the same functionality on v1.11.

I have a DataStream whose type is an avro generated POJO, which contains a
field *UrlParameters* that is of type *Map*. I register
this stream as a view so I can perform SQL queries on it. One of the
queries contains the UDF I have previously posted. It appears that in the
conversion to a view, the type of *UrlParameters* is being converted
into *RAW('java.util.Map',
?)*.


*Code on v1.9*

DataStream pings = // a Kafka stream source deserialized into an avro
generated POJO
tableEnvironment.registerDataStream("myTable", pings);
table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters,
'some_key') FROM myTable");
// tablesinks...


*The produced type of my deserializer is:*

@Override
public TypeInformation getProducedType() {
// Ping.class is an avro generated POJO
return TypeInformation.of(Ping.class);
}

*Scalar UDF MAP_VALUE:*

public static String eval(final Map map, final String key) {
return map.get(key);
}


I am using a UDF to access fields in the *UrlParameters* map because if I
try to access them directly in the SQL (i.e. `UrlParameters['some_key']`),
I get the below exception. This Stack Overflow post[1] had suggested the UDF
as a workaround.

Caused by: org.apache.flink.table.api.TableException: Type is not
supported: ANY
at
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)


The above implementation worked successfully on v1.9. We use a stream
source instead of a table source because we do other non-SQL things with
the stream.


*Code on v1.11*

The following is the implementation on v1.11 which does not work. I was
using the Old Planner on v1.9 but have switched to the Blink Planner on
v1.11, in case that has any relevance here.


DataStream pings = // a Kafka stream source deserialized into an avro
generated POJO object
tableEnvironment.createTemporaryView("myTable", pings);
table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters,
'some_key') FROM myTable");
// tablesinks...


The UDF referenced above produced the below error. So I assumed adding
DataTypeHints was the way to solve it but I was unable to get that to work.
That is what prompted the initial email to the ML.

Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments. Expected signatures are:
MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
... 50 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid argument
type at position 0. Data type MAP<STRING, STRING> expected but
RAW('java.util.Map', ?) passed.
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
... 51 more
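
For reference, this is roughly how I assumed the hint would be attached in
1.11 (a sketch only; the class name and annotation usage below are my
assumption of the intended API usage, and this is the variant I could not get
to work):

import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class MapValueFunction extends ScalarFunction {

    // Hint the planner that the first argument should be treated as
    // MAP<STRING, STRING> rather than a RAW type.
    public String eval(
            @DataTypeHint("MAP<STRING, STRING>") final Map<String, String> map,
            final String key) {
        return map == null ? null : map.get(key);
    }
}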


I can try creating a concrete reproducible example if this explanation
isn't enough, though it's quite involved with the Avro POJO and custom
deserializer.


Thanks,

Steve


[1]
https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types



Re: Re: How to deploy dynamically generated flink jobs?

2020-10-29 Thread Alexander Bagerman
Thanks, Yun. Makes sense. How would you reference a jar file from inside
another jar for such an invocation?
In my case I would have an interactive application - spring boot web app -
where the job would be configured and
StreamExecutionEnvironment.execute(jobName)
is called.
The Spring app is a runnable fat jar with my "flink" jar packaged along with
other jars. How would I specify the location of the jar so that
StreamExecutionEnvironment
can find it?
Thanks,
Alex

On Wed, Oct 28, 2020 at 11:03 PM Yun Gao  wrote:

> Hi Alexander,
>
>  From my side I still think it should be reasonable to have a jar that
> contains the code that runs in the client and is also shipped to the
> cluster. Then this jar could also be included in the shipped jar list.
>
>  For the second issue, similarly I think you may first build the project
> to get the jar containing the code, then fill in the path of the generated jar
> to test the submission.
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*Alexander Bagerman 
> *Send Date:*Thu Oct 29 11:38:45 2020
> *Recipients:*Yun Gao 
> *CC:*Flink ML 
> *Subject:*Re: How to deploy dynamically generated flink jobs?
>
>> I did try it but this option seems to be for a third-party jar. In my
>> case I would need to specify/ship a jar that contains the code where the job is
>> being constructed. I'm not clear on
>> 1. how to point to the containing jar
>> 2. how to test such a submission from my project running in Eclipse
>> Alex
>>
>> On Wed, Oct 28, 2020 at 8:21 PM Yun Gao  wrote:
>>
>>> Hi Alexander,
>>>
>>> The signature of the createRemoteEnvironment is
>>>
>>> public static StreamExecutionEnvironment createRemoteEnvironment(
>>>   String host, int port, String... jarFiles);
>>>
>>> Which could also ship the jars to execute to remote cluster. Could you have 
>>> a try to also pass the jar files to the remote environment ?
>>>
>>>
>>> Best,
>>>
>>>  Yun
>>>
>>> --
>>> Sender:Alexander Bagerman
>>> Date:2020/10/29 10:43:16
>>> Recipient:
>>> Theme:How to deploy dynamically generated flink jobs?
>>>
>>>
>>>
>>> Hi,
>>>
>>> I am trying to build a functionality to dynamically configure a flink
>>> job (Java) in my code based on some additional metadata and submit it to a
>>> flink running in a session cluster.
>>>
>>> Flink version is 1.11.2
>>>
>>> The problem I have is how to provide a packed job to the cluster. When I
>>> am trying the following code
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);
>>> ... configuring job workflow here ...
>>> env.execute(jobName);
>>>
>>> I am getting ClassNotFoundException stating that code for my mapping
>>> functions did not make it to the cluster. Which makes sense.
>>>
>>> What would be the right way to deploy dynamically configured flink jobs
>>> which are not packaged as a jar file but rather generated ad-hoc?
>>>
>>> Thanks
>>>
>>>


Native kubernetes setup failed to start job

2020-10-29 Thread Chen Liangde
I created a flink cluster in kubernetes following this guide:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html

The job manager was running. When a job was submitted to the job manager,
it spawned a task manager pod, but the task manager failed to connect to
the job manager. And in the job manager web ui I can't find the task
manager.

This error is
suspicious: 
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 352518404 - discarded

2020-10-29 13:22:51,069 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Connecting to ResourceManager
akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*().
2020-10-29 13:22:51,176 WARN
akka.remote.transport.netty.NettyTransport   [] -
Remote connection to
[detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
java.io.IOException: Connection reset by peer
2020-10-29 13:22:51,176 WARN
akka.remote.transport.netty.NettyTransport   [] -
Remote connection to
[detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 352518404 - discarded
2020-10-29 13:22:51,180 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123] has
failed, address is now gated for [50] ms. Reason: [Association failed
with [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123]]
Caused by: [The remote system explicitly disassociated (reason
unknown).]
2020-10-29 13:22:51,183 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Could not resolve ResourceManager address
akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*,
retrying in 1 ms: Could not connect to rpc endpoint under address
akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*.
2020-10-29 13:23:01,203 WARN
akka.remote.transport.netty.NettyTransport   [] -
Remote connection to
[detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
java.io.IOException: Connection reset by peer


Re: how to enable metrics in Flink 1.11

2020-10-29 Thread Diwakar Jha
Hello Everyone,

I'm able to get my Flink UI up and running (it was related to the session
manager plugin on my local laptop) but I'm not seeing any
taskmanager/jobmanager logs in my Flink application. I have attached some
yarn application logs while it's running but am not able to figure out how
to stop and get more logs. Could someone please help me figure this out?
I'm running Flink 1.11 on the EMR 6.1 cluster.

On Tue, Oct 27, 2020 at 1:06 PM Diwakar Jha  wrote:

> Hi Robert,
> Could you please correct me. I'm not able to stop the app. Also, I
> stopped the Flink job already.
>
> sh-4.2$ yarn app -stop application_1603649952937_0002
> 2020-10-27 20:04:25,543 INFO client.RMProxy: Connecting to ResourceManager
> at ip-10-0-55-50.ec2.internal/10.0.55.50:8032
> 2020-10-27 20:04:25,717 INFO client.AHSProxy: Connecting to Application
> History server at ip-10-0-55-50.ec2.internal/10.0.55.50:10200
> Exception in thread "main" java.lang.IllegalArgumentException: App admin
> client class name not specified for type Apache Flink
> at
> org.apache.hadoop.yarn.client.api.AppAdminClient.createAppAdminClient(AppAdminClient.java:76)
> at
> org.apache.hadoop.yarn.client.cli.ApplicationCLI.run(ApplicationCLI.java:597)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
> at
> org.apache.hadoop.yarn.client.cli.ApplicationCLI.main(ApplicationCLI.java:126)
> sh-4.2$
>
> On Tue, Oct 27, 2020 at 9:34 AM Robert Metzger 
> wrote:
>
>> Hi,
>> are you intentionally not posting this response to the mailing list?
>>
>> As you can see from the yarn logs, log aggregation only works for
>> finished applications ("End of LogType:prelaunch.out.This log file belongs
>> to a running container (container_1603649952937_0002_01_02) and so may
>> not be complete.")
>>
>> Please stop the app, then provide the logs.
>>
>>
>> On Tue, Oct 27, 2020 at 5:11 PM Diwakar Jha 
>> wrote:
>>
>>> Hi Robert,
>>>
>>> Yes, i'm using Flink on EMR using YARN. Please find attached the yarn
>>> logs -applicationId. I also attached haddop-yarn-nodemanager logs.
>>> Also, I followed this link below which has the same problem :
>>> http://mail-archives.apache.org/mod_mbox/flink-user/202009.mbox/%3CCAGDv3o5WyJTrXs9Pg+Vy-b+LwgEE26iN54iqE0=f5t+m8vw...@mail.gmail.com%3E
>>>
>>> https://www.talkend.net/post/75078.html
>>> Based on this I changed the log4j.properties.
>>> Let me know what you think. Please also let me know if you need some
>>> specific logs.  Appreciate your help.
>>>
>>> Best,
>>> Diwakar
>>>
>>> On Tue, Oct 27, 2020 at 12:26 AM Robert Metzger 
>>> wrote:
>>>
 Hey Diwakar,

 how are you deploying Flink on EMR? Are you using YARN?
 If so, you could also use log aggregation to see all the logs at once
 (from both JobManager and TaskManagers). (yarn logs -applicationId
 )

 Could you post (or upload somewhere) all logs you have of one run? It
 is much easier for us to debug something if we have the full logs (the logs
 show for example the classpath that you are using, we would see how you are
 deploying Flink, etc.)

 From the information available, my guess is that you have modified your
 deployment in some way (use of a custom logging version, custom deployment
 method, version mixup with jars from both Flink 1.8 and 1.11, ...).

 Best,
 Robert


 On Tue, Oct 27, 2020 at 12:41 AM Diwakar Jha 
 wrote:

> This is what I see on the WebUI.
>
> 23:19:24.263 [flink-akka.actor.default-dispatcher-1865] ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
> - Failed to transfer file from TaskExecutor
> container_1603649952937_0002_01_04.
> java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException:
> The file LOG does not exist on the TaskExecutor.
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
> Caused by: org.apache.flink.util.FlinkException: The file LOG does not exist on the
> TaskExecutor.
>     ... 5 more

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Xintong Song
Hi Ori,

I'm not sure where the problem comes from. There are several things
that might be worth a try.
- Further increasing the `jvm-overhead`. Your `ps` result suggests that
the Flink process uses 120+GB, while `process.size` is configured to 112GB. So
I think 2GB of `jvm-overhead` might not be enough. I would suggest tuning
`managed.fraction` back to 0.4 and increasing `jvm-overhead` to around 12GB.
This should give you roughly the same `process.size` as before, while
leaving more unmanaged native memory space (see the example settings after this list).
- During the 7-10 job running days, are there any failovers/restarts? If
yes, you might want to look into this comment [1] in FLINK-18712.
- If neither of the above actions helps, we might need to leverage tools
(e.g., JVM NMT [2]) to track the native memory usages and see where exactly
the leak comes from.
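
To make the first suggestion concrete, the settings could look roughly like
this in flink-conf.yaml (the sizes are just the ones discussed in this thread,
please adjust them to your container):

taskmanager.memory.process.size: 112g
taskmanager.memory.managed.fraction: 0.4
# pin the JVM overhead to ~12 GB by setting min and max to the same value
taskmanager.memory.jvm-overhead.min: 12g
taskmanager.memory.jvm-overhead.max: 12g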

Thank you~

Xintong Song


[1]
https://issues.apache.org/jira/browse/FLINK-18712?focusedCommentId=17189138&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17189138

[2]
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html

On Thu, Oct 29, 2020 at 7:51 PM Ori Popowski  wrote:

>
> Hi Xintong,
>
> Unfortunately I cannot upgrade to 1.10.2, because EMR has either 1.10.0 or
> 1.11.0.
>
> About the overhead - turns out I already configured
> taskmanager.memory.jvm-overhead.max to 2 gb instead of the default 1 gb.
> Should I increase it further?
>
> state.backend.rocksdb.memory.managed is already not explicitly configured.
>
> Is there anything else I can do?
>
>
>
> On Thu, Oct 29, 2020 at 1:24 PM Xintong Song 
> wrote:
>
>> Hi Ori,
>>
>> RocksDB also uses managed memory. If the memory overuse indeed comes from
>> RocksDB, then increasing managed memory fraction will not help. RocksDB
>> will try to use as many memory as the configured managed memory size.
>> Therefore increasing managed memory fraction also makes RocksDB try to use
>> more memory. That is why I suggested increasing `jvm-overhead` instead.
>>
>> Please also make sure the configuration option
>> `state.backend.rocksdb.memory.managed` is either not explicitly configured,
>> or configured to `true`.
>>
>> In addition, I noticed that you are using Flink 1.10.0. You might want to
>> upgrade to 1.10.2, to include the latest bug fixes on the 1.10 release.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Oct 29, 2020 at 4:41 PM Ori Popowski  wrote:
>>
>>> Hi,
>>>
>>> PID 20331 is indeed the Flink process, specifically the TaskManager
>>> process.
>>>
>>> - Workload is a streaming workload reading from Kafka and writing to S3
>>> using a custom Sink
>>> - RockDB state backend is used with default settings
>>> - My external dependencies are:
>>> -- logback
>>> -- jackson
>>> -- flatbuffers
>>> -- jaxb-api
>>> -- scala-java8-compat
>>> -- apache commons-io
>>> -- apache commons-compress
>>> -- software.amazon.awssdk s3
>>> - What do you mean by UDFs? I've implemented several operators like
>>> KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.
>>>
>>> We use a SessionWindow with 30 minutes of gap, and a watermark with 10
>>> minutes delay.
>>>
>>> We did confirm we have some keys in our job which keep receiving records
>>> indefinitely, but I'm not sure why it would cause a managed memory leak,
>>> since this should be flushed to RocksDB and free the memory used. We have a
>>> guard against this, where we keep the overall size of all the records for
>>> each key, and when it reaches 300mb, we don't move the records downstream,
>>> which causes them to create a session and go through the sink.
>>>
>>> About what you suggested - I kind of did this by increasing the managed
>>> memory fraction to 0.5. And it did postpone the occurrence of the problem
>>> (meaning, the TMs started crashing after 10 days instead of 7 days). It
>>> looks like anything I'll do on that front will only postpone the problem
>>> but not solve it.
>>>
>>> I am attaching the full job configuration.
>>>
>>>
>>>
>>> On Thu, Oct 29, 2020 at 10:09 AM Xintong Song 
>>> wrote:
>>>
 Hi Ori,

 It looks like Flink indeed uses more memory than expected. I assume the
 first item with PID 20331 is the flink process, right?

 It would be helpful if you can briefly introduce your workload.
 - What kind of workload are you running? Streaming or batch?
 - Do you use RocksDB state backend?
 - Any UDFs or 3rd party dependencies that might allocate significant
 native memory?

 Moreover, if the metrics shows only 20% heap usages, I would suggest
 configuring less `task.heap.size`, leaving more memory to off-heap. The
 reduced heap size does not necessarily all go to the managed memory. You
 can also try increasing the `jvm-overhead`, simply to leave more native
> memory in the container in case there are other significant native
 memory usages.

 Thank you~

 Xintong Song



 On Wed, Oct 28, 2020 at 5:53 PM Ori P

Re: Native kubernetes setup failed to start job

2020-10-29 Thread Yun Gao
Hi Liangde,

   I am pulling in Yang Wang, who is the expert on Flink on K8s.

Best,
 Yun

 --Original Mail --
Sender:Chen Liangde 
Send Date:Fri Oct 30 05:30:40 2020
Recipients:Flink ML 
Subject:Native kubernetes setup failed to start job

I created a flink cluster in kubernetes following this guide: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html
The job manager was running. When a job was submitted to the job manager, it 
spawned a task manager pod, but the task manager failed to connect to the job 
manager. And in the job manager web ui I can't find the task manager.
This error is suspicious: 
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
 Adjusted frame length exceeds 10485760: 352518404 - discarded
2020-10-29 13:22:51,069 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Connecting to ResourceManager akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*().
2020-10-29 13:22:51,176 WARN  akka.remote.transport.netty.NettyTransport   [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with java.io.IOException: Connection reset by peer
2020-10-29 13:22:51,176 WARN  akka.remote.transport.netty.NettyTransport   [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 352518404 - discarded
2020-10-29 13:22:51,180 WARN  akka.remote.ReliableDeliverySupervisor   [] - Association with remote system [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2020-10-29 13:22:51,183 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could not resolve ResourceManager address akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*.
2020-10-29 13:23:01,203 WARN  akka.remote.transport.netty.NettyTransport   [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with java.io.IOException: Connection reset by peer

Re: Re: Re: How to deploy dynamically generated flink jobs?

2020-10-29 Thread Yun Gao
Hi Alexander, 

  Sorry, I might not fully understand the issue: do you mean the "flink"
jar is the same jar as the Spring app fat jar, or are they different jars?
In general, I think the parameter value we need for jarFiles is the absolute
path of the jar file, so we need some logic to decide the path to the jar
files. For example, if the "flink" jar containing the UDF is the same as the
Spring app fat jar containing the execute call, we might use a method like [1] to
find the containing jar; otherwise we might need some mapping from the job
name to its flink jar.
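
As a rough illustration (the class, host and port below are made up, and this
assumes the job-building class is loaded from a plain jar on disk), the
lookup plus submission could look like:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitSketch {

    // jobClass is whatever class builds the job graph, e.g. a class from
    // the "flink" jar.
    public static void submit(Class<?> jobClass) throws Exception {
        String containingJar = jobClass.getProtectionDomain()
                .getCodeSource().getLocation().toURI().getPath();

        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createRemoteEnvironment("jobmanager-host", 8081, containingJar);

        // ... configure the job on `env` here ...
        env.execute("dynamically-configured-job");
    }
}

Note that for classes loaded from a nested jar inside a Spring Boot fat jar
the code source location is not a plain file path, so some extra handling or
a separate thin jar might be needed there.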

Best,
 Yun

[1] 
https://github.com/apache/hadoop/blob/8ee6bc2518bfdf7ad257cc1cf3c73f4208c49fc0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ClassUtil.java#L38

 --Original Mail --
Sender:Alexander Bagerman 
Send Date:Fri Oct 30 04:49:59 2020
Recipients:Yun Gao 
CC:Flink ML 
Subject:Re: Re: How to deploy dynamically generated flink jobs?

Thanks, Yun. Makes sense. How would you reference a jar file from inside of 
another jar for such invocation? 
In my case I would have an interactive application - spring boot web app - 
where the job would be configured and 
StreamExecutionEnvironment.execute(jobName) is called. 
Spring app is a runnable fat jar with my "flink" jar packaged along with other 
jars. How would I specify location to the jar so that 
StreamExecutionEnvironment can find it?
Thanks,
Alex

Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-29 Thread Congxian Qiu
Hi Partha
 The exception here says that there is some operator in the
checkpoint/savepoint that is not in the new program.
 As you said, both runs use the same user program binary, which
seems a little strange to me. Did you ever set the uid for all the
operators?
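
In case it helps, a minimal sketch of what I mean by setting uids (the
pipeline below is only an illustration, not your job): giving every operator
an explicit, stable uid lets Flink map the savepoint state back to the
operators even if the auto-generated ids change between versions.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c").uid("demo-source")   // stable id for the source
           .map(String::toUpperCase).uid("demo-map")         // stable id for the map operator
           .print().uid("demo-sink");                        // stable id for the sink

        env.execute("uid-sketch");
    }
}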

Best,
Congxian


Partha Mishra  wrote on Fri, Oct 23, 2020 at 3:02 PM:

> Hi,
>
>
>
> None of the operator is renamed or removed. Testing is carried out with
> exactly same binary used with 1.9 and 1.11.2. Checkpoint saved in 1.9 is
> not being able to retrieve in 1.11.2
>
>
>
>
>
> *From:* Sivaprasanna 
> *Sent:* Friday, October 23, 2020 10:57 AM
> *To:* Partha Mishra 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Resuming Savepoint issue with upgraded Flink version 1.11.2
>
>
>
> Hi,
>
>
>
> Have you dropped or renamed any operator from the original job? If yes,
> and you are okay with discarding the state of that operator, you can submit
> the job with --allowNonRestoredState or -n.
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
>
>
>
> -
>
> Sivaprasanna
>
>
>
> On Fri, Oct 23, 2020 at 10:48 AM Partha Mishra 
> wrote:
>
> Hi,
>
>
>
> We are trying to save checkpoints for one of the flink job running in
> Flink version 1.9 and tried to resume the same flink job in Flink version
> 1.11.2. We are getting the below error when trying to restore the saved
> checkpoint in the newer flink version. Can
>
>
>
> Cannot map checkpoint/savepoint state for operator
> fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator
> is not available in the new program.
>
>
>
>
>
> *Complete Stack Trace :*
>
> {"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException:
> Could not execute application.\n\tat
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)\n\tat
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)\n\tat
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\nCaused by:
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not execute
> application.\n\tat
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tat
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)\n\t...
> 7 more\nCaused by: org.apache.flink.util.FlinkRuntimeException: Could not
> execute application.\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\t...
> 7 more\nCaused by:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Failed to execute job
> 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)\n\tat
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)\n\tat
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\t...
> 10 more\nCaused by: org.apache.flink.util.FlinkException: Failed to execute
> job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)\n\tat
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)\n\tat
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)\n\tat
> org.apache.flink.streaming.api.environment.StreamExec

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
- I will increase the jvm-overhead
- I don't have any failovers or restarts until it starts happening
- If it happens again even with the changes, I'll post the NMT output

On Fri, Oct 30, 2020 at 3:54 AM Xintong Song  wrote:

> Hi Ori,
>
> I'm not sure about where the problem comes from. There are several things
> that might be worth a try.
> - Further increasing the `jvm-overhead`. Your `ps` result suggests that
> the Flink process uses 120+GB, while `process.size` is configured 112GB. So
> I think 2GB `jvm-overhead` might not be enough. I would suggest to tune
> `managed.fraction` back to 0.4 and increase `jvm-overhead` to around 12GB.
> This should give you roughly the same `process.size` as before, while
> leaving more unmanaged native memory space.
> - During the 7-10 job running days, are there any failovers/restarts? If
> yes, you might want to look into this comment [1] in FLINK-18712.
> - If neither of the above actions helps, we might need to leverage tools
> (e.g., JVM NMT [2]) to track the native memory usages and see where exactly
> the leak comes from.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-18712?focusedCommentId=17189138&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17189138
>
> [2]
> https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html
>
> On Thu, Oct 29, 2020 at 7:51 PM Ori Popowski  wrote:
>
>>
>> Hi Xintong,
>>
>> Unfortunately I cannot upgrade to 1.10.2, because EMR has either 1.10.0
>> or 1.11.0.
>>
>> About the overhead - turns out I already configured
>> taskmanager.memory.jvm-overhead.max to 2 gb instead of the default 1 gb.
>> Should I increase it further?
>>
>> state.backend.rocksdb.memory.managed is already not explicitly
>> configured.
>>
>> Is there anything else I can do?
>>
>>
>>
>> On Thu, Oct 29, 2020 at 1:24 PM Xintong Song 
>> wrote:
>>
>>> Hi Ori,
>>>
>>> RocksDB also uses managed memory. If the memory overuse indeed comes
>>> from RocksDB, then increasing managed memory fraction will not help.
>>> RocksDB will try to use as much memory as the configured managed memory
>>> size. Therefore increasing managed memory fraction also makes RocksDB try
>>> to use more memory. That is why I suggested increasing `jvm-overhead`
>>> instead.
>>>
>>> Please also make sure the configuration option
>>> `state.backend.rocksdb.memory.managed` is either not explicitly configured,
>>> or configured to `true`.
>>>
>>> In addition, I noticed that you are using Flink 1.10.0. You might want
>>> to upgrade to 1.10.2, to include the latest bug fixes on the 1.10 release.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Oct 29, 2020 at 4:41 PM Ori Popowski  wrote:
>>>
 Hi,

 PID 20331 is indeed the Flink process, specifically the TaskManager
 process.

 - Workload is a streaming workload reading from Kafka and writing to S3
 using a custom Sink
 - RockDB state backend is used with default settings
 - My external dependencies are:
 -- logback
 -- jackson
 -- flatbuffers
 -- jaxb-api
 -- scala-java8-compat
 -- apache commons-io
 -- apache commons-compress
 -- software.amazon.awssdk s3
 - What do you mean by UDFs? I've implemented several operators like
 KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.

 We use a SessionWindow with 30 minutes of gap, and a watermark with 10
 minutes delay.

 We did confirm we have some keys in our job which keep receiving
 records indefinitely, but I'm not sure why it would cause a managed memory
 leak, since this should be flushed to RocksDB and free the memory used. We
 have a guard against this, where we keep the overall size of all the
 records for each key, and when it reaches 300mb, we don't move the records
 downstream, which causes them to create a session and go through the sink.

 About what you suggested - I kind of did this by increasing the managed
 memory fraction to 0.5. And it did postpone the occurrence of the problem
 (meaning, the TMs started crashing after 10 days instead of 7 days). It
 looks like anything I'll do on that front will only postpone the problem
 but not solve it.

 I am attaching the full job configuration.



 On Thu, Oct 29, 2020 at 10:09 AM Xintong Song 
 wrote:

> Hi Ori,
>
> It looks like Flink indeed uses more memory than expected. I assume
> the first item with PID 20331 is the flink process, right?
>
> It would be helpful if you can briefly introduce your workload.
> - What kind of workload are you running? Streaming or batch?
> - Do you use RocksDB state backend?
> - Any UDFs or 3rd party dependencies that might allocate significant
> native memory?
>
> Moreover, if the metrics shows only 20% heap usages, I would suggest
> configuring less `task.heap.size`, leaving m

RE: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-29 Thread Partha Mishra
Hi Cong,

No, we never set the uid for the operators.

Regards,
Partha Mishra

From: Congxian Qiu 
Sent: Friday, October 30, 2020 10:51 AM
To: Partha Mishra 
Cc: Sivaprasanna ; user@flink.apache.org
Subject: Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

Hi Partha
 The exception here said that there is some operator in the 
checkpoint/savepoint, but not in the new program.
 As you said that, both times use the same user program binary,  this seems 
a little strange to me. did you ever set the uid for all the operators?

Best,
Congxian


Partha Mishra <partha.mis...@man-es.com> wrote on Fri, Oct 23, 2020 at 3:02 PM:
Hi,

None of the operator is renamed or removed. Testing is carried out with exactly 
same binary used with 1.9 and 1.11.2. Checkpoint saved in 1.9 is not being able 
to retrieve in 1.11.2


From: Sivaprasanna <sivaprasanna...@gmail.com>
Sent: Friday, October 23, 2020 10:57 AM
To: Partha Mishra <partha.mis...@man-es.com>
Cc: user@flink.apache.org
Subject: Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

Hi,

Have you dropped or renamed any operator from the original job? If yes, and you 
are okay with discarding the state of that operator, you can submit the job 
with --allowNonRestoredState or -n. 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

-
Sivaprasanna

On Fri, Oct 23, 2020 at 10:48 AM Partha Mishra <partha.mis...@man-es.com> wrote:
Hi,

We are trying to save checkpoints for one of the flink job running in Flink 
version 1.9 and tried to resume the same flink job in Flink version 1.11.2. We 
are getting the below error when trying to restore the saved checkpoint in the 
newer flink version. Can

Cannot map checkpoint/savepoint state for operator 
fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is 
not available in the new program.


Complete Stack Trace :
{"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could 
not execute application.\n\tat 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)\n\tat
 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)\n\tat
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
 java.lang.Thread.run(Thread.java:748)\nCaused by: 
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute 
application.\n\tat 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tat
 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tat
 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)\n\t...
 7 more\nCaused by: org.apache.flink.util.FlinkRuntimeException: Could not 
execute application.\n\tat 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)\n\tat
 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat
 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)\n\tat
 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\t...
 7 more\nCaused by: org.apache.flink.client.program.ProgramInvocationException: 
The main method caused an error: Failed to execute job 
'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)\n\tat
 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)\n\tat
 org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\t...
 10 more\nCaused by: org.apache.flink.util.FlinkException: Failed to execute 
job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironm

Insufficient number of network buffers for simple last_value aggregate

2020-10-29 Thread Schneider, Thilo
Dear list,

when trying to compute a simple last_value aggregate, flink fails with an 
IOException. The query is defined as follows:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)


t_env.execute_sql("""
CREATE TABLE key_change_test (
  id INT,  val1 STRING,  val2 STRING,  t AS proctime()
) WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = 'localhost:9192',
  'properties.group.id' = 'foo'
)
""")

tt = t_env.sql_query("SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS 
val2 FROM key_change_test GROUP BY id")

t_env.execute_sql("CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) WITH 
('connector' = 'print')")
tt.execute_insert("debug")


Looking at the logs I get the following error message:
[…]
2020-10-30 07:45:46,474 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: TableSourceScan(table=[[default_catalog, 
default_database, key_change_test]], fields=[id, val1, val2]) (21/88) 
(02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
java.io.IOException: Insufficient number of network buffers: required 89, but 
only 67 available. The total number of network buffers is currently set to 2048 
of 32768 bytes each. You can increase this number by setting the configuration 
keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', 
and 'taskmanager.memory.network.max'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869)
 [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
[…]

What is happening here? To me it seems that Flink is requesting an awful lot
of resources for a simple query (the Kafka topic has only one partition and is
used for manual injection only, so there is no big traffic there).
Can you help me find a way around this problem?
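
For what it's worth, my understanding is that the keys named in the error
would be raised in flink-conf.yaml roughly like this (the values are only
illustrative, I have not verified them for this job):

taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 1gb

I also have not checked whether simply lowering the default parallelism (the
scan apparently runs with 88 subtasks) would already avoid the problem.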

Thanks in advance
Thilo
