Re: Flink Question

2018-11-14 Thread Tzu-Li (Gordon) Tai
Hi Steve,

I’m not sure what you mean by “replacing addSource with CSV String data”. Are 
your Kinesis records CSV and you want to parse them into Events?
If so, you should be able to do that in the provided DeserializationSchema.
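
For example, a minimal schema along these lines should work; this is just a sketch that
assumes your Event POJO has a constructor matching the CSV columns (the column layout
shown is made up):

```
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class CsvEventSchema implements DeserializationSchema<Event> {

    @Override
    public Event deserialize(byte[] bytes) throws IOException {
        // each Kinesis record is assumed to hold one CSV line, e.g. "device-1,1542248971585,23.5"
        String[] fields = new String(bytes, StandardCharsets.UTF_8).split(",");
        return new Event(fields[0], Long.parseLong(fields[1].trim()), Double.parseDouble(fields[2].trim()));
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false; // a Kinesis stream is unbounded
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
```

You could then pass new CsvEventSchema() to the FlinkKinesisConsumer constructor in place of EventSchema.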

Cheers,
Gordon


On 9 November 2018 at 10:54:22 PM, Steve Bistline (srbistline.t...@gmail.com) 
wrote:

I am having problems with the Flink Kinesis adapter. I have some native KCL 
code that works fine. I want to replace the .addSource with the CSV String data 
that is coming in from my KCL code. How can I do that?



// Consume the data streams from AWS Kinesis stream
DataStream dataStream = env.addSource(new FlinkKinesisConsumer<>(
        pt.getRequired("stream"),
        new EventSchema(),
        kinesisConsumerConfig))
    .name("Kinesis Stream Consumer");



Any help would be appreciated

Thanks,

Steve

Re: Using FlinkKinesisConsumer through a proxy

2018-11-14 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

I’m pretty sure that this should work with the properties that you provided, 
unless the AWS Kinesis SDK isn’t working as expected.

What I’ve tested is that with those properties, the ClientConfiguration used to 
build the Kinesis client has the proxy domain / host / ports etc. properly set.
And according to [1], this should be enough to configure the constructed 
Kinesis client to connect via the proxy.

Cheers,
Gordon

[1] 
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/section-client-configuration.html


On 7 November 2018 at 1:19:02 AM, Vijay Balakrishnan (bvija...@gmail.com) wrote:

Hi Gordon,
This still didn't work :(

Tried a few combinations with:
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyDomain", "...");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyHost", "http://.com");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyPort", "911");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyUsername", "...");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "proxyPassword", "..");
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "nonProxyHosts", "


How does the FlinkKinesisProducer work so seamlessly through a proxy?
TIA,
Vijay

On Thu, Oct 4, 2018 at 6:41 AM Tzu-Li (Gordon) Tai  wrote:
Hi,

Since Flink 1.5, you should be able to set all available configurations on the 
ClientConfiguration through the consumer Properties (see FLINK-9188 [1]).

The way to do that would be to prefix the configuration you want to set with 
"aws.clientconfig" and add that to the properties, as such:

```
Properties kinesisConsumerProps = new Properties();
kinesisConsumerProps.setProperty("aws.clientconfig.proxyHost", ...);
kinesisConsumerProps.setProperty("aws.clientconfig.proxyPort", ...);
kinesisConsumerProps.setProperty("aws.clientconfig.proxyUsert", ...);
...
```

Could you try that out and see if it works for you?

I've also realized that this feature isn't documented very well, and have 
opened a ticket for that [2].

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-9188
[2] https://issues.apache.org/jira/browse/FLINK-10492

On Thu, Oct 4, 2018, 7:57 PM Aljoscha Krettek  wrote:
Hi,

I'm looping in Gordon and Thomas, they might have some idea about how to 
resolve this.

Best,
Aljoscha

On 3. Oct 2018, at 17:29, Vijay Balakrishnan  wrote:

I have been trying, to no avail, all variations of java 
-Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://... -Dhttps.proxyPort=911 
-Dhttps.proxyUser= -Dhttps.proxyPassword=.. -Dhttp.proxyHost=http://.. 
-Dhttp.proxyPort=911 -Dhttp.proxyUser=... -Dhttp.proxyPassword=... -jar .. 
after looking at the code in com.amazonaws.ClientConfiguration.

On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan  wrote:
Hi,
How do I use FlinkKinesisConsumer with the Properties through a proxy? 
I'm getting a connection issue through the proxy; 
it works outside the proxy.

Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);

if (local) {
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
} else {
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
}

// only for Consumer
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
FlinkKinesisConsumer> kinesisConsumer = new FlinkKinesisConsumer<>(
        "kinesisTopicRead", new Tuple2KinesisSchema(), kinesisConsumerConfig);
TIA



Re: Flink auth against Zookeeper with MD5-Digest

2018-11-14 Thread Tzu-Li (Gordon) Tai
Hi,

AFAIK, I don’t think there has been other discussions on this other than the 
original document on secured data access for Flink [1].

Unfortunately, I’m also not knowledgeable enough to comment on how feasible it 
would be to support MD5-Digest for authentication.
Maybe Eron (cc’ed) can chime in here if he has any comments.

Cheers,
Gordon

[1] 
https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing


On 12 November 2018 at 11:41:24 PM, Laura Uzcátegui 
(laura.uzcateg...@gmail.com) wrote:

Hi, 

 I was wondering if there are any plans in the near future to include support 
for another authentication mechanism different from Kerberos, such as 
MD5-Digest? 

Cheers, 

Re: Per job cluster doesn't shut down after the job is canceled

2018-11-14 Thread Ufuk Celebi
Hey Paul,

It might be related to this: https://github.com/apache/flink/pull/7004 (see 
linked issue for details).

Best,

Ufuk

> On Nov 14, 2018, at 09:46, Paul Lam  wrote:
> 
> Hi Gary,
> 
> Thanks for your reply and sorry for the delay. The attachment is the 
> jobmanager logs after invoking the cancel command.
> 
> I think it might be related to the custom source, because the jobmanager 
> keeps trying to trigger a checkpoint for it, 
> but in fact it’s already canceled. The source implementation is using a 
> running flag to denote it’s running, and the 
> cancel method is simply setting the flag to false, which I think is a common 
> way of implementing a custom source.
> In addition, the cluster finally shut down because I killed it with yarn 
> commands.
> 
> And also thank you for the pointer, I’ll keep tracking this problem.
> 
> Best,
> Paul Lam
> 
> 
> 
>> On 10 November 2018, at 02:10, Gary Yao wrote:
>> 
>> Hi Paul,
>> 
>> Can you share the complete logs, or at least the logs after invoking the
>> cancel command? 
>> 
>> If you want to debug it yourself, check if
>> MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how 
>> the
>> jobTerminationFuture is used.
>> 
>> Best,
>> Gary
>> 
>> [1] 
>> https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141
>> 
>> 
>> On Wed, Nov 7, 2018 at 3:27 AM Paul Lam  wrote:
>> Hi, 
>> 
>> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN 
>> cluster doesn’t shut down after the job is canceled successfully. The only 
>> errors I found in jobmanager’s log are as below (the second one appears 
>> multiple times):
>> 
>> ```
>> 2018-11-07 09:48:38,663 WARN  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Error while 
>> notifying JobStatusListener
>> java.lang.IllegalStateException: Incremented the completed number of 
>> checkpoints without incrementing the in progress checkpoints before.
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
>>  at 
>> org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
>>  at 
>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
>>  at 
>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>  at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>  at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>  at 
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:
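
For reference, the running-flag source pattern Paul describes usually looks like the
minimal sketch below (class and method names are purely illustrative):

```
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingSource implements SourceFunction<String> {

    // volatile so that cancel(), called from a different thread, is visible to run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = poll(); // stand-in for reading from the external system
            if (record != null) {
                // emit under the checkpoint lock so emission and checkpointing don't interleave
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String poll() {
        return null; // placeholder
    }
}
```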

Re: Best practice to write data from a stream to non-relational, distributed database (hbase)

2018-11-14 Thread Tzu-Li (Gordon) Tai
Hi,

Have you taken a look yet at this [1]? That is an example of writing a stream 
to HBase.
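
For reference, a bare-bones sink along the lines of that example might look like the
sketch below; the table name, column family and the Tuple2 element type are placeholders,
and the HBase configuration is assumed to come from an hbase-site.xml on the classpath:

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open the HBase connection once per parallel sink instance
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(hbaseConf);
        table = connection.getTable(TableName.valueOf("events"));
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        // row key = f0, payload = f1
        Put put = new Put(Bytes.toBytes(value.f0));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("payload"), Bytes.toBytes(value.f1));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) {
            table.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```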

Cheers,
Gordon

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java


On 11 November 2018 at 5:45:27 PM, Marke Builder (marke.buil...@gmail.com) 
wrote:

Hi,

what is the preferred way to write streaming data to HBase?
Rolling File Sink or Streaming File Sink?
How can I configure this (opening the connection with the configuration, and 
handling the writes (key, data))?
What do I have to consider about the partitions? I would prefer one writer per 
partition. 

Thanks! 
Marke

Re: Last batch of stream data could not be sinked when data comes very slow

2018-11-14 Thread Tzu-Li (Gordon) Tai
Hi Henry,

Flushing of buffered data in sinks should occur on two occasions - 1) when some 
buffer size limit is reached or a fixed-flush interval is fired, and 2) on 
checkpoints.

Flushing any pending data before completing a checkpoint ensures the sink has 
at-least-once guarantees, so that should answer your question about data loss.
For data delay due to the buffering, my only suggestion would be to have a 
time-interval based flushing configuration.
That is what is currently happening, for example, in the Kafka / Kinesis 
producer sinks. Records are buffered, and flushed at fixed intervals or when 
the buffer is full. They are also flushed on every checkpoint.
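
A rough sketch of such a buffering sink, flushing when the buffer is full and on every
checkpoint (the batch size and writeBatch() stand in for the real sink logic; a pure
time-interval flush would additionally need its own timer thread, as the Kafka/Kinesis
producers have internally):

```
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    public BufferingSink(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            writeBatch();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // flushing here is what gives the at-least-once guarantee:
        // nothing buffered is lost if the job restores from this checkpoint
        writeBatch();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // nothing to restore: the buffer is always empty at checkpoint time
    }

    private void writeBatch() {
        // placeholder for the actual batched write, e.g. a JDBC executeBatch()
        buffer.clear();
    }
}
```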

Cheers,
Gordon

On 13 November 2018 at 5:07:32 PM, 徐涛 (happydexu...@gmail.com) wrote:

Hi Experts,
When we implement a sink, we usually batch the writes, either by record count or by a 
time interval; however, this may lead to the last batch not being written to the sink, 
because the flush is only triggered by incoming records.
I also tested the JDBCOutputFormat provided by Flink, and found that it has the same 
problem. If the batch size is 50 and 49 items arrive, but the last one comes an hour 
later, then the 49 items will not be written to the sink during that hour. This may 
cause data delay or data loss.
So could anyone propose a solution to this problem?
Thanks a lot.

Best
Henry 

Are flink connectors included in the binary release ?

2018-11-14 Thread Jeff Zhang
I don't see the jars of flink connectors in the binary release of flink
1.6.1, so just want to confirm whether flink binary release include these
connectors. Thanks

-- 
Best Regards

Jeff Zhang


Re: Job xxx not found exception when starting Flink program in Local

2018-11-14 Thread Chesnay Schepler
Did you have the WebUI open from a previous execution? If so, then the UI 
might still be requesting the job from that previous execution.


On 13.11.2018 08:01, 徐涛 wrote:

Hi Experts,
When I start the Flink program locally, I find that the following 
exception is thrown. I do not know why it happens, because it started 
suddenly; some hours ago the program could start successfully.

Could anyone help to explain it?
Thanks a lot!

2018-11-13 14:48:45 [flink-akka.actor.default-dispatcher-60] ERROR 
o.a.f.r.r.h.job.JobDetailsHandler - Exception occurred in REST handler.
org.apache.flink.runtime.rest.NotFoundException: Job 
512a21d9f992d4884f836abb82c64f0d not found
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90) 
~[flink-runtime_2.11-1.6.2.jar:1.6.2]
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) 
~[na:1.8.0_172]
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) 
~[na:1.8.0_172]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[na:1.8.0_172]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 
~[na:1.8.0_172]
at 
org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.lambda$getExecutionGraph$0(ExecutionGraphCache.java:133) 
~[flink-runtime_2.11-1.6.2.jar:1.6.2]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 
~[na:1.8.0_172]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 
[na:1.8.0_172]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[na:1.8.0_172]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 
~[na:1.8.0_172]
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772) 
~[flink-runtime_2.11-1.6.2.jar:1.6.2]
at akka.dispatch.OnComplete.internal(Future.scala:258) 
~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.OnComplete.internal(Future.scala:256) 
~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) 
~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:32) 
~[scala-library-2.11.8.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) 
~[scala-library-2.11.8.jar:na]
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) 
~[flink-runtime_2.11-1.6.2.jar:1.6.2]
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
~[scala-library-2.11.8.jar:na]
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
~[scala-library-2.11.8.jar:na]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18) 
~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) 
~[scala-library-2.11.8.jar:na]
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) 
~[scala-library-2.11.8.jar:na]
at 
scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:32) 
~[scala-library-2.11.8.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) 
~[scala-library-2.11.8.jar:na]
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
~[scala-library-2.11.8.jar:na]
at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) 
~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) 
~[akka-actor_2.11-2.4.20.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
~[scala-library-2.11.8.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
~[scala-library-2.11.8.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
~[scala-library-2.11.8.jar:na]
at 
scala.

Re: Get savepoint status fails - Flink 1.6.2

2018-11-14 Thread Chesnay Schepler

The documentation you linked only applies if legacy mode is enabled.

The URL you used initially (i.e. 
"/jobs/:jobid/savepoints/:triggerid") is correct. My guess is 
that either the JobID or triggerID is not correct.
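
For example, the flow against the 1.6 REST API should look roughly like the sketch below
(host, job id, target directory and the way the request id is extracted are placeholders):

```
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class SavepointStatusCheck {

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081/jobs/c78511cf0dc10c1e9f7db17566522d5b/savepoints";

        // 1) trigger the savepoint; the response contains a "request-id"
        HttpURLConnection post = (HttpURLConnection) new URL(base).openConnection();
        post.setRequestMethod("POST");
        post.setDoOutput(true);
        post.setRequestProperty("Content-Type", "application/json");
        post.getOutputStream().write(
                "{\"target-directory\":\"hdfs:///savepoints\",\"cancel-job\":false}"
                        .getBytes(StandardCharsets.UTF_8));
        System.out.println(read(post.getInputStream()));

        // 2) poll the status, using the returned request id as the :triggerid
        String triggerId = "..."; // extract "request-id" from the trigger response
        HttpURLConnection get = (HttpURLConnection) new URL(base + "/" + triggerId).openConnection();
        System.out.println(read(get.getInputStream()));
    }

    private static String read(InputStream in) {
        try (Scanner s = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            return s.hasNext() ? s.next() : "";
        }
    }
}
```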


On 13.11.2018 17:24, PedroMrChaves wrote:

Hello,

I am trying to get the status for a savepoint using the REST API, but the GET
request is failing with an error as depicted below.

curl -k
https://localhost:8081/jobs/c78511cf0dc10c1e9f7db17566522d5b/savepoints/51c174eab1efd2c1354282f52f37fadb
{"errors":["Operation not found under key:
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@848b6c1c"]}

Additionally, the commands:
GET request to /jobs/:jobid/cancel-with-savepoint/
GET request to /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId/

no longer work, but they are mentioned in the docs
(https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#job-cancellation)

How am I able to monitor the savepoints in version  1.6.2?


Best Regards,
Pedro.



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Flink 1.7 RC missing flink-scala-shell jar

2018-11-14 Thread Chesnay Schepler
This is intended. Increasing the scala version basically broke the 
scala-shell and we haven't had the time to fix it. It is thus only 
available with scala 2.11. I agree that the error message could be 
better though.


On 14.11.2018 03:44, Hao Sun wrote:
I do not see the flink-scala-shell jar under the flink opt directory. To run 
the scala shell, do I have to include the flink-scala-shell jar in my 
program jar?
Why does the error say "Could not find or load main class 
org.apache.flink.api.scala.FlinkShell"?


On Tue, Nov 13, 2018 at 4:48 PM Tzu-Li Chen wrote:


Hi,

Till is the release manager for 1.7, so ping him here.

Best,
tison.


Hao Sun (ha...@zendesk.com) wrote on Wednesday, 14 November 2018 at 3:07 AM:

Sorry I mean the scala-2.12 version is missing

On Tue, Nov 13, 2018 at 10:58 AM Hao Sun (ha...@zendesk.com) wrote:

I can not find the jar here:

https://repository.apache.org/content/repositories/orgapacheflink-1191/org/apache/flink/

Here is the error:
bash-4.4# ./bin/start-scala-shell.sh local
Error: Could not find or load main class
org.apache.flink.api.scala.FlinkShell

I think somehow I have to include the flink-scala-shell
jar under flink lib.

Any help will be appreciated.





Re: Are flink connectors included in the binary release ?

2018-11-14 Thread Chesnay Schepler
Connectors are never contained in binary releases, as they are supposed to 
be packaged into the user-jar.


On 14.11.2018 10:12, Jeff Zhang wrote:


I don't see the jars of flink connectors in the binary release of 
flink 1.6.1, so just want to confirm whether flink binary release 
include these connectors. Thanks


--
Best Regards

Jeff Zhang





Re: Per job cluster doesn't shut down after the job is canceled

2018-11-14 Thread Paul Lam
Hi Ufuk,

Thanks for your reply!

I’m afraid that my case is different. Since the Flink on YARN application has 
not exited, we do not 
have an application exit code yet (but the job status is determined). 

Best,
Paul Lam


> On 14 November 2018, at 16:49, Ufuk Celebi wrote:
> 
> Hey Paul,
> 
> It might be related to this: https://github.com/apache/flink/pull/7004 (see 
> linked issue for details).
> 
> Best,
> 
> Ufuk
> 
>> On Nov 14, 2018, at 09:46, Paul Lam  wrote:
>> 
>> Hi Gary,
>> 
>> Thanks for your reply and sorry for the delay. The attachment is the 
>> jobmanager logs after invoking the cancel command.
>> 
>> I think it might be related to the custom source, because the jobmanager 
>> keeps trying to trigger a checkpoint for it, 
>> but in fact it’s already canceled. The source implementation is using a 
>> running flag to denote it’s running, and the 
>> cancel method is simply setting the flag to false, which I think is a common 
>> way of implementing a custom source.
>> In addition, the cluster finally shut down because I killed it with yarn 
>> commands.
>> 
>> And also thank you for the pointer, I’ll keep tracking this problem.
>> 
>> Best,
>> Paul Lam
>> 
>> 
>> 
>>> On 10 November 2018, at 02:10, Gary Yao wrote:
>>> 
>>> Hi Paul,
>>> 
>>> Can you share the complete logs, or at least the logs after invoking the
>>> cancel command? 
>>> 
>>> If you want to debug it yourself, check if
>>> MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how 
>>> the
>>> jobTerminationFuture is used.
>>> 
>>> Best,
>>> Gary
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141
>>> 
>>> 
>>> On Wed, Nov 7, 2018 at 3:27 AM Paul Lam  wrote:
>>> Hi, 
>>> 
>>> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN 
>>> cluster doesn’t shut down after the job is canceled successfully. The only 
>>> errors I found in jobmanager’s log are as below (the second one appears 
>>> multiple times):
>>> 
>>> ```
>>> 2018-11-07 09:48:38,663 WARN  
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Error while 
>>> notifying JobStatusListener
>>> java.lang.IllegalStateException: Incremented the completed number of 
>>> checkpoints without incrementing the in progress checkpoints before.
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
>>> at 
>>> org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
>>> at 
>>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
>>> at 
>>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
>>> at 
>>> org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at 
>>> akka.actor.UntypedActor$$anonfun$receive$1.ap

Re: Are flink connectors included in the binary release ?

2018-11-14 Thread Jeff Zhang
Thanks Chesnay, but if users want to use connectors in the scala shell, they
have to download them separately.

On Wed, Nov 14, 2018 at 5:22 PM Chesnay Schepler  wrote:

> Connectors are never contained in binary releases, as they are supposed to
> be packaged into the user-jar.
>
> On 14.11.2018 10:12, Jeff Zhang wrote:
>
>
> I don't see the jars of flink connectors in the binary release of flink
> 1.6.1, so just want to confirm whether flink binary release include these
> connectors. Thanks
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Partitioning by composite key, But type and number of keys are dynamic

2018-11-14 Thread Gaurav Luthra
There is a data stream of some records; let's call them "input records".
Now, I want to partition this data stream by using keyBy(). I want the
partitioning to be based on one or more fields of the "input record", but the number
and types of the fields are not fixed.
So, kindly tell me how I should achieve this partitioning based on the "input
records" mentioned above.

Note: Technically, I am using Avro's GenericRecord as the "input records". That means
I am using DataStream<GenericRecord>, which needs to be partitioned. And its
schema can be different for different jobs. So, I do not know the field
names and types to be provided in keyBy().



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-14 Thread Jayant Ameta
I tried to create a sample project, but couldn't reproduce the error! It
was working fine.
It turns out I was importing the wrong Tuple2 package in my client code :(
After fixing it, the code worked fine.

Thanks Till and Jiayi for your help!

Jayant Ameta


On Tue, Nov 13, 2018 at 4:01 PM Till Rohrmann  wrote:

> Hi Jayant,
>
> could you maybe setup a small Github project with the client and server
> code? Otherwise it is really hard to reproduce the problem. Thanks a lot!
>
> Cheers,
> Till
>
> On Tue, Nov 13, 2018 at 11:29 AM Jayant Ameta 
> wrote:
>
>> Getting the same error even when I added flink-avro dependency to the
>> client.
>>
>> Jayant Ameta
>>
>>
>> On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy  wrote:
>>
>>> Hi Jayant,
>>>
>>>I don’t know why flink uses the Avro serializer, which is usually
>>> used in POJO class, but from the error messages, I think you can add
>>> flink-avro as a dependency and try again.
>>>
>>>
>>> Best,
>>>
>>> Jiayi Liao
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* bupt_ljy
>>> *Cc:* trohrmann; Tzu-Li (Gordon) Tai<
>>> tzuli...@apache.org>; user
>>> *Date:* Tuesday, Nov 13, 2018 16:15
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Thanks Jiayi,
>>> I updated the client code to use keyed stream key. The key is a
>>> Tuple2
>>>
>>> CompletableFuture> resultFuture =
>>> 
>>> client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), 
>>> "rules",
>>> Tuple2.of(uuid, "test"), TypeInformation.of(new 
>>> TypeHint>() {
>>> }), descriptor);
>>>
>>> I'm now getting a different exception. I'm NOT using Avro as a custom 
>>> serializer. Not sure what causes this issue.
>>>
>>> Caused by: java.lang.RuntimeException: Error while processing request with 
>>> ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find 
>>> required Avro dependency.
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>> at 
>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>> at 
>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>> at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>> at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>> at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>>> at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>> at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>> at 
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>> at 
>>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>>> at 
>>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>>> at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>   

Re: How flink table api to join with mysql dimtable

2018-11-14 Thread Hequn Cheng
Hi yelun,

Currently, there is no direct way to dynamically load data and do the join in
Flink SQL. As a workaround, you can implement your logic with a UDTF. In
the UDTF, you can load the data into a cache and update it according to
your requirements.
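
A rough sketch of such a UDTF (the JDBC URL, table and column names below are
placeholders, and the one-hour reload is just an example policy):

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class MySqlDimLookup extends TableFunction<Tuple2<Integer, String>> {

    private static final long RELOAD_INTERVAL_MS = 60 * 60 * 1000L;

    private transient Map<String, Tuple2<Integer, String>> cache;
    private transient long lastLoad;

    @Override
    public void open(FunctionContext context) throws Exception {
        reload();
    }

    // called per input row; emits (age, address) for the given name, if present
    public void eval(String name) throws Exception {
        if (System.currentTimeMillis() - lastLoad > RELOAD_INTERVAL_MS) {
            reload();
        }
        Tuple2<Integer, String> row = cache.get(name);
        if (row != null) {
            collect(row);
        }
    }

    private void reload() throws Exception {
        Map<String, Tuple2<Integer, String>> fresh = new HashMap<>();
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT name, age, address FROM persons")) {
            while (rs.next()) {
                fresh.put(rs.getString("name"), Tuple2.of(rs.getInt("age"), rs.getString("address")));
            }
        }
        cache = fresh;
        lastLoad = System.currentTimeMillis();
    }
}
```

You would register it with tEnv.registerFunction("dimLookup", new MySqlDimLookup()) and
join it in SQL via LATERAL TABLE(dimLookup(name)) AS T(age, address).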

Best, Hequn

On Wed, Nov 14, 2018 at 10:34 AM yelun <986463...@qq.com> wrote:

> hi,
>
> I want to use Flink SQL to left join a static dimension table from MySQL,
> so I converted the MySQL table into a data stream to join with a
> datastream that has been converted to a Flink table. I found that the
> real-time stream data is not joined correctly with the MySQL data at the
> beginning, but later stream records are joined correctly. So I want to ask:
> is there any good way to make the real-time stream join with the MySQL
> data using the Table API, loading the MySQL data into memory and reloading
> it dynamically once each hour? Thanks a lot.
>
> The following is the some example code:
>
> public static JDBCInputFormatBuilder inputBuilder =
> JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername(DRIVER_CLASS)
> .setDBUrl(DB_URL)
> .setUsername(USER_NAME)
> .setPassword(USER_PASS)
> .setQuery(SELECT_ALL_PERSONS)
> .setRowTypeInfo(ROW_TYPE_INFO);
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
> StreamTableEnvironment  tEnv = TableEnvironment.getTableEnvironment(env);
>
> DataStream orderA = env.addSource(new OrderFunction());
> tEnv.registerDataStream("tableA", orderA, "name, product, amount");
>
> DataStream mysql_table = env.createInput(inputBuilder.finish());
> String[] dim_table_fileds = {"id","name","age","address"};
>
> tEnv.registerDataStream("tableB",mysql_table);
> Table result = tEnv.sqlQuery("SELECT
> tableA.name,tableA.amount,tableB.age,tableB.address FROM tableB  join
> tableA on tableA.name = tableB.name" );
> tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print();
> env.execute();
>
> Thanks a lot.
>


Re: Flink 1.7 RC missing flink-scala-shell jar

2018-11-14 Thread Hao Sun
Ok, thanks.

On Wed, Nov 14, 2018, 01:22 Chesnay Schepler  wrote:

> This is intended. Increasing the scala version basically broke the
> scala-shell and we haven't had the time to fix it. It is thus only
> available with scala 2.11. I agree that the error message could be better
> though.
>
>
> On 14.11.2018 03:44, Hao Sun wrote:
>
> I do not see flink-scala-shell jar under flink opt directory. To run
> scala shell, do I have to include the flink-scala-shell jar in my program
> jar?
> Why the error is saying Could not find or load main class
> org.apache.flink.api.scala.FlinkShell
>
> On Tue, Nov 13, 2018 at 4:48 PM Tzu-Li Chen  wrote:
>
>> Hi,
>>
>> Till is the release manager for 1.7, so ping him here.
>>
>> Best,
>> tison.
>>
>>
>> Hao Sun wrote on Wednesday, 14 November 2018 at 3:07 AM:
>>
>>> Sorry I mean the scala-2.12 version is missing
>>>
>>> On Tue, Nov 13, 2018 at 10:58 AM Hao Sun  wrote:
>>>
 I can not find the jar here:

 https://repository.apache.org/content/repositories/orgapacheflink-1191/org/apache/flink/
 

 Here is the error:
 bash-4.4# ./bin/start-scala-shell.sh local
 Error: Could not find or load main class
 org.apache.flink.api.scala.FlinkShell

 I think somehow I have to include the flink-scala-shell jar under flink
 lib.

 Any help will be appreciated.

>>>
>


flink build error

2018-11-14 Thread Radu Tudoran
Hi,

I am trying to build Flink 1.6, but the build fails when it runs the tests. Any 
ideas why surefire fails to run the JUnit tests?

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (default-test) on 
project flink-test-utils-junit: ExecutionException: java.lang.RuntimeException: 
The forked VM terminated without properly saying goodbye. VM crash or 
System.exit called?
[ERROR] Command was /bin/sh -c cd 
/home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit
 && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m 
-Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
/home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefirebooter9129098759083326906.jar
 
/home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefire1374644174200907236tmp
 
/home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefire_0656703638108827545tmp

Thanks


Re: flink build error

2018-11-14 Thread Alexey Trenikhun
Hello,
It sounds like a surefire problem with the latest Java: 
https://issues.apache.org/jira/browse/SUREFIRE-1588

Alexey

From: Radu Tudoran 
Sent: Wednesday, November 14, 2018 6:47 AM
To: user
Subject: flink build error

Hi,

I am trying to build Flink 1.6, but the build fails when it runs the tests. Any 
ideas why surefire fails to run the JUnit tests?

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (default-test) on 
project flink-test-utils-junit: ExecutionException: java.lang.RuntimeException: 
The forked VM terminated without properly saying goodbye. VM crash or 
System.exit called?
[ERROR] Command was /bin/sh -c cd 
/home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit
 && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m 
-Dmvn.forkNumber=2 -XX:+UseG1GC -jar 
/home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefirebooter9129098759083326906.jar
 
/home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefire1374644174200907236tmp
 
/home/rtudoran/git/flink16Release/flink/flink-test-utils-parent/flink-test-utils-junit/target/surefire/surefire_0656703638108827545tmp

Thanks


Re: What if not to keep containers across attempts in HA setup?(Internet mail)

2018-11-14 Thread Paul Lam
Hi Devin,

Thanks for the pointer and it works!

But I have no permission to change the YARN conf in the production environment by 
myself, and it would need a detailed 
investigation by the Hadoop team to apply the new conf, so I’m still interested 
in the difference between keeping and 
not keeping containers across application attempts.

Best,
Paul Lam


> On 13 November 2018, at 17:27, devinduan(段丁瑞) wrote:
> 
> Hi Paul,
> Could you check out your YARN property  
> "yarn.resourcemanager.work-preserving-recovery.enabled"?
> if value is false, set true and try it again.
> Best,
> Devin
>  
> From: Paul Lam 
> Sent: 2018-11-13 12:55
> To: Flink ML 
> Subject: What if not to keep containers across attempts in HA setup? (Internet mail)
> Hi,
> 
> Recently I found a bug on our YARN cluster that crashes the standby RM during 
> a RM failover, and 
> the bug is triggered by the keeping containers across attempts behavior of 
> applications (see [1], a related 
> issue but the patch is not exactly the fix, because the problem is not on 
> recovery, but the attempt after 
> the recovery).
> 
> Since YARN is a fundamental component and a maintenance of it would affect a 
> lot users, as a last resort
> I wonder if we could modify YarnClusterDescriptor and not to keep containers 
> across attempts. 
> 
> IMHO, Flink application’s state is not dependent on YARN, so there is no 
> state that must be recovered 
> from the previous application attempt. In case of a application master 
> failure, the taskmanagers can be 
> shutdown and the cost is longer recovery time.
> 
> Please correct me if I’m wrong. Thank you!
> 
> [1]https://issues.apache.org/jira/browse/YARN-2823 
> 
> 
> Best,
> Paul Lam



Could not find previous entry with key.

2018-11-14 Thread Steve Bistline
Any thoughts on where to start with this error would be appreciated.

Caused by: java.lang.IllegalStateException: Could not find previous
entry with key: first event, value:
{"DEVICE_ID":f8a395a0-d3e2-11e8-b050-9779854d8172,"TIME_STAMP":11/15/2018
02:29:30.343 
am,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0}
and timestamp: 1542248971585. This can indicate that either you did
not implement the equals() and hashCode() methods of your input
elements properly or that the element belonging to that entry has been
already pruned.


=

CODE HERE

=

//kinesisConsumerConfig.list(System.out);

// Consume the data streams from AWS Kinesis stream
DataStream dataStream = env.addSource(new FlinkKinesisConsumer<>(
        pt.getRequired("stream"),
        new EventSchema(),
        kinesisConsumerConfig))
    .name("Kinesis Stream Consumer");

System.out.printf("Print dataStream\n");
//dataStream.print();

DataStream kinesisStream = dataStream
    .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
    .map(event -> (IoTEvent) event);

// Prints the mapped records from the Kinesis stream
//kinesisStream.print();
//System.out.printf("Print kinesisStream\n");

Pattern pattern = Pattern
    .begin("first event").subtype(IoTEvent.class)
    .where(new IterativeCondition() {
        private static final long serialVersionUID = -6301755149429716724L;

        @Override
        public boolean filter(IoTEvent value, Context ctx) throws Exception {
            PatternConstants.MOTION_FIRST = value.getMotionDir();
            return value.getMotionDir() != PatternConstants.MOTION_NA;
        }
    })
    .next("second")
    .subtype(IoTEvent.class)
    .where(new IterativeCondition() {
        private static final long serialVersionUID = 2392863109523984059L;

        @Override
        public boolean filter(IoTEvent value, Context ctx) throws Exception {
            return value.getMotionDir() != PatternConstants.MOTION_NA
                    && value.getMotionDir() != PatternConstants.MOTION_FIRST;
        }
    })
    .next("third")
    .subtype(IoTEvent.class)
    .where(new IterativeCondition() {
        private static final long serialVersionUID = 2392863109523984059L;

        @Override
        public boolean filter(IoTEvent value, Context ctx) throws Exception {
            return value.getMotionDir() != PatternConstants.MOTION_NA
                    && value.getMotionDir() == PatternConstants.MOTION_FIRST;
        }
    })
    .next("fourth")
    .subtype(IoTEvent.class)
    .where(new IterativeCondition() {
        private static final long serialVersionUID = 2392863109523984059L;

        @Override
        public boolean filter(IoTEvent value, Context ctx) throws Exception {
            return value.getMotionDir() != PatternConstants.MOTION_NA
                    && value.getMotionDir() != PatternConstants.MOTION_FIRST;
        }
    })
    .within(Time.seconds(10));

Join Dataset in stream

2018-11-14 Thread eric hoffmann
Hi,
I need to compute the Euclidean distance between an input vector and a full
dataset stored in Cassandra, and keep the n lowest values. The Cassandra
dataset is evolving (mutable). I could do this as a batch job, but I would
have to trigger it each time, and the input is more like a slow stream; however,
the computation needs to be fast. Can I do this in a streaming way? Is there any
better solution?
Thx