DynamoStreams Consumer millisBehindLatest metric

2019-10-28 Thread Vinay Patil
Hi,

I am currently using FlinkDynamoDBStreamsConsumer in production. For
monitoring the lag I am relying on the millisBehindLatest metric, but it
always returns -1 even when the DynamoDB stream already contains millions
of records.
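
For reference, a minimal consumer setup along these lines, assuming the connector's FlinkDynamoDBStreamsConsumer class; the region, stream ARN and SimpleStringSchema below are placeholders, not taken from this thread. The per-shard millisBehindLatest gauge is registered under this source's metric group.

```
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DynamoStreamsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

        // The source exposes millisBehindLatest per shard in its metric group.
        DataStream<String> changes = env.addSource(new FlinkDynamoDBStreamsConsumer<>(
                "arn:aws:dynamodb:us-east-1:000000000000:table/my-table/stream/label", // placeholder ARN
                new SimpleStringSchema(),
                props));

        changes.print();
        env.execute("DynamoDB Streams consumer");
    }
}
```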

Also, it would be great if we could add documentation mentioning that Flink
supports DynamoDB Streams.

Regards,
Vinay Patil


Re: Issue with writeAsText() to S3 bucket

2019-10-28 Thread Nguyen, Michael
Hi Fabian,

Thank you for the response. I am currently using .writeAsText() to write out 
9 different DataStreams in one Flink job, since I am writing my original 
DataStream with various filters applied to it. Upon cancelling my Flink job, 
I usually see only around 6-7 of my DataStreams' JSON files successfully 
listed in my S3 bucket.

Even in my situation, would this still be an issue with S3’s file listing 
command?

Thanks,
Michael

From: Fabian Hueske 
Date: Friday, October 25, 2019 at 6:04 AM
To: Michael Nguyen 
Cc: "user@flink.apache.org" 
Subject: Re: Issue with writeAsText() to S3 bucket


Hi Michael,

One reason might be that S3's file listing command is only eventually 
consistent.
It might take some time until the file appears and is listed.

Best, Fabian

On Wed., Oct. 23, 2019 at 22:41, Nguyen, Michael 
<michael.nguye...@t-mobile.com> wrote:
Hello all,

I am running into issues at the moment trying to print my DataStreams to an S3 
bucket using writeAsText(“s3://bucket/result.json”) in my Flink job. I used 
print() on the same DataStream and I see the output I am looking for in 
standard output. I first confirm that my datastream has data by looking at the 
standard output, then I cancel my Flink job. After cancelling the job, 
result.json only gets created in my S3 bucket some of the time. It does not 
always get created, even though I have confirmed that I see my data in standard output.
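
A minimal sketch of the setup described here (the bucket path and sample data are placeholders, not the original job); note that with parallelism greater than 1, writeAsText() produces a directory of part files rather than a single result.json, which can also affect what gets listed:

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteAsTextSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source standing in for the original, filtered DataStreams.
        DataStream<String> events = env.fromElements("{\"a\":1}", "{\"b\":2}");

        events.print(); // visible in the TaskManager stdout

        // With parallelism > 1 this creates a *directory* named result.json containing
        // part files; with parallelism 1 it creates a single file.
        events.writeAsText("s3://bucket/result.json").setParallelism(1);

        env.execute("writeAsText sketch");
    }
}
```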

I understand writeAsText() should be used for debugging purposes only according 
to Flink’s documentation, but I’m just curious as to why I can’t get 
writeAsText() to always work every time I cancel my job.

Thank you for your help,
Michael


Complex SQL Queries on Java Streams

2019-10-28 Thread Mohammed Tabreaz
Recently we moved from Oracle to Cassandra. In Oracle we were using advanced
analytical functions such as LAG, LEAD and MATCH_RECOGNIZE heavily.

I was trying to identify equivalent functionality in Java, and came across
Apache Flink, however I'm not sure if I should use that library in
stand-alone Java applications using its CollectionEnvironment.

Has anyone ever used Apache Flink for such a purpose, and what is the
overhead of using this library in web services?


Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

2019-10-28 Thread Nguyen, Michael
Hello everybody,

Has anyone tried testing AggregateFunction() and ProcessWindowFunction() on a 
KeyedDataStream? I have reviewed the testing page on Flink’s official website 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html)
 and I am not quite sure how I could test these two functions as they are used in an 
.aggregate() operator.

Here’s how I am using the AggregateFunction (EventCountAggregate()) and 
ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
DataStream<Tuple2<Long, Long>> ec2EventsAggregate =
    ec2Events
        .keyBy(t -> t.f0)
        .timeWindow(Time.minutes(30))
        .aggregate(new EventCountAggregate(), new CalculateWindowTotal())
        .name("EC2 creation interval count");


EventCountAggregate() counts each element in the ec2Events datastream.

CalculateWindowTotal() takes the timestamp of each 30-minute window and 
correlates it with the number of elements counted so far for that window, 
returning a Tuple2 containing the end timestamp and the element count.
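
A rough sketch of what these two functions could look like, based only on the description above; the actual implementations are not shown in this thread, and the Tuple2<String, String> input type is an assumption:

```
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Counts each element of the window; the accumulator is simply a Long.
public class EventCountAggregate implements AggregateFunction<Tuple2<String, String>, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(Tuple2<String, String> value, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
}

// Emits (window end timestamp, pre-aggregated count) per key and window.
class CalculateWindowTotal extends ProcessWindowFunction<Long, Tuple2<Long, Long>, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Long> counts,
                        Collector<Tuple2<Long, Long>> out) throws Exception {
        // The pre-aggregated count arrives as a single element from the AggregateFunction.
        out.collect(Tuple2.of(context.window().getEnd(), counts.iterator().next()));
    }
}
```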


Thanks,
Michael


Re: Complex SQL Queries on Java Streams

2019-10-28 Thread Jörn Franke
Flink is primarily stream processing; I would not use it in a synchronous web call. 
In fact, I would not make any complex analytic function available on a 
synchronous web service at all. Instead, I would deploy a messaging bus (RabbitMQ, ZeroMQ, etc.), 
send the request there (if the source is a web app, potentially via STOMP), 
and receive the result asynchronously from the function (implemented in Flink etc.). 
In this way you can better control your computational resources, avoid blocking 
the calling application, and easily recover from connection interruptions.

> On 28.10.2019 at 08:11, Mohammed Tabreaz wrote:
> 
> 
> Recently we moved from Oracle to Cassandra. In Oracle we were using advance 
> analytical functions such as lag, lead and Macth_Recognize heavily.
> 
> I was trying to identify equivalent functionality in java, and came across 
> Apache Flink, however I'm not sure if I should use that library in 
> stand-alone java applications using their CollectionsEnvironment.
> 
> Anyone have ever used Apache Flink for such purpose, what is overhead of 
> using this library in Web Services.


Re: Complex SQL Queries on Java Streams

2019-10-28 Thread Mohammed Tabreaz
Thanks for the feedback; the point about non-blocking interfaces is clear.

However, could you clarify or point me to any other libraries which can be
used with Java collections for complex analytics?

On Mon, Oct 28, 2019, 11:29 Jörn Franke  wrote:

> Flink is merely StreamProcessing. I would not use it in a synchronous web
> call. However, I would not make any complex analytic function available on
> a synchronous web service. I would deploy a messaging bus (rabbitmq, zeromq
> etc) and send the request there (if the source is a web app potentially
> with stomp) to receive feedback asynchronously from the function
> (implemented in Flunk etc) the result. In this way you can better control
> your Computational resources, do not block the calling application, can
> easily recover from connection interruptions etc.
>
> On 28.10.2019 at 08:11, Mohammed Tabreaz wrote:
>
> 
>
> Recently we moved from Oracle to Cassandra. In Oracle we were using
> advance analytical functions such as lag, lead and Macth_Recognize heavily.
>
> I was trying to identify equivalent functionality in java, and came across
> Apache Flink, however I'm not sure if I should use that library in
> stand-alone java applications using their CollectionsEnvironment.
>
> Anyone have ever used Apache Flink for such purpose, what is overhead of
> using this library in Web Services.
>
>


Re: Cannot modify parallelism (rescale job) more than once

2019-10-28 Thread vino yang
Hi Pankaj,

It seems it is a bug. You can report it by opening a Jira issue.

Best,
Vino

Pankaj Chand wrote on Mon, Oct 28, 2019 at 10:51 AM:

> Hello,
>
> I am trying to modify the parallelism of a streaming Flink job (wiki-edits
> example) multiple times on a standalone cluster (one local machine) having
> two TaskManagers with 3 slots each (i.e. 6 slots total). However, the
> "modify" command is only working once (e.g. when I change the parallelism
> from 2 to 4). The second time (e.g. change parallelism to 6 or even back to
> 2), it is giving an error.
>
> I am using Flink 1.8.1 (since I found that the modify parallelism command
> has been removed from v1.9 documentation) and have configured savepoints to
> be written to file:///home/pankaj/flink-checkpoints. The output of the
> first "modify  -p 4" command and second "modify  -p 6"
> command is copied below.
>
> Please tell me how to modify parallelism multiple times at runtime.
>
> Thanks,
>
> Pankaj
>
>
> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4
> Modify job 94831ca34951975dbee3335a384ee935.
> Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4.
>
> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6
> Modify job 94831ca34951975dbee3335a384ee935.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not rescale job
> 94831ca34951975dbee3335a384ee935.
> at
> org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
> at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.util.concurrent.CompletionException:
> java.lang.IllegalStateException: Suspend needs to happen atomically
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor.aroundReceive(Actor.scala:502)
> at akka.actor.Actor.aroundReceive$(Actor.scala:500)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.lang.IllegalStateException: Suspend needs to happen
> atomically
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> ... 20 more
>


Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread 杨力
It seems to be the case. But when I use timeWindow or CEP with
fromCollection, it works well. For example,

```
sEnv.fromCollection(Seq[Long](1, 1002, 2002,
3002)).assignAscendingTimestamps(identity[Long])
.keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
```

prints

```
1
1002
2002
3002
```

How can I implement my KeyedProcessFunction so that it works as expected?

Dian Fu wrote on Mon, Oct 28, 2019 at 2:04 PM:

> Hi,
>
> It generates watermark periodically by default in the underlying
> implementation of `assignAscendingTimestamps`. So for your test program,
> the watermark is still not generated yet and I think that's the reason why
> it's Long.MinValue.
>
> Regards,
> Dian
>
> On Oct 28, 2019, at 11:59 AM, 杨力 wrote:
>
> I'm going to sort elements in a PriorityQueue and set up timers at
> (currentWatermark + 1), following the instructions in
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing
> .
>
> However, it seems that context.timerService().currentWatermark() always
> returns Long.MinValue and my onTimer will never be called. Here's minimal
> program to reproduce the problem. Am I missing something?
>
> ```
> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> sEnv.setParallelism(argOps.parallelism())
> sEnv.fromCollection(Seq[Long](1, 2,
> 3)).assignAscendingTimestamps(identity[Long])
> .process(new ProcessFunction[Long, Long] {
>   override def processElement(i: Long, context: ProcessFunction[Long,
> Long]#Context, collector: Collector[Long]): Unit = {
> collector.collect(context.timerService().currentWatermark())
>   }
> }).print()
> sEnv.execute()
> ```
>
> ```
> -9223372036854775808
> -9223372036854775808
> -9223372036854775808
> ```
>
>
>


Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread Dian Fu
Before a program closes, it emits Long.MaxValue as the watermark, and that 
watermark triggers all the windows. This is the reason why your 
`timeWindow` program works. However, in the first program you have not 
registered an event-time timer (through 
context.timerService().registerEventTimeTimer) and there is also no onTimer 
logic defined to process it.
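
A minimal Java sketch of that pattern (in Java rather than the original Scala): buffer per key, register an event-time timer at currentWatermark + 1 as in the timer-coalescing docs linked below, and do the work in onTimer. The Long types and the buffering logic are illustrative only, not taken from this thread.

```
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SortOnTimer extends KeyedProcessFunction<Long, Long, Long> {
    private transient ListState<Long> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        buffer.add(value);
        // Coalesce timers: fire as soon as the watermark advances at all.
        ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        long watermark = ctx.timerService().currentWatermark();
        List<Long> stillEarly = new ArrayList<>();
        for (Long v : buffer.get()) {
            if (v <= watermark) {
                out.collect(v);          // safe to emit: covered by the watermark
            } else {
                stillEarly.add(v);       // keep for later
            }
        }
        buffer.update(stillEarly);
        if (!stillEarly.isEmpty()) {
            // Elements beyond the current watermark need a new timer.
            ctx.timerService().registerEventTimeTimer(watermark + 1);
        }
    }
}
```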

> On Oct 28, 2019, at 4:01 PM, 杨力 wrote:
> 
> It seems to be the case. But when I use timeWindow or CEP with 
> fromCollection, it works well. For example,
> 
> ```
> sEnv.fromCollection(Seq[Long](1, 1002, 2002, 
> 3002)).assignAscendingTimestamps(identity[Long])
> .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
> ```
> 
> prints
> 
> ```
> 1
> 1002
> 2002
> 3002
> ```
> 
> How can I implement my KeyedProcessFunction so that it would work as expected.
> 
Dian Fu <dian0511...@gmail.com> wrote on Mon, Oct 28, 2019 at 2:04 PM:
> Hi,
> 
> It generates watermark periodically by default in the underlying 
> implementation of `assignAscendingTimestamps`. So for your test program, the 
> watermark is still not generated yet and I think that's the reason why it's 
> Long.MinValue. 
> 
> Regards,
> Dian  
> 
>> On Oct 28, 2019, at 11:59 AM, 杨力 wrote:
>> 
>> I'm going to sort elements in a PriorityQueue and set up timers at 
>> (currentWatermark + 1), following the instructions in 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing
>>  
>> .
>> 
>> However, it seems that context.timerService().currentWatermark() always 
>> returns Long.MinValue and my onTimer will never be called. Here's minimal 
>> program to reproduce the problem. Am I missing something?
>> 
>> ```
>> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> sEnv.setParallelism(argOps.parallelism())
>> sEnv.fromCollection(Seq[Long](1, 2, 
>> 3)).assignAscendingTimestamps(identity[Long])
>> .process(new ProcessFunction[Long, Long] {
>>   override def processElement(i: Long, context: ProcessFunction[Long, 
>> Long]#Context, collector: Collector[Long]): Unit = {
>> collector.collect(context.timerService().currentWatermark())
>>   }
>> }).print()
>> sEnv.execute()
>> ```
>> 
>> ```
>> -9223372036854775808
>> -9223372036854775808
>> -9223372036854775808
>> ```
>> 
> 



Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

2019-10-28 Thread vino yang
Hi Michael,

You may want to look at the `KeyedOneInputStreamOperatorTestHarness` test class.

You can consider
`WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or
`WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime` [1] (both
of them call `processElementAndEnsureOutput`) as an example.

[1]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676

Best,
Vino
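
If the operator test harness feels heavyweight, the testing page linked in the original question also describes testing the functions in isolation; a plain unit-test sketch along those lines, assuming an EventCountAggregate with the signature AggregateFunction<Tuple2<String, String>, Long, Long> (not taken from this thread):

```
import static org.junit.Assert.assertEquals;

import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;

public class EventCountAggregateTest {
    @Test
    public void countsTwoElements() {
        EventCountAggregate agg = new EventCountAggregate();
        Long acc = agg.createAccumulator();
        acc = agg.add(Tuple2.of("ec2", "event-1"), acc);
        acc = agg.add(Tuple2.of("ec2", "event-2"), acc);
        assertEquals(Long.valueOf(2L), agg.getResult(acc));
    }
}
```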

Nguyen, Michael wrote on Mon, Oct 28, 2019 at 3:18 PM:

> Hello everbody,
>
>
>
> Has anyone tried testing AggregateFunction() and ProcessWindowFunction()
> on a KeyedDataStream? I have reviewed the testing page on Flink’s official
> website (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html)
> and I am not quite sure how I could utilize these two functions in an
> .aggregate() operator for my testing.
>
>
>
> Here’s how I am using the AggregateFunction (EventCountAggregate()) and
> ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
>
> DataStream> ec2EventsAggregate =
> ec2Events
> .keyBy(t -> t.f0)
> .timeWindow(Time.*minutes*(30))
> .aggregate(new EventCountAggregate(), new
> CalculateWindowTotal())
> .name("EC2 creation interval count");
>
>
>
>
>
> EventCountAggregate() is counting the each element in ec2Events datastream.
>
>
>
> CalculateWindowTotal() takes the timestamp of each 30 minute window and
> correlates it to the number of elements that has been counted so far for
> the window which returns a Tuple2 containg the end timestamp and the count
> of elements.
>
>
>
>
>
> Thanks,
>
> Michael
>


Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

2019-10-28 Thread Nguyen, Michael
Hi Vino,

This is a great example – thank you!

It looks like I need to instantiate a StreamExecutionEnvironment in order to 
get my OneInputStreamOperator. Would I need to set up a local Flink cluster using 
MiniClusterWithClientResource in order to use StreamExecutionEnvironment?


Best,
Michael


From: vino yang 
Date: Monday, October 28, 2019 at 1:32 AM
To: Michael Nguyen 
Cc: "user@flink.apache.org" 
Subject: Re: Testing AggregateFunction() and ProcessWindowFunction() on 
KeyedDataStream


Hi Michael,

You may need to know `KeyedOneInputStreamOperatorTestHarness` test class.

You can consider 
`WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or 
`WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime`[1](both 
of them call `processElementAndEnsureOutput`) as a example.

[1]: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676

Best,
Vino

Nguyen, Michael <michael.nguye...@t-mobile.com> wrote on Mon, Oct 28, 2019 at 3:18 PM:
Hello everbody,

Has anyone tried testing AggregateFunction() and ProcessWindowFunction() on a 
KeyedDataStream? I have reviewed the testing page on Flink’s official website 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html)
 and I am not quite sure how I could utilize these two functions in an 
.aggregate() operator for my testing.

Here’s how I am using the AggregateFunction (EventCountAggregate()) and 
ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
DataStream> ec2EventsAggregate =
ec2Events
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(30))
.aggregate(new EventCountAggregate(), new 
CalculateWindowTotal())
.name("EC2 creation interval count");


EventCountAggregate() is counting the each element in ec2Events datastream.

CalculateWindowTotal() takes the timestamp of each 30 minute window and 
correlates it to the number of elements that has been counted so far for the 
window which returns a Tuple2 containg the end timestamp and the count of 
elements.


Thanks,
Michael


PreAggregate operator with timeout trigger

2019-10-28 Thread Felipe Gutierrez
Hi all,

I have my own stream operator which triggers an aggregation based on the
number of items received
(OneInputStreamOperator#processElement(StreamRecord)). However, the
aggregation may never be triggered if the operator does not receive the
maximum number of items that has been set, so I need a timeout trigger as well.

I am unsure whether I need to extend Trigger in my
MyPreAggregate-AbstractUdfStreamOperator, or whether I have to pass a window as a
parameter to the operator class MyPreAggregate-AbstractUdfStreamOperator. What is the best approach?

Thanks,
Felipe
--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com
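
A count-or-timeout flush can also be sketched at the function level rather than as a custom operator, using a KeyedProcessFunction with a processing-time timer; the Tuple2<String, Long> input type and the maxCount/timeoutMillis parameters below are assumptions for illustration, not Felipe's actual operator:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PreAggregateWithTimeout
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private final int maxCount;
    private final long timeoutMillis;
    private transient ValueState<Long> sum;
    private transient ValueState<Integer> count;
    private transient ValueState<Long> timerState;

    public PreAggregateWithTimeout(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        long newSum = (sum.value() == null ? 0L : sum.value()) + value.f1;
        int newCount = (count.value() == null ? 0 : count.value()) + 1;
        if (timerState.value() == null) {
            // Arm a timeout so a partially filled bundle is still flushed.
            long timer = ctx.timerService().currentProcessingTime() + timeoutMillis;
            ctx.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
        if (newCount >= maxCount) {
            // Flush by count and cancel the pending timeout.
            out.collect(Tuple2.of(value.f0, newSum));
            ctx.timerService().deleteProcessingTimeTimer(timerState.value());
            sum.clear(); count.clear(); timerState.clear();
        } else {
            sum.update(newSum);
            count.update(newCount);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // Flush whatever accumulated before the timeout fired.
        if (count.value() != null) {
            out.collect(Tuple2.of(ctx.getCurrentKey(), sum.value()));
        }
        sum.clear(); count.clear(); timerState.clear();
    }
}
```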


Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

2019-10-28 Thread vino yang
Hi Michael,

From the WindowTranslationTest, I did not see anything about initializing a
mini-cluster. Here we are testing an operator, and the operator test harness
seems to provide the necessary infrastructure.

You can try it and see if anything is missing.

Best,
Vino

Nguyen, Michael wrote on Mon, Oct 28, 2019 at 4:51 PM:

> Hi Vino,
>
>
>
> This is a great example – thank you!
>
>
>
> It looks like I need to instantiate a StreamExecutionEnvironment to order
> to get my OneInputStreamOperator. Would I need to setup a local
> flinkCluster using MiniClusterWithClientResource in order to use
> StreamExecutionEnvironment?
>
>
>
>
>
> Best,
>
> Michael
>
>
>
>
>
> *From: *vino yang 
> *Date: *Monday, October 28, 2019 at 1:32 AM
> *To: *Michael Nguyen 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Testing AggregateFunction() and ProcessWindowFunction() on
> KeyedDataStream
>
>
>
> *[External]*
>
>
>
> Hi Michael,
>
>
>
> You may need to know `KeyedOneInputStreamOperatorTestHarness` test class.
>
>
>
> You can consider
> `WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or
> `WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime`[1](both
> of them call `processElementAndEnsureOutput`) as a example.
>
>
>
> [1]:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676
> 
>
>
>
> Best,
>
> Vino
>
>
>
> Nguyen, Michael wrote on Mon, Oct 28, 2019 at 3:18 PM:
>
> Hello everbody,
>
>
>
> Has anyone tried testing AggregateFunction() and ProcessWindowFunction()
> on a KeyedDataStream? I have reviewed the testing page on Flink’s official
> website (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> )
> and I am not quite sure how I could utilize these two functions in an
> .aggregate() operator for my testing.
>
>
>
> Here’s how I am using the AggregateFunction (EventCountAggregate()) and
> ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
>
> DataStream> ec2EventsAggregate =
> ec2Events
> .keyBy(t -> t.f0)
> .timeWindow(Time.*minutes*(30))
> .aggregate(new EventCountAggregate(), new
> CalculateWindowTotal())
> .name("EC2 creation interval count");
>
>
>
>
>
> EventCountAggregate() is counting the each element in ec2Events datastream.
>
>
>
> CalculateWindowTotal() takes the timestamp of each 30 minute window and
> correlates it to the number of elements that has been counted so far for
> the window which returns a Tuple2 containg the end timestamp and the count
> of elements.
>
>
>
>
>
> Thanks,
>
> Michael
>
>


Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread David Anderson
The reason why the watermark is not advancing is that
assignAscendingTimestamps is a periodic watermark generator. This
style of watermark generator is called at regular intervals to create
watermarks -- by default, this is done every 200 msec. With only a
tiny bit of data to process, the job doesn't run long enough for the
watermark generator to ever be called.
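
For completeness, that interval is configurable via the ExecutionConfig; a short sketch (the 50 ms value is arbitrary):

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Periodic watermark assigners such as assignAscendingTimestamps are invoked
// at this interval; the default is 200 ms.
env.getConfig().setAutoWatermarkInterval(50);
```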


On Mon, Oct 28, 2019 at 9:17 AM Dian Fu  wrote:
>
> Before a program close, it will emit Long.MaxValue as the watermark and that 
> watermark will trigger all the windows. This is the reason why your 
> `timeWindow` program could work. However, for the first program, you have not 
> registered the event time timer(though 
> context.timerService.registerEventTimeTimer) and also there is also no 
> onTimer logic defined to process it.
>
> On Oct 28, 2019, at 4:01 PM, 杨力 wrote:
>
> It seems to be the case. But when I use timeWindow or CEP with 
> fromCollection, it works well. For example,
>
> ```
> sEnv.fromCollection(Seq[Long](1, 1002, 2002, 
> 3002)).assignAscendingTimestamps(identity[Long])
> .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
> ```
>
> prints
>
> ```
> 1
> 1002
> 2002
> 3002
> ```
>
> How can I implement my KeyedProcessFunction so that it would work as expected.
>
> Dian Fu wrote on Mon, Oct 28, 2019 at 2:04 PM:
>>
>> Hi,
>>
>> It generates watermark periodically by default in the underlying 
>> implementation of `assignAscendingTimestamps`. So for your test program, the 
>> watermark is still not generated yet and I think that's the reason why it's 
>> Long.MinValue.
>>
>> Regards,
>> Dian
>>
>> On Oct 28, 2019, at 11:59 AM, 杨力 wrote:
>>
>> I'm going to sort elements in a PriorityQueue and set up timers at 
>> (currentWatermark + 1), following the instructions in 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing.
>>
>> However, it seems that context.timerService().currentWatermark() always 
>> returns Long.MinValue and my onTimer will never be called. Here's minimal 
>> program to reproduce the problem. Am I missing something?
>>
>> ```
>> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> sEnv.setParallelism(argOps.parallelism())
>> sEnv.fromCollection(Seq[Long](1, 2, 
>> 3)).assignAscendingTimestamps(identity[Long])
>> .process(new ProcessFunction[Long, Long] {
>>   override def processElement(i: Long, context: ProcessFunction[Long, 
>> Long]#Context, collector: Collector[Long]): Unit = {
>> collector.collect(context.timerService().currentWatermark())
>>   }
>> }).print()
>> sEnv.execute()
>> ```
>>
>> ```
>> -9223372036854775808
>> -9223372036854775808
>> -9223372036854775808
>> ```
>>
>>
>


Flink 1.8.1 HDFS 2.6.5 issue

2019-10-28 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi,

I am trying to execute Wordcount.jar in Flink 1.8.1 with Hadoop version 2.6.5. 
HDFS is enabled with Kerberos+SSL. While writing output to HDFS, I am facing the 
exception below and the job fails. Please let me know if you have any suggestions 
for debugging this issue.

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 21 more
Caused by: java.io.IOException: DataStreamer Exception:
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:695)
Caused by: java.lang.NullPointerException
at 
org.apache.hadoop.crypto.CryptoInputStream.<init>(CryptoInputStream.java:132)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:489)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:298)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:241)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:210)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:182)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1409)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)

Regards,
Suchithra


Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread 杨力
Thank you for your response. Registering a timer at Long.MaxValue works.
And I have found the mistake in my original code.

When a timer fires and there are elements in the priority queue with
timestamps greater than the current watermark, they do not get processed; a new
timer should be registered for these elements. I had simply forgotten about these
unprocessed elements.

Dian Fu wrote on Mon, Oct 28, 2019 at 4:17 PM:

> Before a program close, it will emit Long.MaxValue as the watermark and
> that watermark will trigger all the windows. This is the reason why your
> `timeWindow` program could work. However, for the first program, you have
> not registered the event time timer(though context.timerService.
> registerEventTimeTimer) and also there is also no onTimer logic defined
> to process it.
>
> On Oct 28, 2019, at 4:01 PM, 杨力 wrote:
>
> It seems to be the case. But when I use timeWindow or CEP with
> fromCollection, it works well. For example,
>
> ```
> sEnv.fromCollection(Seq[Long](1, 1002, 2002,
> 3002)).assignAscendingTimestamps(identity[Long])
> .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
> ```
>
> prints
>
> ```
> 1
> 1002
> 2002
> 3002
> ```
>
> How can I implement my KeyedProcessFunction so that it would work as
> expected.
>
> Dian Fu wrote on Mon, Oct 28, 2019 at 2:04 PM:
>
>> Hi,
>>
>> It generates watermark periodically by default in the underlying
>> implementation of `assignAscendingTimestamps`. So for your test program,
>> the watermark is still not generated yet and I think that's the reason why
>> it's Long.MinValue.
>>
>> Regards,
>> Dian
>>
>> On Oct 28, 2019, at 11:59 AM, 杨力 wrote:
>>
>> I'm going to sort elements in a PriorityQueue and set up timers at
>> (currentWatermark + 1), following the instructions in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing
>> .
>>
>> However, it seems that context.timerService().currentWatermark() always
>> returns Long.MinValue and my onTimer will never be called. Here's minimal
>> program to reproduce the problem. Am I missing something?
>>
>> ```
>> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> sEnv.setParallelism(argOps.parallelism())
>> sEnv.fromCollection(Seq[Long](1, 2,
>> 3)).assignAscendingTimestamps(identity[Long])
>> .process(new ProcessFunction[Long, Long] {
>>   override def processElement(i: Long, context: ProcessFunction[Long,
>> Long]#Context, collector: Collector[Long]): Unit = {
>> collector.collect(context.timerService().currentWatermark())
>>   }
>> }).print()
>> sEnv.execute()
>> ```
>>
>> ```
>> -9223372036854775808
>> -9223372036854775808
>> -9223372036854775808
>> ```
>>
>>
>>
>


Re: Flink 1.8.1 HDFS 2.6.5 issue

2019-10-28 Thread Dian Fu
It seems that the CryptoCodec is null from the exception stack trace. This may 
occur when "hadoop.security.crypto.codec.classes.aes.ctr.nopadding" is 
misconfigured. You could change the log level to "DEBUG" and it will show more 
detailed information about why CryptoCodec is null.

> On Oct 28, 2019, at 7:14 PM, V N, Suchithra (Nokia - IN/Bangalore) wrote:
> 
> Hi,
>  
> I am trying to execute Wordcount.jar in Flink 1.8.1 with Hadoop version 
> 2.6.5. HDFS is enabled with Kerberos+SSL. While writing output to HDFS, 
> facing the below exception and job will be failed. Please let me know if any 
> suggestions to debug this issue.
>  
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> ... 21 more
> Caused by: java.io.IOException: DataStreamer Exception:
> at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:695)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.hadoop.crypto.CryptoInputStream.(CryptoInputStream.java:132)
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:489)
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:298)
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:241)
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:210)
> at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:182)
> at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1409)
> at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
> at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)
>  
> Regards,
> Suchithra



RE: Flink 1.8.1 HDFS 2.6.5 issue

2019-10-28 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi,
From the debug logs I can see the following in the taskmanager. Please have a look.

org.apache.hadoop.ipc.ProtobufRpcEngine Call: addBlock took 372ms
org.apache.hadoop.hdfs.DFSClient pipeline = 10.76.113.216:1044
org.apache.hadoop.hdfs.DFSClient Connecting to datanode 10.76.113.216:1044
org.apache.hadoop.hdfs.DFSClient Send buf size 131072
o.a.h.h.p.d.sasl.SaslDataTransferClient SASL client doing encrypted handshake for addr = /10.76.113.216, datanodeId = 10.76.113.216:1044
o.a.h.h.p.d.sasl.SaslDataTransferClient Client using encryption algorithm 3des
o.a.h.h.p.d.sasl.DataTransferSaslUtil Verifying QOP, requested QOP = [auth-conf], negotiated QOP = auth-conf
o.a.h.h.p.d.sasl.DataTransferSaslUtil Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.
o.apache.hadoop.util.PerformanceAdvisory No crypto codec classes with cipher suite configured.
org.apache.hadoop.hdfs.DFSClient DataStreamer Exception
java.lang.NullPointerException: null
  at 
org.apache.hadoop.crypto.CryptoInputStream.<init>(CryptoInputStream.java:132)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:489)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:298)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:241)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:210)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:182)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1409)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)

Regards,
Suchithra

From: Dian Fu 
Sent: Monday, October 28, 2019 5:40 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org
Subject: Re: Flink 1.8.1 HDFS 2.6.5 issue

It seems that the CryptoCodec is null from the exception stack trace. This may 
occur when "hadoop.security.crypto.codec.classes.aes.ctr.nopadding" is 
misconfigured. You could change the log level to "DEBUG" and it will show more 
detailed information about why CryptoCodec is null.

On Oct 28, 2019, at 7:14 PM, V N, Suchithra (Nokia - IN/Bangalore) <suchithra@nokia.com> wrote:

Hi,

I am trying to execute Wordcount.jar in Flink 1.8.1 with Hadoop version 2.6.5. 
HDFS is enabled with Kerberos+SSL. While writing output to HDFS, facing the 
below exception and job will be failed. Please let me know if any suggestions 
to debug this issue.

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 21 more
Caused by: java.io.IOException: DataStreamer Exception:
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:695)
Caused by: java.lang.NullPointerException
at 
org.apache.hadoop.crypto.CryptoInputStream.(CryptoInputStream.java:132)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:489)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:298)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:241)
a

Re: Flink 1.5+ performance in a Java standalone environment

2019-10-28 Thread Jakub Danilewicz
Thanks for your replies.

We use Flink from within a standalone Java 8 application (no Hadoop, no 
clustering), so it basically boils down to running simple code like this:

import java.util.*;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.*;
import org.apache.flink.graph.library.CommunityDetection;

public class FlinkTester {
    final Random random = new Random(1);
    final float density = 3.0F;

    public static void main(String[] args) throws Exception {
        new FlinkTester().execute(100, 4);
    }

    private void execute(int numEdges, int parallelism) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
        final Graph<Long, Long, Double> graph = createGraph(numEdges, env);

        final long start = System.currentTimeMillis();
        List<Vertex<Long, Long>> vertices =
            graph.run(new CommunityDetection<>(10, 0.5)).getVertices().collect();
        System.out.println(vertices.size() + " vertices processed in "
            + (System.currentTimeMillis() - start) / 1000 + " s");
    }

    private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
        System.out.println("Creating new graph of " + numEdges + " edges...");

        final int maxNumVertices = (int) (numEdges / density);
        final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
        final Map<String, Edge<Long, Long, Double>> edgeMap = new HashMap<>(numEdges);

        while (edgeMap.size() < numEdges) {
            long sourceId = random.nextInt(maxNumVertices) + 1;
            long targetId = sourceId;
            while (targetId == sourceId)
                targetId = random.nextInt(maxNumVertices) + 1;

            final String edgeKey = sourceId + "#" + targetId;
            if (!edgeMap.containsKey(edgeKey)) {
                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
                if (!vertexMap.containsKey(sourceId))
                    vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
                if (!vertexMap.containsKey(targetId))
                    vertexMap.put(targetId, new Vertex<>(targetId, targetId));
            }
        }

        System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
        return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
    }
}

No matter which graph algorithm you pick for benchmarking (above it is 
CommunityDetection), the bigger the graph, the wider the performance gap (and the higher 
the CPU/memory consumption) you observe when comparing the execution times between 
the old engine (<= Flink 1.4.2) and the new one (checked on 1.5.6, 1.8.2 and 
1.9.1).

Just run the code yourselves (you may play with the number of edges and 
parallel threads).

Best,

Jakub



Re: Flink 1.8.1 HDFS 2.6.5 issue

2019-10-28 Thread Dian Fu
I guess this is a bug in Hadoop 2.6.5 and has been fixed in Hadoop 2.8.0 [1]. 
You can work around it by explicitly setting the configuration 
"hadoop.security.crypto.codec.classes.aes.ctr.nopadding" to 
"org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec, 
org.apache.hadoop.crypto.JceAesCtrCryptoCodec".

[1] https://issues.apache.org/jira/browse/HADOOP-11711 
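For anyone hitting the same issue, the workaround would typically go into the client-side core-site.xml, roughly as follows (a sketch, assuming the standard Hadoop configuration mechanism is in use):

```
<!-- core-site.xml on the Flink client/TaskManager side -->
<property>
  <name>hadoop.security.crypto.codec.classes.aes.ctr.nopadding</name>
  <value>org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec,org.apache.hadoop.crypto.JceAesCtrCryptoCodec</value>
</property>
```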

> On Oct 28, 2019, at 8:59 PM, V N, Suchithra (Nokia - IN/Bangalore) wrote:
> 
> Hi,
> From debug logs I could see below logs in taskmanager. Please have a look.
>  
> org.apache.hadoop.ipc.ProtobufRpcEngine Call: addBlock took 372ms"}
> org.apache.hadoop.ipc.ProtobufRpcEngine Call: addBlock took 372ms"}
> org.apache.hadoop.hdfs.DFSClient pipeline = 10.76.113.216:1044"}
> org.apache.hadoop.hdfs.DFSClient pipeline = 10.76.113.216:1044"}
> org.apache.hadoop.hdfs.DFSClient Connecting to datanode 10.76.113.216:1044"}
> org.apache.hadoop.hdfs.DFSClient Connecting to datanode 10.76.113.216:1044"}
> org.apache.hadoop.hdfs.DFSClient Send buf size 131072"}
> org.apache.hadoop.hdfs.DFSClient Send buf size 131072"}
> o.a.h.h.p.d.sasl.SaslDataTransferClient SASL client doing encrypted handshake 
> for addr = /10.76.113.216, datanodeId = 10.76.113.216:1044"}
> o.a.h.h.p.d.sasl.SaslDataTransferClient SASL client doing encrypted handshake 
> for addr = /10.76.113.216, datanodeId = 10.76.113.216:1044"}
> o.a.h.h.p.d.sasl.SaslDataTransferClient Client using encryption algorithm 
> 3des"}
> o.a.h.h.p.d.sasl.SaslDataTransferClient Client using encryption algorithm 
> 3des"}
> o.a.h.h.p.d.sasl.DataTransferSaslUtil Verifying QOP, requested QOP = 
> [auth-conf], negotiated QOP = auth-conf"}
> o.a.h.h.p.d.sasl.DataTransferSaslUtil Verifying QOP, requested QOP = 
> [auth-conf], negotiated QOP = auth-conf"}
> o.a.h.h.p.d.sasl.DataTransferSaslUtil Creating IOStreamPair of 
> CryptoInputStream and CryptoOutputStream."}
> o.a.h.h.p.d.sasl.DataTransferSaslUtil Creating IOStreamPair of 
> CryptoInputStream and CryptoOutputStream."}
> o.apache.hadoop.util.PerformanceAdvisory No crypto codec classes with cipher 
> suite configured."}
> o.apache.hadoop.util.PerformanceAdvisory No crypto codec classes with cipher 
> suite configured."}
> org.apache.hadoop.hdfs.DFSClient DataStreamer Exception"}
> java.lang.NullPointerException: null
>   at 
> org.apache.hadoop.crypto.CryptoInputStream.(CryptoInputStream.java:132)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:489)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:298)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:241)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:210)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:182)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1409)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)
>  
> Regards,
> Suchithra
>  
> From: Dian Fu  
> Sent: Monday, October 28, 2019 5:40 PM
> To: V N, Suchithra (Nokia - IN/Bangalore) 
> Cc: user@flink.apache.org
> Subject: Re: Flink 1.8.1 HDFS 2.6.5 issue
>  
> It seems that the CryptoCodec is null from the exception stack trace. This 
> may occur when "hadoop.security.crypto.codec.classes.aes.ctr.nopadding" is 
> misconfigured. You could change the log level to "DEBUG" and it will show 
> more detailed information about why CryptoCodec is null.
>  
> On Oct 28, 2019, at 7:14 PM, V N, Suchithra (Nokia - IN/Bangalore) <suchithra@nokia.com> wrote:
>  
> Hi,
>  
> I am trying to execute Wordcount.jar in Flink 1.8.1 with Hadoop version 
> 2.6.5. HDFS is enabled with Kerberos+SSL. While writing output to HDFS, 
> facing the below exception and job will be failed. Please let me know if any 
> suggestions to debug this issue.
>  
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> ... 21 more
> Caused by: java.io.IOException: DataStreamer Exception:
> at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream

RE: Flink 1.8.1 HDFS 2.6.5 issue

2019-10-28 Thread V N, Suchithra (Nokia - IN/Bangalore)
Thanks for the information. Without setting this parameter explicitly, is there 
any possibility that it may work intermittently?

From: Dian Fu 
Sent: Tuesday, October 29, 2019 7:12 AM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org
Subject: Re: Flink 1.8.1 HDFS 2.6.5 issue

I guess this is a bug in Hadoop 2.6.5 and has been fixed in Hadoop 2.8.0 [1]. 
You can work around it by explicitly setting the configration 
"hadoop.security.crypto.codec.classes.aes.ctr.nopadding" as 
"org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec, 
org.apache.hadoop.crypto.JceAesCtrCryptoCodec".

[1] https://issues.apache.org/jira/browse/HADOOP-11711

On Oct 28, 2019, at 7:14 PM, V N, Suchithra (Nokia - IN/Bangalore) <suchithra@nokia.com> wrote:

Hi,
>From debug logs I could see below logs in taskmanager. Please have a look.

org.apache.hadoop.ipc.ProtobufRpcEngine Call: addBlock took 372ms"}
org.apache.hadoop.ipc.ProtobufRpcEngine Call: addBlock took 372ms"}
org.apache.hadoop.hdfs.DFSClient pipeline = 10.76.113.216:1044"}
org.apache.hadoop.hdfs.DFSClient pipeline = 10.76.113.216:1044"}
org.apache.hadoop.hdfs.DFSClient Connecting to datanode 10.76.113.216:1044"}
org.apache.hadoop.hdfs.DFSClient Connecting to datanode 10.76.113.216:1044"}
org.apache.hadoop.hdfs.DFSClient Send buf size 131072"}
org.apache.hadoop.hdfs.DFSClient Send buf size 131072"}
o.a.h.h.p.d.sasl.SaslDataTransferClient SASL client doing encrypted handshake 
for addr = /10.76.113.216, datanodeId = 10.76.113.216:1044"}
o.a.h.h.p.d.sasl.SaslDataTransferClient SASL client doing encrypted handshake 
for addr = /10.76.113.216, datanodeId = 10.76.113.216:1044"}
o.a.h.h.p.d.sasl.SaslDataTransferClient Client using encryption algorithm 3des"}
o.a.h.h.p.d.sasl.SaslDataTransferClient Client using encryption algorithm 3des"}
o.a.h.h.p.d.sasl.DataTransferSaslUtil Verifying QOP, requested QOP = 
[auth-conf], negotiated QOP = auth-conf"}
o.a.h.h.p.d.sasl.DataTransferSaslUtil Verifying QOP, requested QOP = 
[auth-conf], negotiated QOP = auth-conf"}
o.a.h.h.p.d.sasl.DataTransferSaslUtil Creating IOStreamPair of 
CryptoInputStream and CryptoOutputStream."}
o.a.h.h.p.d.sasl.DataTransferSaslUtil Creating IOStreamPair of 
CryptoInputStream and CryptoOutputStream."}
o.apache.hadoop.util.PerformanceAdvisory No crypto codec classes with cipher 
suite configured."}
o.apache.hadoop.util.PerformanceAdvisory No crypto codec classes with cipher 
suite configured."}
org.apache.hadoop.hdfs.DFSClient DataStreamer Exception"}
java.lang.NullPointerException: null
  at 
org.apache.hadoop.crypto.CryptoInputStream.(CryptoInputStream.java:132)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.createStreamPair(DataTransferSaslUtil.java:345)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.doSaslHandshake(SaslDataTransferClient.java:489)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.getEncryptedStreams(SaslDataTransferClient.java:298)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.send(SaslDataTransferClient.java:241)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:210)
  at 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.socketSend(SaslDataTransferClient.java:182)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1409)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
  at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)

Regards,
Suchithra

From: Dian Fu <dian0511...@gmail.com>
Sent: Monday, October 28, 2019 5:40 PM
To: V N, Suchithra (Nokia - IN/Bangalore) <suchithra@nokia.com>
Cc: user@flink.apache.org
Subject: Re: Flink 1.8.1 HDFS 2.6.5 issue

It seems that the CryptoCodec is null from the exception stack trace. This may 
occur when "hadoop.security.crypto.codec.classes.aes.ctr.nopadding" is 
misconfigured. You could change the log level to "DEBUG" and it will show more 
detailed information about why CryptoCodec is null.

On Oct 28, 2019, at 7:14 PM, V N, Suchithra (Nokia - IN/Bangalore) <suchithra@nokia.com> wrote:

Hi,

I am trying to execute Wordcount.jar in Flink 1.8.1 with Hadoop version 2.6.5. 
HDFS is enabled with Kerberos+SSL. While writing output to HDFS, facing the 
below exception and job will be failed. Please let me know if any suggestions 
to debug this issue.

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClust

Add custom fields into Json

2019-10-28 Thread srikanth flink
Hi there,

I'm querying JSON data and it is working fine. I would like to add custom
fields alongside the query result.
My query looks like: select ROW(`source`), ROW(`destination`), ROW(`dns`),
organization, cnt from (select (source.`ip`, source.`isInternalIP`) as
source, (destination.`ip`, destination.`isInternalIP`) as destination,
 (dns.`query`, dns.`answers`.`data`) as dns, organization as organization
from dnsTableS) tab1;

I would like to add "'dns' as `agg.type`, 'dns' as `agg.name`" to the
same output, but the query throws an exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1,
column 278.
Was expecting one of:
"EXCEPT" ...
"FETCH" ...
"FROM" ...
"INTERSECT" ...
"LIMIT" ...
"OFFSET" ...
"ORDER" ...
"MINUS" ...
"UNION" ...
")" ...
"," ...

Could someone help me with this? Thanks

Srikanth