[ https://issues.apache.org/jira/browse/FLINK-25670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484608#comment-17484608 ]

Igal Shilman edited comment on FLINK-25670 at 1/31/22, 10:52 AM:
-----------------------------------------------------------------

[~JSilent], looking at the last exception you have posted:
{code:java}
2022-01-23 03:44:55,441 WARN  org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #1)ToFunctionRequestSummary(address=Address(example, hello, bbbb), batchSize=1, totalSizeInBytes=182611266, numberOfStates=2)
org.apache.flink.statefun.flink.core.nettyclient.exceptions.WrongHttpResponse: Unexpected response code 413 (Request Entity Too Large){code}
The logged exception says that StateFun tried to send a request of 182611266 
bytes (about 174 MiB) to the Python worker, and the Python worker responded 
with HTTP status code 413 (Request Entity Too Large).
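For scale, here is the arithmetic behind the number in the log as a small Python sketch. It assumes aiohttp's default "client_max_size" of 1024**2 bytes; the "suggested_limit" helper and its 1.5x headroom factor are hypothetical illustrations (not part of aiohttp or StateFun) of picking a limit that leaves room for the state to keep growing.

```python
import math

# Request size taken from the log above; 1 MiB is aiohttp's default client_max_size.
REQUEST_BYTES = 182_611_266
AIOHTTP_DEFAULT_LIMIT = 1024 ** 2


def to_mib(num_bytes: int) -> float:
    """Convert a byte count to mebibytes (MiB)."""
    return num_bytes / 1024 ** 2


def suggested_limit(largest_request: int, headroom: float = 1.5) -> int:
    """Hypothetical helper: largest_request * headroom, rounded up to whole MiB."""
    return math.ceil(largest_request * headroom / 1024 ** 2) * 1024 ** 2


print(f"request size: {to_mib(REQUEST_BYTES):.1f} MiB")  # request size: 174.2 MiB
print(f"times over default limit: {REQUEST_BYTES / AIOHTTP_DEFAULT_LIMIT:.0f}")  # 174
print(f"suggested client_max_size: {suggested_limit(REQUEST_BYTES)} bytes")  # 274726912
```

The request that was rejected is roughly 174 times larger than what an aiohttp server accepts out of the box.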

You need to make sure that your Python process, and the HTTP framework it uses 
(aiohttp in this case), can accept requests of that size.

Check out page 123 of the aiohttp v3.7.2 user manual 
([https://docs.aiohttp.org/_/downloads/en/v3.7.2/pdf/]) and the web server 
reference ([https://docs.aiohttp.org/en/stable/web_reference.html]) for how to 
allow aiohttp to accept larger requests.

The relevant parameter is "client_max_size" on aiohttp's web.Application, which 
defaults to 1 MiB (1024**2 bytes); you may want to set it to a higher value.
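A minimal sketch of an aiohttp endpoint with a raised "client_max_size" follows. Assumptions: the route "/statefun" must match the endpoint URL configured in your module.yaml, 256 MiB is an arbitrary example limit chosen to sit above the ~174 MiB request in the log, and "handle_bytes" is a hypothetical placeholder that just echoes the body. In a real worker you would call the StateFun Python SDK's RequestReplyHandler there (e.g. "await handler.handle_async(body)", as in the SDK's aiohttp examples).

```python
from aiohttp import web

# Assumption: 256 MiB sits above the ~174 MiB request from the log.
# Size this from your own largest expected state plus message batch.
MAX_REQUEST_BYTES = 256 * 1024 ** 2


async def handle_bytes(body: bytes) -> bytes:
    """Hypothetical placeholder that echoes the body back.

    In a real StateFun worker this would be the SDK call,
    e.g. ``await handler.handle_async(body)``.
    """
    return body


async def statefun_endpoint(request: web.Request) -> web.Response:
    # request.read() buffers the whole body in memory and raises
    # HTTPRequestEntityTooLarge (HTTP 413) if it is too large for client_max_size.
    body = await request.read()
    result = await handle_bytes(body)
    return web.Response(body=result, content_type="application/octet-stream")


def make_app(max_request_bytes: int = MAX_REQUEST_BYTES) -> web.Application:
    # aiohttp's default client_max_size is 1024**2 bytes (1 MiB); raise it here.
    app = web.Application(client_max_size=max_request_bytes)
    app.add_routes([web.post("/statefun", statefun_endpoint)])
    return app
```

Start it with web.run_app(make_app(), port=8000) (the port is also an assumption). Since request.read() buffers the entire body, the worker also needs enough memory to hold requests of this size.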

As a side note, you could also benefit from compression or a more efficient 
encoding: for example, compress your arrays before storing them in state or 
sending them as messages, using:
{code:python}
numpy.savez_compressed{code}
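A sketch of how numpy.savez_compressed can turn an array into compact bytes before it goes into state or a message. "pack_array" and "unpack_array" are hypothetical helper names, and the sketch assumes your state or message type carries raw bytes. How much this saves depends on the data: arrays with many repeated values compress well, while random floating-point data barely shrinks.

```python
import io

import numpy as np


def pack_array(arr: np.ndarray) -> bytes:
    """Serialize an array into a compressed .npz archive held in memory."""
    buf = io.BytesIO()
    # savez_compressed writes a zip archive whose .npy member is DEFLATE-compressed.
    np.savez_compressed(buf, arr=arr)
    return buf.getvalue()


def unpack_array(data: bytes) -> np.ndarray:
    """Inverse of pack_array: read the array back from the .npz bytes."""
    with np.load(io.BytesIO(data)) as npz:
        return npz["arr"]


# Usage: an all-zero array compresses far below its raw size.
state = np.zeros((1000, 1000), dtype=np.float64)  # 8,000,000 bytes raw
packed = pack_array(state)
restored = unpack_array(packed)
print(state.nbytes, len(packed), np.array_equal(state, restored))
```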
 

I hope this resolves your issue; please let us know.


> StateFun: Unable to handle oversize HTTP message if state size is large
> -----------------------------------------------------------------------
>
>                 Key: FLINK-25670
>                 URL: https://issues.apache.org/jira/browse/FLINK-25670
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-3.1.1
>            Reporter: Kyle
>            Priority: Major
>         Attachments: 00-module.yaml, functions.py
>
>
> Our requirements call for handling state of about 500MB (72MB of state is 
> allocated in the commented code in the attachment). However, the HTTP message 
> size limit prevents us from sending large state back to the StateFun cluster 
> after saving it in the Stateful Function.
> Another question is whether large data can be sent to a Stateful Function 
> from an ingress.
>  
> 2022-01-17 07:57:18,416 WARN  org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #10)ToFunctionRequestSummary(address=Address(example, hello, 5555), batchSize=1, totalSizeInBytes=80, numberOfStates=2)
> org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException: Response entity too large: DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
> HTTP/1.1 200 OK
> Content-Type: application/octet-stream
> Content-Length: 40579630
> Date: Mon, 17 Jan 2022 07:57:18 GMT
> Server: Python/3.9 aiohttp/3.8.1
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:276) ~[statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:87) ~[statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageAggregator.invokeHandleOversizedMessage(MessageAggregator.java:404) ~[statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageAggregator.decode(MessageAggregator.java:254) ~[statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) ~[statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:425) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [statefun-flink-distribution.jar:3.1.1]
>         at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [statefun-flink-distribution.jar:3.1.1]
>         at java.lang.Thread.run(Unknown Source) [?:?]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
