Re: Is Idle state retention time in SQL client possible?

2019-09-17 Thread Fabian Hueske
Hi,

This can be set via the environment file.
Please have a look at the documentation [1] (see the "execution:
min-idle-state-retention" and "execution: max-idle-state-retention" keys).

Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html#environment-files
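A minimal sketch of how these keys might look in the environment file (the values are placeholders, in milliseconds):

execution:
  # state for an idle key becomes eligible for cleanup after min-idle-state-retention ms
  # without access, and is removed at the latest after max-idle-state-retention ms
  min-idle-state-retention: 3600000     # 1 hour
  max-idle-state-retention: 86400000    # 24 hours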

On Tue, Sep 17, 2019 at 11:01 AM srikanth flink <
flink.d...@gmail.com>:

> Hi there,
>
> I've come across this link
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html
> >
> for Idle state retention time. Would like to know if I could configure this
> for SQL client and how?
>
> Thanks
> Srikanth
>


Re: Problem starting taskexecutor daemons in 3 node cluster

2019-09-17 Thread Till Rohrmann
SSH access to the nodes and nodes being able to talk to each other are
separate issues. The former is only used for starting the Flink cluster.
Once the cluster is started, Flink only requires that nodes can talk to
each other (independent of SSH).

Cheers,
Till
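As a quick way to verify that connectivity (a hedged sketch; it assumes netcat is installed on the workers and uses the JobManager address from this thread):

# run from a worker node; checks whether the JobManager RPC port is reachable
nc -zv 150.82.218.218 6123

If SSH between the nodes works but this check fails, the problem is network or firewall configuration rather than SSH.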

On Tue, Sep 17, 2019 at 7:39 AM Komal Mariam  wrote:

> Hi Till,
>
> Thank you for the reply. I tried to SSH into each of the nodes individually
> and they can all connect to each other. It's just that, for some reason, all
> the other worker nodes cannot connect to the job manager on
> 150.82.218.218:6123 (Node 1).
>
> I got around the problem by setting the master node (JobManager) on Node 2
> and making 150.82.218.218 as a slave (TaskManager).
>
> now, all nodes including 150.82.218.218 are showing up in the new
> jobmanager's UI and I can see my jobs getting distributed between them too.
>
> For now, all my nodes have password-enabled SSH. Do you think this issue
> could be because I have not set up passwordless SSH? If start-cluster.sh
> can instantiate the nodes with password SSH, why is it important to set up
> passwordless SSH (aside from convenience)?
>
> Best Regards,
> Komal
>
> On Fri, 13 Sep 2019 at 18:31, Till Rohrmann  wrote:
>
>> Hi Komal,
>>
>> could you check that every node can reach the other nodes? It looks a
>> little bit as if the TaskManager cannot talk to the JobManager running on
>> 150.82.218.218:6123.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 12, 2019 at 9:30 AM Komal Mariam 
>> wrote:
>>
>>> I managed to fix it however ran into another problem that I could
>>> appreciate help in resolving.
>>>
>>> It turns out that the username for all three nodes was different; having
>>> the same username on all of them fixed the issue, i.e.
>>> same_username@slave-node2-hostname
>>> same_username@slave-node3-hostname
>>> same_username@master-node1-hostname
>>>
>>> In fact, because the usernames are the same, I can just save them in the
>>> conf files as:
>>> slave-node2-hostname
>>> slave-node3-hostname
>>> master-node1-hostname
>>>
>>> However, for some reason my worker nodes don't show up as available
>>> task managers in the web UI.
>>>
>>> The taskexecutor log says the following:
>>> ... (clipped for brevity)
>>> 2019-09-12 15:56:36,625 INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> 
>>> 2019-09-12 15:56:36,631 INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered
>>> UNIX signal handlers for [TERM, HUP, INT]
>>> 2019-09-12 15:56:36,647 INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum
>>> number of open file descriptors is 1048576.
>>> 2019-09-12 15:56:36,710 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.rpc.address, 150.82.218.218
>>> 2019-09-12 15:56:36,711 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2019-09-12 15:56:36,712 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.heap.size, 1024m
>>> 2019-09-12 15:56:36,713 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: taskmanager.heap.size, 1024m
>>> 2019-09-12 15:56:36,714 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: taskmanager.numberOfTaskSlots, 1
>>> 2019-09-12 15:56:36,715 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: parallelism.default, 1
>>> 2019-09-12 15:56:36,717 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.execution.failover-strategy, region
>>> 2019-09-12 15:56:37,097 INFO  org.apache.flink.core.fs.FileSystem
>>> - Hadoop is not in the classpath/dependencies. The
>>> extended set of supported File Systems via Hadoop is not available.
>>> 2019-09-12 15:56:37,221 INFO
>>>  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
>>> create Hadoop Security Module because Hadoop cannot be found in the
>>> Classpath.
>>> 2019-09-12 15:56:37,305 INFO
>>>  org.apache.flink.runtime.security.SecurityUtils   - Cannot
>>> install HadoopSecurityContext because Hadoop cannot be found in the
>>> Classpath.
>>> 2019-09-12 15:56:38,142 INFO
>>>  org.apache.flink.configuration.Configuration  - Config
>>> uses fallback configuration key 'jobmanager.rpc.address' instead of key
>>> 'rest.address'
>>> 2019-09-12 15:56:38,169 INFO
>>>  org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to
>>> select the network interface and address to use by connecting to the
>>> leading JobManager.
>>> 2019-09-12 15:56:38,170 INFO
>>>  org.apache.flink

Re: Joining Pojos

2019-09-17 Thread Zhenghua Gao
POJOs can be keyed using a KeySelector function [1].
Could you provide more information about your problem? Version of Flink?
Error messages?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions

Best Regards,
Zhenghua Gao
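As an illustration, a minimal sketch of key selectors for a windowed join over POJOs (the FlightDelay/FlightDetails classes and getFlightNumber fields are hypothetical, loosely following the example in the question below):

import org.apache.flink.api.java.functions.KeySelector;

public class FlightDelayKeySelector implements KeySelector<FlightDelay, String> {
    @Override
    public String getKey(FlightDelay delay) {
        // key the delay stream by its flight number field
        return delay.getFlightNumber();
    }
}

public class FlightDetailsKeySelector implements KeySelector<FlightDetails, String> {
    @Override
    public String getKey(FlightDetails details) {
        // the second input must produce the same key type (String here),
        // otherwise equalTo() will not accept the selector
        return details.getFlightNumber();
    }
}

// usage sketch:
// flightDelaysStream.join(flightDetailsStream)
//     .where(new FlightDelayKeySelector())
//     .equalTo(new FlightDetailsKeySelector())
//     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
//     .apply(...);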


On Mon, Sep 16, 2019 at 11:16 PM Benjamin Wootton <
benjamin.wootton.perso...@gmail.com> wrote:

> Hi All
>
> I'm new to Flink.  I am having a lot of success but I'm struggling with
> Windowed joins over Pojos.
>
> In a toy example I am trying to respond to flight delay events and pull
> some fields from flight details:
>
> flightDelaysStream
>     .map(new FlightDelaysMapper())
>     .join(flightDetailsStream)
>     .where(new FlightDelayKeySelector())
>     .equalTo(new MyKeySelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>     .apply(new JF())
>     .print();
>
> My problem is in the where and equalTo clauses.  I can't seem to specify a
> Key selector for flight details as the equalTo clause doesn't accept
> anything related to FlightDetails.
>
> All of the examples I can find online are Tuples.  Should this kind of
> thing be possible with Pojos?  Much prefer to stay in the domain objects if
> possible.
>
> Thanks
> Ben
>


Flink help / pairing

2019-09-17 Thread Benjamin Wootton
I am currently building a system based on Flink and could use some ongoing
help.

Is there any expert who would be free for some remote pairing?

Happy to pay for time of course!

Thanks
Ben


Difference between data stream window function and cep within

2019-09-17 Thread Joshua Fan
Hi All,

I'd like to know the difference between the data stream window function and CEP
within. I googled this issue but found no useful information.

Underneath CEP within, is there a tumbling window, a sliding window, or just
a process function?

Your explanation will be truly appreciated.

Yours sincerely

Joshua


Re: Flink help / pairing

2019-09-17 Thread Leonard Xu
Maybe you can find some information here: https://www.ververica.com/training 


Best,
Leonard Xu

> On Sep 17, 2019, at 7:17 PM, Benjamin Wootton wrote:
> 
> I am currently building a system based on Flink and could use some ongoing 
> help.
> 
> Is there any expert who would be free for some remote pairing?  
> 
> Happy to pay for time of course!
> 
> Thanks
> Ben
> 



Error "Failed to load native Mesos library from" when I run Flink on a compiled version of Apache Mesos

2019-09-17 Thread Felipe Gutierrez
Hi,

I am compiling mesos from source code and trying to execute Flink on it.
For some reason I am having the error that "Failed to load native Mesos
library from
/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib".
I searched on the internet and everybody says to set the library paths
like I did below, but the error still remains.

I am able to deploy a dummy task (infinite loop) on Mesos and it is running
all right.


flink@cow-11:~/flink-1.9.0$ export
MESOS_NATIVE_JAVA_LIBRARY="/home/felipe/workspace-vsc/mesos/build/src/.libs/libmesos.so"
flink@cow-11:~/flink-1.9.0$
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/felipe/workspace-vsc/mesos/build/src/.libs/
flink@cow-11:~/flink-1.9.0$
PATH=$PATH:/home/felipe/workspace-vsc/mesos/build/bin/

flink-1.9.0$ sudo ./bin/mesos-appmaster.sh
Failed to load native Mesos library from
/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib

Thanks,
Felipe

--
-- Felipe Gutierrez

-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: serialization issue in streaming job run with scala Future

2019-09-17 Thread Debasish Ghosh
I think the issue may not be linked with Future. What happens is when this
piece of code is executed ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

somehow the ClosureCleaner gets executed, as is evident from the following
stack trace, which tries to serialize Avro data. Is there any way to pass the
custom Avro serializer that I am using?

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
serializable. The object probably contains or references non serializable
fields.
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at
org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
at
org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
at
pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
at
pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

How exactly does Idle-state retention policy work?

2019-09-17 Thread srikanth flink
Hi there,

I'm using Flink SQL to do the job for me. Based on this, I
configured the idle-state retention time in milliseconds.

Context: Flink SQL reads a Kafka stream with no key and puts it into a dynamic
table (sourceKafka). There's another static table (badips) loaded from a file,
and the join is performed from the dynamic table onto the static one like:

SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips
ON sourceKafka.source.ip = badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

As said in the docs, my query doesn't have a GROUP BY for the idle-state
retention to act upon and evict the untouched state. So how do I manage to
evict the older ones?

Questions:

   1. If it happens that the 'ip' acts as a key in my query for the
   eviction to work, how does Flink justify the heap size growing to 80 GB and
   crashing?
   2. Is it that for every query with a time-windowed join, Flink SQL will
   automatically clear older records that have become irrelevant?


Thanks
Srikanth


Re: Difference between data stream window function and cep within

2019-09-17 Thread Dian Fu
Hi Joshua,

There is no tumbling/sliding window underlying the CEP within implementation.

The difference between a datastream window and CEP within is that:
1) For a datastream window, the window is unified for all the elements
(you can think of the windows as already existing before the input elements come).
For example, for a sliding window (window size: 60s, slide size: 10s), the
windows will be [0s, 60s], [10s, 70s], [20s, 80s], etc. When the input elements
come, they are put into the windows they belong to.
2) For CEP within, it defines the maximum time interval for an event
sequence to match the pattern, so a unified window is not suitable for this
requirement. In the underlying implementation, for each
matching/partial-matching sequence, the time interval between the first element
and the last element of the sequence is checked against the within
interval. You can refer to [1] for details.

Regards,
Dian

[1] 
https://github.com/apache/flink/blob/459fd929399ad6c80535255eefa278564ec33683/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java#L251
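To illustrate the difference, a minimal Java sketch (the Event class, its fields, and the conditions are hypothetical):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Datastream window: elements are assigned to pre-defined, aligned windows.
input
    .keyBy(event -> event.getUserId())
    .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
    .reduce((a, b) -> a.getCount() > b.getCount() ? a : b);

// CEP within: the 60s bound applies to each matching sequence individually,
// measured from its first element to its last, not to fixed window boundaries.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) { return e.getType().equals("login"); }
    })
    .next("end")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) { return e.getType().equals("purchase"); }
    })
    .within(Time.seconds(60));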
 



> On Sep 17, 2019, at 7:47 PM, Joshua Fan wrote:
> 
> Hi All,
> 
> I'd like to know the difference between the data stream window function and CEP
> within. I googled this issue but found no useful information.
> 
> Underneath CEP within, is there a tumbling window, a sliding window, or just a
> process function?
> 
> Your explanation will be truly appreciated.
> 
> Yours sincerely
> 
> Joshua



Re: Error "Failed to load native Mesos library from" when I run Flink on a compiled version of Apache Mesos

2019-09-17 Thread Rui Li
Hey Felipe,

I haven't tried to run Flink on Mesos, but I guess you can try exporting
those ENV variables so that Flink can see them. Alternatively you can try
something like

LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/felipe/workspace-vsc/mesos/build/src/.libs/ ./bin/mesos-appmaster.sh

If the problem persists, please provide detailed stack traces/error
messages which should be found in log files.
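A minimal sketch of the export approach, reusing the paths from the original mail (note that the original run used sudo, which typically resets the environment, so running without sudo, or otherwise preserving these variables, is an assumption to verify):

# export all three variables in the same shell (the original mail only exported the first),
# then start the app master from that shell
export MESOS_NATIVE_JAVA_LIBRARY=/home/felipe/workspace-vsc/mesos/build/src/.libs/libmesos.so
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/felipe/workspace-vsc/mesos/build/src/.libs/
export PATH=$PATH:/home/felipe/workspace-vsc/mesos/build/bin/
./bin/mesos-appmaster.sh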

On Wed, Sep 18, 2019 at 12:34 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I am compiling mesos from source code and trying to execute Flink on it.
> For some reason I am having the error that "Failed to load native Mesos
> library from
> /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib".
> I searched on the internet and everybody says to set the library paths
> like I did below, but the error still remains.
>
> I am able to deploy a dummy task (infinite loop) on Mesos and it is running
> all right.
>
>
> flink@cow-11:~/flink-1.9.0$ export
> MESOS_NATIVE_JAVA_LIBRARY="/home/felipe/workspace-vsc/mesos/build/src/.libs/libmesos.so"
> flink@cow-11:~/flink-1.9.0$
> LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/felipe/workspace-vsc/mesos/build/src/.libs/
> flink@cow-11:~/flink-1.9.0$
> PATH=$PATH:/home/felipe/workspace-vsc/mesos/build/bin/
>
> flink-1.9.0$ sudo ./bin/mesos-appmaster.sh
> Failed to load native Mesos library from
> /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>
> Thanks,
> Felipe
>
> --
> -- Felipe Gutierrez
>
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>


-- 
Best regards!
Rui Li


Re: How exactly does Idle-state retention policy work?

2019-09-17 Thread Dian Fu
Hi Srikanth,

As you said, the idle-state retention time configuration only works for "group 
aggregate" and "over aggregate".

Regarding your questions:
> If it happens that the 'ip' acts as a key in my query for the eviction
> to work, how does Flink justify the heap size growing to 80 GB and crashing?
Is your question "how does the eviction work"? If it is: Flink registers a
timer for each key, and when the state corresponding to a key has not been accessed
for the configured time, the state is cleared as a callback of the timer.
The state cleanup is not triggered by the heap size, but by the timer.

> Is it that for every query with a time-windowed join, Flink SQL will
> automatically clear older records that have become irrelevant?
That's right. For a time-windowed join, the outdated records will be cleared
automatically.

Regards,
Dian
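To make the distinction concrete, a hedged SQL sketch using the tables from the original question (the GROUP BY query is illustrative only, and it assumes the nested source.ip field is addressable this way in the poster's schema):

-- Idle-state retention applies here: an unbounded group aggregate keeps one
-- state entry per distinct key until the retention timer clears it.
SELECT sourceKafka.source.ip, COUNT(*)
FROM sourceKafka
GROUP BY sourceKafka.source.ip;

-- Idle-state retention does not apply here: for the time-windowed join below,
-- rows older than the 15-minute bound are dropped automatically.
SELECT sourceKafka.*
FROM sourceKafka INNER JOIN badips ON sourceKafka.source.ip = badips.ip
WHERE sourceKafka.timestamp_received
  BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;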

> On Sep 18, 2019, at 8:50 AM, srikanth flink wrote:
> 
> Hi there,
> 
> I'm using Flink SQL to do the job for me. Based on this, I
> configured the idle-state retention time in milliseconds.
> 
> Context: Flink SQL reads a Kafka stream with no key and puts it into a dynamic table
> (sourceKafka). There's another static table (badips) loaded from a file and the
> join is performed from the dynamic table onto the static one like: SELECT sourceKafka.*
> FROM sourceKafka INNER JOIN badips ON sourceKafka.source.ip=badips.ip WHERE
> sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15'
> MINUTE AND CURRENT_TIMESTAMP;
> 
> As said in the docs, my query doesn't have a GROUP BY for the idle-state
> retention to act upon and evict the untouched state. So how do I manage to evict the older
> ones?
> 
> Questions:
> If it happens that the 'ip' acts as a key in my query for the eviction
> to work, how does Flink justify the heap size growing to 80 GB and crashing?
> Is it that for every query with a time-windowed join, Flink SQL will
> automatically clear older records that have become irrelevant?
> 
> Thanks
> Srikanth
> 



Re: Window metadata removal

2019-09-17 Thread gil bl
Hi Fabian,

Thank you for your reply. I'm not sure my question was clear enough, so I'll try to explain our scenario:

1. We are working in "event time" mode.
2. We want to handle 'late data' up to the last X days (for example the last 7 days).
3. For each incoming event, the event is aggregated using a window function. When the window is "fired", the accumulated data is forwarded to a "sink" function and all data is purged from the window.
4. If late data arrives for the same window, the same logic (as in section 3) is applied: when the window is fired, the data is accumulated from scratch, sent to a "sink" and purged from the window.
5. We are not using the default trigger.

We expect the flow above to result in fragmented data, i.e. several outputs with the same key/window which aggregate different sets of events.

We encounter the following problem: since we have a huge number of different keys, the metadata (WindowOperator, InternalTimer) is kept in memory until the end of the 'allowed lateness' period. This causes our job to run out of memory.

Here is a calculation of the required memory consumption only for the window metadata:
- Metadata size for each pane is at least 64 bytes.
- If we have 200,000,000 panes per day and the allowed lateness is set to 7 days:
  200,000,000 * 64 * 7 = ~83 GB

For the scenario above the window metadata is useless. Is there a possibility to keep using the window API, set allowed lateness, and not keep the window metadata until the end of the allowed lateness period? (Maybe as a new feature 😊?)

05.09.2019, 13:04, "Fabian Hueske":
> Hi,
>
> A window needs to keep the data as long as it expects new data. This is clearly the case before the end time of the window is reached. If my window ends at 12:30, I want to wait (at least) until 12:30 before I remove any data, right?
>
> In case you expect some data to be late, you can configure allowedLateness. Let's say we configure an allowedLateness of 10 minutes. In that case, Flink would keep the metadata of the window that closes at 12:30 until 12:40. The data is kept to be able to update the result of the window until allowedLateness has passed. If we, for example, receive a late record at 12:38, we can still update the result of the window because we kept all required data.
>
> If you don't need allowedLateness, don't configure it (the default is 0).
>
> Best, Fabian
>
> On Mon, Sep 2, 2019 at 16:46, gil bl wrote:
>> Hi,
>> I'm interested in why metadata like WindowOperator and InternalTimer are being kept for the windowSize + allowedLateness period per pane.
>> What is the purpose of keeping this data if no new events are expected to enter the pane? Is there any way this metadata can be released earlier?
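For reference, a minimal Java sketch of the configuration being discussed, where each pane's state and timer metadata are retained until window end plus allowed lateness (the key extractor, aggregate function, and sink are hypothetical):

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

events
    .keyBy(e -> e.getKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    // late data is accepted for 7 days, so window metadata is kept just as long
    .allowedLateness(Time.days(7))
    .aggregate(new MyAggregateFunction())
    .addSink(new MySink());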