writeAsCSV with partitionBy

2016-02-12 Thread Srikanth
Hello,



Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink?

I'm looking to save output as CSV files partitioned by two columns (date and
hour).

The partitionBy DataSet API seems intended more for partitioning the data on a
column for further processing.



I'm thinking there is no direct API to do this. But what will be the best
way of achieving this?



Srikanth


Re: writeAsCSV with partitionBy

2016-02-16 Thread Srikanth
Fabian,

Not sure if we are on the same page. If I do something like the code below, it
will group by field 0 and each task will write a separate part file in
parallel.

val sink = data1.join(data2)
  .where(1).equalTo(0) { (l, r) => (l._3, r._3) }
  .partitionByHash(0)
  .writeAsCsv(pathBase + "output/test", rowDelimiter = "\n",
    fieldDelimiter = "\t", writeMode = WriteMode.OVERWRITE)

This will create folder ./output/test/<1,2,3,4...>

But what I was looking for is a Hive-style partitionBy that writes output with a
folder structure like

   ./output/field0=1/file
   ./output/field0=2/file
   ./output/field0=3/file
   ./output/field0=4/file

Assuming field0 is an Int with unique values 1, 2, 3 & 4.

Srikanth


On Mon, Feb 15, 2016 at 6:20 AM, Fabian Hueske  wrote:

> Hi Srikanth,
>
> DataSet.partitionBy() will partition the data on the declared partition
> fields.
> If you append a DataSink with the same parallelism as the partition
> operator, the data will be written out with the defined partitioning.
> It should be possible to achieve the behavior you described using
> DataSet.partitionByHash() or partitionByRange().
>
> Best, Fabian
>
>
> 2016-02-12 20:53 GMT+01:00 Srikanth :
>
>> Hello,
>>
>>
>>
>> Is there a Hive(or Spark dataframe) partitionBy equivalent in Flink?
>>
>> I'm looking to save output as CSV files partitioned by two columns(date
>> and hour).
>>
>> The partitionBy dataset API is more to partition the data based on a
>> column for further processing.
>>
>>
>>
>> I'm thinking there is no direct API to do this. But what will be the best
>> way of achieving this?
>>
>>
>>
>> Srikanth
>>
>>
>>
>
>


Join DataStream with dimension tables?

2016-04-20 Thread Srikanth
Hello,

I have a fairly typical streaming use case but am not able to figure out how
best to implement it in Flink.
I want to join records read from a Kafka stream with one (or more) dimension
tables which are saved as flat files.

As per this jira <https://issues.apache.org/jira/browse/FLINK-2320> it's not
possible to join a DataStream with a DataSet.
These tables are too big to do a collect() and join.

It would be good to read these files during startup, do a partitionByHash,
and keep them cached.
On the DataStream, maybe do a keyBy and join.
Is something like this possible?
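
Roughly, this is the kind of thing I have in mind: a rough Java sketch of the
keyBy-and-connect idea using keyed state (the types, field positions, and names
below are made up for illustration, not taken from a real job):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class DimensionJoinSketch {

    // events: (key, payload) from Kafka; dims: (key, attribute) read from the file
    public static DataStream<Tuple3<String, String, String>> join(
            DataStream<Tuple2<String, String>> events,
            DataStream<Tuple2<String, String>> dims) {

        return events
            .connect(dims)
            .keyBy(0, 0)   // key both inputs on their first field
            .flatMap(new RichCoFlatMapFunction<Tuple2<String, String>,
                    Tuple2<String, String>, Tuple3<String, String, String>>() {

                private transient ValueState<String> dimAttribute;

                @Override
                public void open(Configuration parameters) {
                    dimAttribute = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("dim-attribute", String.class, null));
                }

                @Override
                public void flatMap1(Tuple2<String, String> event,
                        Collector<Tuple3<String, String, String>> out) throws Exception {
                    String attribute = dimAttribute.value();
                    if (attribute != null) {   // inner join: emit only when the dimension row is known
                        out.collect(Tuple3.of(event.f0, event.f1, attribute));
                    }
                }

                @Override
                public void flatMap2(Tuple2<String, String> dim,
                        Collector<Tuple3<String, String, String>> out) throws Exception {
                    dimAttribute.update(dim.f1);   // cache/refresh the dimension row for this key
                }
            });
    }
}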

Srikanth


Re: Join DataStream with dimension tables?

2016-04-25 Thread Srikanth
Aljoscha,

Looks like a potential solution. Feels a bit hacky though.

Didn't quite understand why a list-backed store is used for the static input
buffer. An (inner) join should emit only one record if there is a key match.

Is it a property of the system to emit a Long.MAX_VALUE watermark when a
finite stream source ends?
If so, can I do something like this to read the static file in parallel?
val meta = env.readTextFile("S3:///path/to/file").map(...).keyBy(...)

Shouldn't we also override the checkpoint handling of the custom operator? If so,
should the checkpoint wait/fail during the initial read phase?

Lohith,
Adding a component like Cassandra just for this feels like overkill. But
if I can't find a suitable way to do this, I might use it (or probably
Redis).

Srikanth



On Fri, Apr 22, 2016 at 12:20 PM, Lohith Samaga M  wrote:

> Hi,
> Cassandra could be used as a distributed cache.
>
> Lohith.
>
> Sent from my Sony Xperia™ smartphone
>
>
>  Aljoscha Krettek wrote 
>
>
> Hi Srikanth,
> that's an interesting use case. It's not possible to do something like
> this out-of-box but I'm actually working on API for such cases.
>
> In the mean time, I programmed a short example that shows how something
> like this can be programmed using the API that is currently available. It
> requires writing a custom operator but it is still somewhat succinct:
> https://gist.github.com/aljoscha/c657b98b4017282693a67f1238c88906
>
> Please let me know if you have any questions.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 03:06 Srikanth  wrote:
>
>> Hello,
>>
>> I have a fairly typical streaming use case but not able to figure how to
>> implement it best in Flink.
>> I want to join records read from a kafka stream with one(or more)
>> dimension tables which are saved as flat files.
>>
>> As per this jira <https://issues.apache.org/jira/browse/FLINK-2320> its
>> not possible to join DataStream with DataSet.
>> These tables are too big to do a collect() and join.
>>
>> It will be good to read these files during startup, do a partitionByHash
>> and keep it cached.
>> On the DataStream may be do a keyBy and join.
>> Is something like this possible?
>>
>> Srikanth
>>
>
>


Re: Join DataStream with dimension tables?

2016-04-27 Thread Srikanth
Aljoscha,

Your thoughts on this?

Srikanth

On Mon, Apr 25, 2016 at 8:08 PM, Srikanth  wrote:

> Aljoscha,
>
> Looks like a potential solution. Feels a bit hacky though.
>
> Didn't quite understand why a list backed store is used to for static
> input buffer? Join(inner) should emit only one record if there is a key
> match.
>
> Is it a property of the system to emit Long.MAX_VALUE watermark when a
> finite stream source ends?
> If so can I do something like this to read static file in parallel?
> val meta = env.readTextFile("S3:///path/to/file").map(...).keyBy(...)
>
> Shouldn't we also override checkpoint handling of custom operator? If so,
> should the checkpoint wait/fail during the initial read phase?
>
> Lohith,
> Adding a component like Cassandra just for this feels like a overkill. But
> if I can't find a suitable way to do this, I might use it( or Redis
> probably).
>
> Srikanth
>
>
>
> On Fri, Apr 22, 2016 at 12:20 PM, Lohith Samaga M <
> lohith.sam...@mphasis.com> wrote:
>
>> Hi,
>> Cassandra could be used as a distributed cache.
>>
>> Lohith.
>>
>> Sent from my Sony Xperia™ smartphone
>>
>>
>>  Aljoscha Krettek wrote 
>>
>>
>> Hi Srikanth,
>> that's an interesting use case. It's not possible to do something like
>> this out-of-box but I'm actually working on API for such cases.
>>
>> In the mean time, I programmed a short example that shows how something
>> like this can be programmed using the API that is currently available. It
>> requires writing a custom operator but it is still somewhat succinct:
>> https://gist.github.com/aljoscha/c657b98b4017282693a67f1238c88906
>>
>> Please let me know if you have any questions.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 03:06 Srikanth  wrote:
>>
>>> Hello,
>>>
>>> I have a fairly typical streaming use case but not able to figure how to
>>> implement it best in Flink.
>>> I want to join records read from a kafka stream with one(or more)
>>> dimension tables which are saved as flat files.
>>>
>>> As per this jira <https://issues.apache.org/jira/browse/FLINK-2320> its
>>> not possible to join DataStream with DataSet.
>>> These tables are too big to do a collect() and join.
>>>
>>> It will be good to read these files during startup, do a partitionByHash
>>> and keep it cached.
>>> On the DataStream may be do a keyBy and join.
>>> Is something like this possible?
>>>
>>> Srikanth
>>>
>>
>>
>
>


Scala compilation error

2016-05-02 Thread Srikanth
Hello,

I'm fac

val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic",
new SimpleStringSchema(), properties))
val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
BidderRawLogs(b)).keyBy(b => b.strategyId)

val metaStrategy: KeyedStream[(Int, String), Int] =
env.readTextFile("path").name("Strategy")
 .map((1, _) ).keyBy(_._1)

val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
(Int, BidderRawLogs, (Int, String))] =
 new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
staticTypeInfo)
val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
{}.getTypeInfo()

val funName = "test"
val joinedStream = bidderStream.connect(metaStrategy)
.transform(funName, joinOperator, outTypeInfo)


Re: Scala compilation error

2016-05-02 Thread Srikanth
Sorry for the previous incomplete email. Didn't realize I hit send!

I was facing a weird compilation error in Scala when I did
val joinedStream = stream1.connect(stream2)
.transform("funName", outTypeInfo, joinOperator)

It turned out to be due to a difference in API signatures between the Scala and
Java APIs. I was referring to the Javadoc. Is there a Scaladoc?

Java API has
public <R> SingleOutputStreamOperator<R> transform(
    String functionName,
    TypeInformation<R> outTypeInfo,
    TwoInputStreamOperator<IN1, IN2, R> operator)

Scala API has
def transform[R: TypeInformation](
    functionName: String,
    operator: TwoInputStreamOperator[IN1, IN2, R])

Srikanth

On Mon, May 2, 2016 at 7:18 PM, Srikanth  wrote:

> Hello,
>
> I'm fac
>
> val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic",
> new SimpleStringSchema(), properties))
> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
> BidderRawLogs(b)).keyBy(b => b.strategyId)
>
> val metaStrategy: KeyedStream[(Int, String), Int] =
> env.readTextFile("path").name("Strategy")
>  .map((1, _) ).keyBy(_._1)
>
> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
> (Int, BidderRawLogs, (Int, String))] =
>  new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
> staticTypeInfo)
> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
> {}.getTypeInfo()
>
> val funName = "test"
> val joinedStream = bidderStream.connect(metaStrategy)
> .transform(funName, joinOperator, outTypeInfo)
>
>


Re: Scala compilation error

2016-05-03 Thread Srikanth
Yes, I did notice the usage of implicits in ConnectedStreams.scala.
Better Scaladoc would be helpful, especially when compiler errors are not
clear.

Thanks

On Tue, May 3, 2016 at 5:02 AM, Aljoscha Krettek 
wrote:

> There is a Scaladoc but it is not covering all packages, unfortunately. In
> the Scala API you can call transform without specifying a TypeInformation,
> it works using implicits/context bounds.
>
> On Tue, 3 May 2016 at 01:48 Srikanth  wrote:
>
>> Sorry for the previous incomplete email. Didn't realize I hit send!
>>
>> I was facing a weird compilation error in Scala when I did
>> val joinedStream = stream1.connect(stream2)
>> .transform("funName", outTypeInfo, joinOperator)
>>
>> It turned out to be due to a difference in API signature between Scala
>> and Java API. I was refering to javadoc. Is there a scaladoc?
>>
>> Java API has
>> public  SingleOutputStreamOperator transform(
>> String functionName,
>> TypeInformation outTypeInfo,
>> TwoInputStreamOperator operator)
>>
>> Scala API has
>> def transform[R: TypeInformation](
>>   functionName: String,
>>   operator: TwoInputStreamOperator[IN1, IN2, R])
>>
>> Srikanth
>>
>> On Mon, May 2, 2016 at 7:18 PM, Srikanth  wrote:
>>
>>> Hello,
>>>
>>> I'm fac
>>>
>>> val stream = env.addSource(new
>>> FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(),
>>> properties))
>>> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
>>> BidderRawLogs(b)).keyBy(b => b.strategyId)
>>>
>>> val metaStrategy: KeyedStream[(Int, String), Int] =
>>> env.readTextFile("path").name("Strategy")
>>>  .map((1, _) ).keyBy(_._1)
>>>
>>> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
>>> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
>>> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
>>> (Int, BidderRawLogs, (Int, String))] =
>>>  new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
>>> staticTypeInfo)
>>> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
>>> {}.getTypeInfo()
>>>
>>> val funName = "test"
>>> val joinedStream = bidderStream.connect(metaStrategy)
>>> .transform(funName, joinOperator, outTypeInfo)
>>>
>>>
>>


Re: how to convert datastream to collection

2016-05-03 Thread Srikanth
Why do you want to collect and iterate? Why not iterate on the DataStream
itself?
Maybe I didn't understand your use case completely.

Srikanth

On Tue, May 3, 2016 at 10:55 AM, Aljoscha Krettek 
wrote:

> Hi,
> please keep in mind that we're dealing with streams. The Iterator might
> never finish.
>
> Cheers,
> Aljoscha
>
> On Tue, 3 May 2016 at 16:35 Suneel Marthi  wrote:
>
>> DataStream<Centroid> newCentroids = new DataStream<>(...);
>>
>> Iterator<Centroid> iter = DataStreamUtils.collect(newCentroids);
>>
>> List<Centroid> list = Lists.newArrayList(iter);
>>
>> On Tue, May 3, 2016 at 10:26 AM, subash basnet 
>> wrote:
>>
>>> Hello all,
>>>
>>> Suppose I have the datastream as:
>>> DataStream<Centroid> newCentroids;
>>>
>>> How to get a collection of newCentroids to be able to loop as below:
>>>  private Collection<Centroid> centroids;
>>>  for (Centroid cent : centroids) {
>>>  }
>>>
>>>
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>
>>


Force triggering events on watermark

2016-05-10 Thread Srikanth
Hi,

I read the following in the Flink docs: "We can explicitly specify a Trigger to
overwrite the default Trigger provided by the WindowAssigner. Note that
specifying a trigger does not add an additional trigger condition but
replaces the current trigger."
So, I tested out the code below with a count trigger. As per my understanding,
this will override the default watermark-based trigger.

val testStream = env.fromCollection(List( ("2016-04-07 13:11:59", 157428,
4),
 ("2016-04-07 13:11:59", 157428, 4),
 ("2016-04-07 13:11:59", 111283, 23),
 ("2016-04-07 13:11:57", 108042, 23),
 ("2016-04-07 13:12:00", 161374, 9),
 ("2016-04-07 13:12:00", 161374, 9),
 ("2016-04-07 13:11:59", 136505, 4)
)
)
   .assignAscendingTimestamps(b => f.parse(b._1).getTime())
   .map(b => (b._3, b._2))

testStream.print

val countStream = testStream
.keyBy(_._1)
.timeWindow(Time.seconds(20))
.trigger(CountTrigger.of(3))
.fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }

countStream.print

The output I saw confirms the documented behavior: processing is triggered only
when we have 3 elements for a key.
How do I force-trigger the leftover records when the watermark is past the
window? I.e., I want to use triggers to start processing early but finalize
the window based on the watermark.

The output shows that records for keys 23 & 9 weren't processed.
  (4,157428)
  (4,157428)
  (23,111283)
  (23,108042)
  (9,161374)
  (9,161374)
  (4,136505)

  (4,List(157428, 157428, 136505))

Thanks,
Srikanth


Re: Force triggering events on watermark

2016-05-10 Thread Srikanth
Yes, that will work.
I was trying another route of having a "finalize & purge" trigger that will
   i) onElement - register an event-time timer but not alter the nested
trigger's TriggerResult
  ii) onEventTime - always purge after firing

That will work with CountTrigger and other custom triggers too, right?

public class FinalizePurgingTrigger<T, W extends Window> extends Trigger<T, W> {

    private final Trigger<? super T, W> nestedTrigger;

    public FinalizePurgingTrigger(Trigger<? super T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window,
            TriggerContext ctx) throws Exception {
        // always get an event-time callback once the watermark passes the window end
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return nestedTrigger.onElement(element, timestamp, window, ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return nestedTrigger.onProcessingTime(time, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception {
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        switch (triggerResult) {
            case FIRE:
                return TriggerResult.FIRE_AND_PURGE;
            case FIRE_AND_PURGE:
                return TriggerResult.FIRE_AND_PURGE;
            default:
                return TriggerResult.CONTINUE;
        }
    }
}
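
A usage sketch in Java, assuming the wrapper above takes the nested trigger in
its constructor (as written above) and is on the classpath, and that the input
stream already has event-time timestamps/watermarks assigned (e.g. via
assignAscendingTimestamps as in the earlier example); the tuple type and field
positions are made up:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EarlyFiringWindows {

    public static DataStream<Tuple2<Integer, Integer>> earlyFiringSums(
            DataStream<Tuple2<Integer, Integer>> timestampedInput) {
        return timestampedInput
            .keyBy(0)                      // key on the first tuple field
            .timeWindow(Time.seconds(20))  // 20-second event-time windows
            // wrap the CountTrigger with the finalize & purge wrapper from above
            .trigger(new FinalizePurgingTrigger<Tuple2<Integer, Integer>, TimeWindow>(
                    CountTrigger.<TimeWindow>of(3)))
            .sum(1);
    }
}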

Srikanth

On Tue, May 10, 2016 at 11:36 AM, Fabian Hueske  wrote:

> Maybe the last example of this blog post is helpful [1].
>
> Best, Fabian
>
> [1]
> https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
>
> 2016-05-10 17:24 GMT+02:00 Srikanth :
>
>> Hi,
>>
>> I read the following in Flink doc "We can explicitly specify a Trigger to
>> overwrite the default Trigger provided by the WindowAssigner. Note that
>> specifying a triggers does not add an additional trigger condition but
>> replaces the current trigger."
>> So, I tested out the below code with count trigger. As per my
>> understanding this will override the default watermark based trigger.
>>
>> val testStream = env.fromCollection(List( ("2016-04-07 13:11:59", 157428,
>> 4),
>>  ("2016-04-07 13:11:59", 157428, 4),
>>  ("2016-04-07 13:11:59", 111283, 23),
>>  ("2016-04-07 13:11:57", 108042, 23),
>>  ("2016-04-07 13:12:00", 161374, 9),
>>  ("2016-04-07 13:12:00", 161374, 9),
>>  ("2016-04-07 13:11:59", 136505, 4)
>> )
>> )
>>.assignAscendingTimestamps(b => f.parse(b._1).getTime())
>>.map(b => (b._3, b._2))
>>
>> testStream.print
>>
>> val countStream = testStream
>> .keyBy(_._1)
>> .timeWindow(Time.seconds(20))
>> .trigger(CountTrigger.of(3))
>> .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }
>>
>> countStream.print
>>
>> Output I saw confirms the documented behavior. Processing is triggered
>> only when we have 3 elements for a key.
>> How do I force trigger the left over records when watermark is past the
>> window? I.e, I want to use triggers to start early processing but finalize
>> the window based on watermark.
>>
>> Output shows that records for keys 23 & 9 weren't processed.
>>   (4,157428)
>>   (4,157428)
>>   (23,111283)
>>   (23,108042)
>>   (9,161374)
>>   (9,161374)
>>   (4,136505)
>>
>>   (4,List(157428, 157428, 136505))
>>
>> Thanks,
>> Srikanth
>>
>
>


Barriers at work

2016-05-12 Thread Srikanth
Hello,

I was reading about Flink's checkpointing and wanted to check if I correctly
understood the usage of barriers for exactly-once processing.
 1) An operator does alignment by buffering records coming after a barrier
until it receives barriers from all upstream operator instances.
 2) A barrier is always preceded by a watermark to trigger processing of all
windows that are complete.
 3) Records in windows that are not triggered are also saved as part of the
checkpoint. These windows are repopulated when restoring from checkpoints.

In production setups, were there any cases where alignment during
checkpointing caused unacceptable latency?
If so, is there a way to indicate, say, "wait for at most 100 ms"? That way we
have exactly-once in most situations but prefer at-least-once over higher
latency in corner cases.

Srikanth


Re: Barriers at work

2016-05-13 Thread Srikanth
Thanks Matthias & Stephan!

Yes, if we choose to fail the checkpoint on expiry, we can restore from the
previous checkpoint.

Looking forward to reading the new design proposal.

Srikanth


On Fri, May 13, 2016 at 8:09 AM, Stephan Ewen  wrote:

> Hi Srikanth!
>
> That is an interesting idea.
> I have it on my mind to create a design doc for checkpointing
> improvements. That could be added as a proposal there.
>
> I hope I'll be able to start with that design doc next week.
>
> Greetings,
> Stephan
>
>
> On Fri, May 13, 2016 at 1:35 PM, Matthias J. Sax  wrote:
>
>> I don't think barries can "expire" as of now. Might be a nice idea
>> thought -- I don't know if this might be a problem in production.
>>
>> Furthermore, I want to point out, that an "expiring checkpoint" would
>> not break exactly-once processing, as the latest successful checkpoint
>> can always be used to recover correctly. Only the recovery-time would be
>> increase. because if a "barrier expires" and no checkpoint can be
>> stored, more data has to be replayed using the "old" checkpoint".
>>
>>
>> -Matthias
>>
>> On 05/12/2016 09:21 PM, Srikanth wrote:
>> > Hello,
>> >
>> > I was reading about Flink's checkpoint and wanted to check if I
>> > correctly understood the usage of barriers for exactly once processing.
>> >  1) Operator does alignment by buffering records coming after a barrier
>> > until it receives barrier from all upstream operators instances.
>> >  2) Barrier is always preceded by a watermark to trigger processing all
>> > windows that are complete.
>> >  3) Records in windows that are not triggered are also saved as part of
>> > checkpoint. These windows are repopulated when restoring from
>> checkpoints.
>> >
>> > In production setups, were there any cases where alignment during
>> > checkpointing caused unacceptable latency?
>> > If so, is there a way to indicate say wait for a MAX 100 ms? That way we
>> > have exactly-once in most situations but prefer at least once over
>> > higher latency in corner cases.
>> >
>> > Srikanth
>>
>>
>


Re: Barriers at work

2016-05-13 Thread Srikanth
I have a follow-up. Is there a recommended list of knobs that can be tuned if an
at-least-once guarantee when handling failures is good enough?
For cases like alert generation, non-idempotent sinks, etc., where the system
can live with duplicates or has other mechanisms to handle them.

Srikanth

On Fri, May 13, 2016 at 8:09 AM, Stephan Ewen  wrote:

> Hi Srikanth!
>
> That is an interesting idea.
> I have it on my mind to create a design doc for checkpointing
> improvements. That could be added as a proposal there.
>
> I hope I'll be able to start with that design doc next week.
>
> Greetings,
> Stephan
>
>
> On Fri, May 13, 2016 at 1:35 PM, Matthias J. Sax  wrote:
>
>> I don't think barries can "expire" as of now. Might be a nice idea
>> thought -- I don't know if this might be a problem in production.
>>
>> Furthermore, I want to point out, that an "expiring checkpoint" would
>> not break exactly-once processing, as the latest successful checkpoint
>> can always be used to recover correctly. Only the recovery-time would be
>> increase. because if a "barrier expires" and no checkpoint can be
>> stored, more data has to be replayed using the "old" checkpoint".
>>
>>
>> -Matthias
>>
>> On 05/12/2016 09:21 PM, Srikanth wrote:
>> > Hello,
>> >
>> > I was reading about Flink's checkpoint and wanted to check if I
>> > correctly understood the usage of barriers for exactly once processing.
>> >  1) Operator does alignment by buffering records coming after a barrier
>> > until it receives barrier from all upstream operators instances.
>> >  2) Barrier is always preceded by a watermark to trigger processing all
>> > windows that are complete.
>> >  3) Records in windows that are not triggered are also saved as part of
>> > checkpoint. These windows are repopulated when restoring from
>> checkpoints.
>> >
>> > In production setups, were there any cases where alignment during
>> > checkpointing caused unacceptable latency?
>> > If so, is there a way to indicate say wait for a MAX 100 ms? That way we
>> > have exactly-once in most situations but prefer at least once over
>> > higher latency in corner cases.
>> >
>> > Srikanth
>>
>>
>


Re: Barriers at work

2016-05-13 Thread Srikanth
Thanks. I didn't know we could set that.
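
For reference, a minimal sketch of that setting (the checkpoint interval below
is an arbitrary example value):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceCheckpointing {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 5 seconds, in at-least-once mode so barriers never block
        env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
    }
}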

On Fri, May 13, 2016 at 12:44 PM, Stephan Ewen  wrote:

> You can use the checkpoint mode to "at least once".
> That way, barriers never block.
>
> On Fri, May 13, 2016 at 6:05 PM, Srikanth  wrote:
>
>> I have a follow up. Is there a recommendation of list of knobs that can
>> be tuned if at least once guarantee while handling failure is good enough?
>> For cases like alert generation, non idempotent sink, etc where the
>> system can live with duplicates or has other mechanism to handle them.
>>
>> Srikanth
>>
>> On Fri, May 13, 2016 at 8:09 AM, Stephan Ewen  wrote:
>>
>>> Hi Srikanth!
>>>
>>> That is an interesting idea.
>>> I have it on my mind to create a design doc for checkpointing
>>> improvements. That could be added as a proposal there.
>>>
>>> I hope I'll be able to start with that design doc next week.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Fri, May 13, 2016 at 1:35 PM, Matthias J. Sax 
>>> wrote:
>>>
>>>> I don't think barries can "expire" as of now. Might be a nice idea
>>>> thought -- I don't know if this might be a problem in production.
>>>>
>>>> Furthermore, I want to point out, that an "expiring checkpoint" would
>>>> not break exactly-once processing, as the latest successful checkpoint
>>>> can always be used to recover correctly. Only the recovery-time would be
>>>> increase. because if a "barrier expires" and no checkpoint can be
>>>> stored, more data has to be replayed using the "old" checkpoint".
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 05/12/2016 09:21 PM, Srikanth wrote:
>>>> > Hello,
>>>> >
>>>> > I was reading about Flink's checkpoint and wanted to check if I
>>>> > correctly understood the usage of barriers for exactly once
>>>> processing.
>>>> >  1) Operator does alignment by buffering records coming after a
>>>> barrier
>>>> > until it receives barrier from all upstream operators instances.
>>>> >  2) Barrier is always preceded by a watermark to trigger processing
>>>> all
>>>> > windows that are complete.
>>>> >  3) Records in windows that are not triggered are also saved as part
>>>> of
>>>> > checkpoint. These windows are repopulated when restoring from
>>>> checkpoints.
>>>> >
>>>> > In production setups, were there any cases where alignment during
>>>> > checkpointing caused unacceptable latency?
>>>> > If so, is there a way to indicate say wait for a MAX 100 ms? That way
>>>> we
>>>> > have exactly-once in most situations but prefer at least once over
>>>> > higher latency in corner cases.
>>>> >
>>>> > Srikanth
>>>>
>>>>
>>>
>>
>


Re: writeAsCSV with partitionBy

2016-05-24 Thread Srikanth
Isn't this related to https://issues.apache.org/jira/browse/FLINK-2672?

This could probably be achieved with a RollingSink [1] and a custom Bucketer.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html

Srikanth

On Tue, May 24, 2016 at 1:07 AM, KirstiLaurila 
wrote:

> Yeah, created this one  https://issues.apache.org/jira/browse/FLINK-3961
> <https://issues.apache.org/jira/browse/FLINK-3961>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/writeAsCSV-with-partitionBy-tp4893p7118.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


How exactly does Idle-state retention policy work?

2019-09-17 Thread srikanth flink
Hi there,

I'm using Flink SQL to do the job for me. Based on this
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html>,
I configured the idle-state retention in milliseconds.

*Context*: Flink SQL reads a Kafka stream with no key and puts it into a dynamic
table (sourceKafka). There's another static table (badips) loaded from a file,
and the join is performed from the dynamic table against the static one like: SELECT
sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.source.ip=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

As it says in the docs, my query doesn't have a 'group by' for the idle-state
retention to act upon and evict the untouched state. So how do I manage to evict
the older entries?

Questions:

   1. If it happens that the 'ip' acts as a key in my query so that eviction can
   work, how does Flink justify the heap growing to 80GB and crashing?
   2. Is it that for every query with a time-windowed join, Flink SQL will
   automatically clear older records that have become irrelevant?
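
For reference, the programmatic form of the retention setting mentioned above,
roughly as shown in the linked 1.9 query configuration docs (the table name,
query, and retention values below are placeholders):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class IdleStateRetentionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // keep query state for at least 12 hours and at most 24 hours of inactivity
        StreamQueryConfig qConfig = tableEnv.queryConfig();
        qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        // "sourceKafka" must already be a registered table; placeholder query
        Table result = tableEnv.sqlQuery("SELECT * FROM sourceKafka");
        tableEnv.toRetractStream(result, Row.class, qConfig).print();

        env.execute("idle-state retention sketch");
    }
}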


Thanks
Srikanth


Approach to match join streams to create unique streams.

2019-09-23 Thread srikanth flink
Hi there,

I have two streams sourced from Kafka. Stream1 is continuous data and stream2 is
a periodic update. Stream2 contains only one column.

*Use case*: Every entry from stream1 should be checked against stream2 for a
match. The matched and unmatched records should be separated into new, distinct
streams. For example: column1 and column10 from stream1 are checked for a
match/no-match against the stream2 column and routed to new streams safeStream
and unSafeStream respectively.

*Implemented solution*: stream2 as a temporal table function joined over stream1,
which is a dynamic table.

   - Ran a time-based query where stream1.column1 = stream2.column and
   stream1.column10 = stream2.column; working.

   - Ran a time-based query where stream1.column1 <> stream2.column and
   stream1.column10 <> stream2.column; not working.

Would like to ask if there's a possibility that I could load the stream as a
list so I could do a *contains*? Or any other approach?
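
For reference, one other possible DataStream route is broadcast state (a
different mechanism from the temporal-table join implemented above): stream2's
single column is broadcast and kept as a set, and stream1 records are split into
matched/unmatched outputs via a side output. The field positions and types in
this rough Java sketch are made up for illustration:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class MatchSplitSketch {

    private static final MapStateDescriptor<String, Boolean> VALUES =
        new MapStateDescriptor<>("stream2-values", Types.STRING, Types.BOOLEAN);

    private static final OutputTag<Tuple3<String, String, String>> UNSAFE =
        new OutputTag<Tuple3<String, String, String>>("unSafeStream") {};

    // stream1 modeled as (column1, column10, payload); stream2 as plain strings
    public static DataStream<Tuple3<String, String, String>> split(
            DataStream<Tuple3<String, String, String>> stream1, DataStream<String> stream2) {

        BroadcastStream<String> broadcastValues = stream2.broadcast(VALUES);

        SingleOutputStreamOperator<Tuple3<String, String, String>> safeStream = stream1
            .connect(broadcastValues)
            .process(new BroadcastProcessFunction<Tuple3<String, String, String>, String,
                    Tuple3<String, String, String>>() {
                @Override
                public void processElement(Tuple3<String, String, String> record,
                        ReadOnlyContext ctx,
                        Collector<Tuple3<String, String, String>> out) throws Exception {
                    ReadOnlyBroadcastState<String, Boolean> values = ctx.getBroadcastState(VALUES);
                    if (values.contains(record.f0) || values.contains(record.f1)) {
                        out.collect(record);          // matched -> safeStream
                    } else {
                        ctx.output(UNSAFE, record);   // unmatched -> side output
                    }
                }

                @Override
                public void processBroadcastElement(String value, Context ctx,
                        Collector<Tuple3<String, String, String>> out) throws Exception {
                    ctx.getBroadcastState(VALUES).put(value, Boolean.TRUE);  // periodic update from stream2
                }
            });

        // unmatched records: safeStream.getSideOutput(UNSAFE)
        return safeStream;
    }
}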

Help appreciated.

Thanks
Srikanth


Can I cross talk between environments

2019-09-23 Thread srikanth flink
Hi,

I'm using Java code to source from Kafka, convert the stream to a table, and
register the table. I understand that I have started the
StreamExecutionEnvironment and its execution.

Is there a way that I could access the registered table/temporal function
from the SQL client?

Thanks
Srikanth


Re: Approach to match join streams to create unique streams.

2019-09-24 Thread srikanth flink
Fabian,

Thanks, already implemented the left join.

Srikanth

On Tue, Sep 24, 2019 at 2:12 PM Fabian Hueske  wrote:

> Hi,
>
> AFAIK, Flink SQL Temporal table function joins are only supported as inner
> equality joins.
> An extension to left outer joins would be great, but is not on the
> immediate roadmap AFAIK.
>
> If you need the inverse, I'd recommend to implement the logic in a
> DataStream program with a KeyedCoProcessFunction.
>
> Best, Fabian
>
> Am Mo., 23. Sept. 2019 um 13:04 Uhr schrieb srikanth flink <
> flink.d...@gmail.com>:
>
>>  Hi there,
>>
>> I've two streams source Kafka. Stream1 is a continuous data and stream2
>> is a periodic update. Stream2 contains only one column.
>>
>> *Use case*: Every entry from stream1 should verify if the stream2 has
>> any match.
>> The matched and unmatched records should be separated into new unique
>> streams. For example: column1, column10 from stream1 match/unmatch check on
>> stream2 column to put to a new stream safeStream and unSafeStream
>> respectively.
>>
>> *Implemented solution*: stream2 as temporal function to join over
>> stream1 which is a dynamic table.
>>
>>- Ran a time based query where stream1.column1 = stream2.column and
>>stream1.column10 = stream2.column ; Working
>>
>>
>>- Ran a time based query where stream1.column1 <> stream1.column and
>>tream1.column10 <> stream1.column ; Not working.
>>
>> Would like to ask if there's a possibility that I could load the stream
>> as a list so I could do a *contains*? OR any other approach?
>>
>> Help appreciated.
>>
>> Thanks
>> Srikanth
>>
>>


How do I create a temporal function using Flink Client SQL?

2019-09-24 Thread srikanth flink
Hi,

I'm running time-based joins of a dynamic table over a temporal table function.
Is there a way I could create a temporal table using Flink SQL? I'm using
v1.9.
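
For reference, the programmatic (Table API) route in 1.9 looks roughly like the
sketch below; whether the SQL client alone can do it is the open question here.
The table, attribute, and key names are placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class TemporalTableFunctionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // assumes a registered table "RatesHistory" with time attribute "r_proctime"
        // and primary key "r_currency"
        Table ratesHistory = tableEnv.sqlQuery("SELECT * FROM RatesHistory");
        TemporalTableFunction rates =
            ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
        tableEnv.registerFunction("Rates", rates);

        // the function can then be used in SQL, e.g. ... LATERAL TABLE (Rates(o.o_proctime)) ...
    }
}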

Thanks
Srikanth


Explain time based windows

2019-09-24 Thread srikanth flink
Hi,

I'm trying to join a dynamic table and a static (periodically batch-updated) table
using:
SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '65' MINUTE AND
K.k_proctime + INTERVAL '5' MINUTE.

Note, KafkaSource is a dynamic table and BadIP is a static table. The data rate is
2.2G every minute.
1) Help me understand the query with respect to the time intervals.
2) Would there be a possibility of duplicate data or a chance of missing data
due to the time intervals?
3) Would Flink evict older records that fall outside the defined time interval?


Thanks
Srikanth


Flink SQL update-mode set to retract in env file.

2019-09-25 Thread srikanth flink
How could I configure the environment file for Flink SQL with update-mode: retract?

I have this for append:
properties:
  - key: zookeeper.connect
    value: localhost:2181
  - key: bootstrap.servers
    value: localhost:9092
  - key: group.id
    value: reconMultiAttempFail
format:
  type: json
  fail-on-missing-field: false
  json-schema: >
    {
      type: 'object',
      properties: {
        'a': {
          type: 'string'
        },
        'b': {
          type: 'string'
        },
        'cnt': {
          type: 'string'
        }
      }
    }
  derive-schema: false

schema:
  - name: 'a'
    type: VARCHAR
  - name: 'b'
    type: VARCHAR
  - name: 'cnt'
    type: BIGINT

Couldn't find any documentation for this.

Could someone help me with the syntax?

Thanks
Srikanth


Re: Flink SQL update-mode set to retract in env file.

2019-09-26 Thread srikanth flink
Hi Terry Wang,

Thanks for quick reply.

I would like to understand more about your line "If the target table is a
type of Kafka which implements AppendStreamTableSink, the update-mode will
be append only".
If your statement means that retract mode cannot be used for Kafka sinks because
they implement AppendStreamTableSink, then why is the code below working for me,
dumping data to Kafka?

DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable,
Row.class).map(t -> {
    Row r = t.f1;
    ObjectNode node = mapper.createObjectNode();
    node.put("source.ip", r.getField(0).toString());
    node.put("destination.ip", r.getField(1).toString());
    node.put("cnt", Long.parseLong(r.getField(2).toString()));
    return node.toString();
});

Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    "host:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

outStreamAgg.addSink(new FlinkKafkaProducer<>("reconMultiAttempFail",
    new SimpleStringSchema(),
    kafkaProducerProperties));

Is it that the above functionality works only with Table API and not with
SQL?
Please explain.

Thanks
Srikanth



On Thu, Sep 26, 2019 at 1:57 PM Terry Wang  wrote:

> Hi srikanth~
>
> The Flink SQL update-mode is inferred from the target table type.
> For now, there are three StreamTableSink type, `AppendStreamTableSink`
> `UpsertStreamTableSink` and `RetractStreamTableSink`.
> If the target table is a type of Kafka which implements
> AppendStreamTableSink, the update-mode will be append only.
> So if you want enable retract-mode you may need to insert into one kind of
> RetractStreamTableSink.
> Hope it helps you ~
>
>
>
> Best,
> Terry Wang
>
>
>
> 在 2019年9月26日,下午2:50,srikanth flink  写道:
>
> How could I configure environment file for Flink SQL, update-mode: retract?
>
> I have this for append:
> properties:
> - key: zookeeper.connect
>   value: localhost:2181
> - key: bootstrap.servers
>   value: localhost:9092
> - key: group.id
>   value: reconMultiAttempFail
> format:
>   type: json
>   fail-on-missing-field: false
>   json-schema: >
> {
>   type: 'object',
>   properties: {
> 'a': {
>type: 'string'
> },
> 'b': {
>type: 'string'
> },
> 'cnt': {
>type: 'string'
> }
>   }
> }
>   derive-schema: false
>
> schema:
>   - name: 'a'
> type: VARCHAR
>  - name: 'b'
> type: VARCHAR
>   - name: 'cnt'
> type: BIGINT
>
> Couldn't find any document for the same.
>
> someone help me with the syntax.
>
> Thanks
> Srikanth
>
>
>


Re: Flink SQL update-mode set to retract in env file.

2019-09-26 Thread srikanth flink
Awesome, thanks!

On Thu, Sep 26, 2019 at 5:50 PM Terry Wang  wrote:

> Hi, Srikanth~
>
> In your code,
> DataStream outStreamAgg = tableEnv.toRetractStream(resultTable,
> Row.class).map(t -> {});  has converted the resultTable into a DataStream
> that’s unrelated with tableApi,
> And the following code `outStreamAgg.addSink(…)` is just a normall stream
> write to a FlinkKafka sink function.
> Your program code is a mixture of table api and dataStream programing not
> just single Table API.
>
> Best,
> Terry Wang
>
>
>
> 在 2019年9月26日,下午5:47,srikanth flink  写道:
>
> Hi Terry Wang,
>
> Thanks for quick reply.
>
> I would like to understand more on your line " If the target table is a
> type of Kafka which implments AppendStreamTableSink, the update-mode will
> be append only".
> If your statement defines retract mode could not be used for Kafka sinks
> as it implements AppendStreamTableSink, but then the below code is working
> for me, dumping data to Kafka:
> DataStream outStreamAgg = tableEnv.toRetractStream(resultTable,
> Row.class).map(t -> {
> Row r = t.f1;
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", r.getField(0).toString());
> node.put("destination.ip", r.getField(1).toString());
> node.put("cnt", Long.parseLong(r.getField(2).toString()));
> return node.toString();
> });
> Properties kafkaProducerProperties = new Properties();
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "host:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
> outStreamAgg.addSink(new
> FlinkKafkaProducer("reconMultiAttempFail", new SimpleStringSchema(),
> kafkaProducerProperties));
>
> Is it that the above functionality works only with Table API and not with
> SQL?
> Please explain.
>
> Thanks
> Srikanth
>
>
>
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang  wrote:
>
>> Hi srikanth~
>>
>> The Flink SQL update-mode is inferred from the target table type.
>> For now, there are three StreamTableSink type, `AppendStreamTableSink`
>> `UpsertStreamTableSink` and `RetractStreamTableSink`.
>> If the target table is a type of Kafka which implments
>> AppendStreamTableSink, the update-mode will be append only.
>> So if you want enable retract-mode you may need to insert into one kind
>> of RetractStreamTableSink.
>> Hope it helps you ~
>>
>>
>>
>> Best,
>> Terry Wang
>>
>>
>>
>> 在 2019年9月26日,下午2:50,srikanth flink  写道:
>>
>> How could I configure environment file for Flink SQL, update-mode:
>> retract?
>>
>> I have this for append:
>> properties:
>> - key: zookeeper.connect
>>   value: localhost:2181
>> - key: bootstrap.servers
>>   value: localhost:9092
>> - key: group.id
>>   value: reconMultiAttempFail
>> format:
>>   type: json
>>   fail-on-missing-field: false
>>   json-schema: >
>> {
>>   type: 'object',
>>   properties: {
>> 'a': {
>>type: 'string'
>> },
>> 'b': {
>>type: 'string'
>> },
>> 'cnt': {
>>type: 'string'
>> }
>>   }
>> }
>>   derive-schema: false
>>
>> schema:
>>   - name: 'a'
>> type: VARCHAR
>>  - name: 'b'
>> type: VARCHAR
>>   - name: 'cnt'
>> type: BIGINT
>>
>> Couldn't find any document for the same.
>>
>> someone help me with the syntax.
>>
>> Thanks
>> Srikanth
>>
>>
>>
>


Reading Key-Value from Kafka | Eviction policy.

2019-09-26 Thread srikanth flink
Hi,

My data source is Kafka; all these days I have been reading only the values from
the Kafka stream into a table. The table just grows and runs into a heap issue.

I came across the eviction policy, which works only on keys, right?

I have researched how to configure the environment file (Flink SQL) to read both
key and value, so that eviction works on the keys and older data is
cleared. I found nothing in the docs so far.

Could someone help with that?
If there's no support for reading key and value, can someone help me
assign a key to the table I'm building from the stream?
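
For reference, a DataStream-level sketch of reading both the Kafka key and value
with a KafkaDeserializationSchema and the universal Kafka connector (this is not
an environment-file/SQL-client solution; the topic name and properties are
placeholders):

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KeyedKafkaSource {

    // produces (key, value) pairs; null keys/values are mapped to empty strings
    static class KeyValueSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
        @Override
        public boolean isEndOfStream(Tuple2<String, String> nextElement) {
            return false;
        }

        @Override
        public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
            String key = record.key() == null ? "" : new String(record.key());
            String value = record.value() == null ? "" : new String(record.value());
            return Tuple2.of(key, value);
        }

        @Override
        public TypeInformation<Tuple2<String, String>> getProducedType() {
            return Types.TUPLE(Types.STRING, Types.STRING);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "key-value-reader");

        DataStream<Tuple2<String, String>> keyedRecords = env.addSource(
            new FlinkKafkaConsumer<>("some-topic", new KeyValueSchema(), props));
        keyedRecords.print();
        env.execute("read key and value from Kafka");
    }
}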

Thanks
Srikanth


Re: Reading Key-Value from Kafka | Eviction policy.

2019-09-26 Thread srikanth flink
Hi Miki,

What are those several ways? Could you help me with references?

Use case:

We have a continuous credit card transaction stream flowing into a Kafka
topic, along with a set of credit card defaulters in a .csv file (which
gets updated every day).


Thanks

Srikanth


On Fri, Sep 27, 2019 at 11:11 AM miki haiat  wrote:

> I'm sure there is several ways to implement it. Can you elaborate more on
> your use case ?
>
> On Fri, Sep 27, 2019, 08:37 srikanth flink  wrote:
>
>> Hi,
>>
>> My data source is Kafka, all these days have been reading the values from
>> Kafka stream to a table. The table just grows and runs into a heap issue.
>>
>> Came across the eviction policy that works on only keys, right?
>>
>> Have researched to configure the environment file(Flink SLQ) to read both
>> key and value, so as the eviction works on the keys and older data is
>> cleared. I found nothing in the docs, so far.
>>
>> Could someone help with that?
>> If there's no support for reading key and value, can someone help me to
>> assign a key to the table I'm building from stream?
>>
>> Thanks
>> Srikanth
>>
>


Querying nested JSON stream?

2019-10-17 Thread srikanth flink
Hi there,

I'm using the Flink SQL client to run the jobs for me. My stream is JSON with
nested objects. I couldn't find much documentation on querying nested JSON, so
I had to flatten the JSON and use:
SELECT `source.ip`, `destination.ip`, `dns.query`, `organization.id`,
`dns.answers.data` FROM source;

Can someone help me with a query over the nested JSON, so I could save the
resources spent running the flattening job?


Thanks
Srikanth


Can a Flink query output nested JSON?

2019-10-24 Thread srikanth flink
I'm working with the Flink SQL client. The input data is in JSON format and
contains nested JSON.

I'm trying to query the nested JSON from the table and expect the output
to be nested JSON instead of a string.

I've built the environment file to define a table schema as:

> format:
>   type: json
>   fail-on-missing-field: false
>   json-schema: >
> {
>   type: 'object',
>   properties: {
> 'lon': {
>   type: 'string'
> },
> 'rideTime': {
>   type: 'string'
> },
> 'nested': {
>   type: 'object',
>   properties: {
> 'inner': {
>   type: 'string'
> },
> 'nested1': {
>   type: 'object',
>   properties: {
> 'inner1': {
>   type: 'string'
> }
>   }
> }
>   }
> },
> 'name': {
>   type: 'string'
> }
>   }
> }
>   derive-schema: false
> schema:
>   - name: 'lon'
> type: VARCHAR
>   - name: 'rideTime'
> type: VARCHAR
>   - name: 'nested'
> type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>
>   - name: 'name'
> type: VARCHAR


Sink table schema:

> format:
>   type: json
>   fail-on-missing-field: false
>   derive-schema: true
> schema:
>   - name: 'nested'
> type: ROW<`inner` STRING>
>

Queries: I have been trying the following queries.
Flink SQL> insert into nestedSink select nested.`inner` as `nested.inner`
from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result
and registered TableSink [nestedSink] do not match.
Query result schema: [nested.inner: String]
TableSink schema:[nested: Row]

Flink SQL> insert into nestedSink select nested from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result
and registered TableSink [nestedSink] do not match.
Query result schema: [nested: Row]
TableSink schema:[nested: Row]

Flink SQL> insert into nestedSink select nested.`inner` as nested.`inner`
from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1,
column 55.
Was expecting one of:

"EXCEPT" ...
"FETCH" ...
"FROM" ...
"INTERSECT" ...
"LIMIT" ...
"OFFSET" ...
"ORDER" ...
"MINUS" ...
"UNION" ...
"," ...

Can you help me understand the problem with my schema/query?
I would also like to add new columns and nested columns.

Thanks
Srikanth


Add custom fields into Json

2019-10-28 Thread srikanth flink
Hi there,

I'm querying JSON data and it is working fine. I would like to add custom
fields alongside the query result.
My query looks like: select ROW(`source`),  ROW(`destination`), ROW(`dns`),
organization, cnt from (select (source.`ip`,source.`isInternalIP`) as
source, (destination.`ip`,destination.`isInternalIP`) as destination,
 (dns.`query`, dns.`answers`.`data`) as dns, organization as organization
from dnsTableS) tab1;

I would like to add "'dns' as `agg.type`, 'dns' as `agg.name`" to the same
output, but the query throws exceptions:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1,
column 278.
Was expecting one of:
"EXCEPT" ...
"FETCH" ...
"FROM" ...
"INTERSECT" ...
"LIMIT" ...
"OFFSET" ...
"ORDER" ...
"MINUS" ...
"UNION" ...
")" ...
"," ...

Could someone help me with this? Thanks

Srikanth


Using RocksDB as lookup source in Flink

2019-11-04 Thread srikanth flink
Hi there,

Can someone help me implement a job with a Flink Kafka source and a RocksDB sink,
where I could use a UDF to look up RocksDB in SQL queries?

Context: I get a list of IP addresses in a stream which I wish to store in
RocksDB. The other stream then performs a lookup to match the IP address.


Thanks
Srikanth


What is the slot vs cpu ratio?

2019-11-06 Thread srikanth flink
Hi there,

I have a 3-node cluster with 16 cores each. How many slots could I utilize at
most, and how do I do the calculation?


Thanks
Srikanth


Job Distribution Strategy On Cluster.

2019-11-06 Thread srikanth flink
Hi there,

I'm running Flink with a 3-node cluster.
While running my jobs (both SQL client and jar submission), the jobs are
being assigned to a single machine instead of being distributed across the cluster.
How can I achieve job distribution to make use of the available computation
power?

Thanks
Srikanth


Re: Job Distribution Strategy On Cluster.

2019-11-11 Thread srikanth flink
Vino,

I've set the parallelism to 6 while the max parallelism is 128.
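
For reference, a sketch of the corresponding settings on the execution
environment (the values mirror the ones above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSettings {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(6);      // default parallelism for all operators of the job
        env.setMaxParallelism(128); // upper bound used for key-group partitioning
    }
}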

Thanks
Srikanth



On Mon, Nov 11, 2019 at 3:18 PM vino yang  wrote:

> Hi srikanth,
>
> What's your job's parallelism?
>
> In some scenes, many operators are chained with each other. if it's
> parallelism is 1, it would just use a single slot.
>
> Best,
> Vino
>
> srikanth flink  于2019年11月6日周三 下午10:03写道:
>
>> Hi there,
>>
>> I'm running Flink with 3 node cluster.
>> While running my jobs(both SQL client and jar submission), the jobs are
>> being assigned to single machine instead of distribution among the cluster.
>> How could I achieve the job distribution to make use of the computation
>> power?
>>
>> Thanks
>> Srikanth
>>
>


Re: Job Distribution Strategy On Cluster.

2019-11-11 Thread srikanth flink
Zhu Zhu,

That's awesome and is what I'm looking for.
Any update on when the next release would be?

Thanks
Srikanth

On Mon, Nov 11, 2019 at 3:40 PM Zhu Zhu  wrote:

> Hi Srikanth,
>
> Is this issue what you encounter? FLINK-12122: a job would tend to fill
> one TM before using another.
> If it is, you may need to wait for the release 1.9.2 or 1.10, since it is
> just fixed.
>
> Thanks,
> Zhu Zhu
>
> vino yang  于2019年11月11日周一 下午5:48写道:
>
>> Hi srikanth,
>>
>> What's your job's parallelism?
>>
>> In some scenes, many operators are chained with each other. if it's
>> parallelism is 1, it would just use a single slot.
>>
>> Best,
>> Vino
>>
>> srikanth flink  于2019年11月6日周三 下午10:03写道:
>>
>>> Hi there,
>>>
>>> I'm running Flink with 3 node cluster.
>>> While running my jobs(both SQL client and jar submission), the jobs are
>>> being assigned to single machine instead of distribution among the cluster.
>>> How could I achieve the job distribution to make use of the computation
>>> power?
>>>
>>> Thanks
>>> Srikanth
>>>
>>


Re: Job Distribution Strategy On Cluster.

2019-11-11 Thread srikanth flink
Great, thanks for the update.

On Tue, Nov 12, 2019 at 8:51 AM Zhu Zhu  wrote:

> There is no plan for release 1.9.2 yet.
> Flink 1.10.0 is planned to be released in early January.
>
> Thanks,
> Zhu Zhu
>
> srikanth flink  于2019年11月11日周一 下午9:53写道:
>
>> Zhu Zhu,
>>
>> That's awesome and is what I'm looking for.
>> Any update on when would be the next release date?
>>
>> Thanks
>> Srikanth
>>
>> On Mon, Nov 11, 2019 at 3:40 PM Zhu Zhu  wrote:
>>
>>> Hi Srikanth,
>>>
>>> Is this issue what you encounter? FLINK-12122: a job would tend to fill
>>> one TM before using another.
>>> If it is, you may need to wait for the release 1.9.2 or 1.10, since it
>>> is just fixed.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> vino yang  于2019年11月11日周一 下午5:48写道:
>>>
>>>> Hi srikanth,
>>>>
>>>> What's your job's parallelism?
>>>>
>>>> In some scenes, many operators are chained with each other. if it's
>>>> parallelism is 1, it would just use a single slot.
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> srikanth flink  于2019年11月6日周三 下午10:03写道:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I'm running Flink with 3 node cluster.
>>>>> While running my jobs(both SQL client and jar submission), the jobs
>>>>> are being assigned to single machine instead of distribution among the
>>>>> cluster. How could I achieve the job distribution to make use of the
>>>>> computation power?
>>>>>
>>>>> Thanks
>>>>> Srikanth
>>>>>
>>>>


Table/SQL API to read and parse JSON, Java.

2019-12-01 Thread srikanth flink
Hi there,

I'm following the link

to read JSON data from Kafka and convert it to a table programmatically. I had
tried and succeeded doing this declaratively using the SQL client.

My JSON data is nested, like: {a:1, b:2, c:{x:1, y:2}}.
Code:

> String schema = "{type: 'object', properties: {'message': {type:
> 'string'},'@timestamp': {type: 'string'}}}";
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(6, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().getCheckpointTimeout();
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> tableEnv.connect(new
> Kafka().version("universal").topic("recon-data").startFromEarliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092"))
> .withFormat(new
> Json().failOnMissingField(false).jsonSchema(schema).deriveSchema())
> .withSchema(new Schema().field("message",
> Types.STRING()).field("@timestamp", Types.LOCAL_DATE_TIME()))
> .inAppendMode().registerTableSource("reconTableS");
>
> Table t = tableEnv.sqlQuery("select * from reconTableS");
> DataStream<Row> out = tableEnv.toAppendStream(t, Row.class);
> out.print();
>
> try {
> env.execute("Flink Example Json");
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>

pom.xml:

> <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <flink.version>1.9.0</flink.version>
>     <java.version>1.8</java.version>
>     <scala.binary.version>2.11</scala.binary.version>
>     <maven.compiler.source>${java.version}</maven.compiler.source>
>     <maven.compiler.target>${java.version}</maven.compiler.target>
> </properties>
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-scala_2.11</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-common</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-planner_2.11</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-kafka_2.12</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-json</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-cep_2.11</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>mysql</groupId>
>         <artifactId>mysql-connector-java</artifactId>
>         <version>5.1.39</version>
>     </dependency>
> </dependencies>
>

The code threw the following error:

> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: findAndCreateTableSource failed.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
> at
> kafka.flink.stream.list.match.ExampleJsonParser.main(ExampleJsonParser.java:31)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> ... 12 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=localhost:2181
> connector.properties.1.key=bootstrap.servers
> con

Row arity of from does not match serializers.

2019-12-05 Thread srikanth flink
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

Help me understand the error in detail.

Thanks
Srikanth