Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
  Hi Roman,

  Many thanks for the feedback!
> Probably it would be simpler to just decline the RPC-triggered 
checkpoint 
> if not all inputs of this task are finished (with 
CHECKPOINT_DECLINED_TASK_NOT_READY).

> But I wonder how significantly this waiting for EoP from every input 
will delay performing the first checkpoint 
> by B after becoming a new source. This may in turn impact 
exactly-once sinks and incremental checkpoints.
> Maybe a better option would be to postpone JM notification from 
source until it's EoP is consumed?

   I also agree that there would indeed be cases where the checkpoint gets 
slower, since it could not skip the data in the result partition of the 
finished upstream task:
   a) With aligned checkpoints this case does not arise, since the downstream 
tasks always need to process the buffers in order anyway.
   b) With unaligned checkpoints enabled, the slowdown might happen if the 
downstream task processes very slowly.

   But since only the result partition of the finished upstream task needs to 
wait to be processed, the other parts of the execution graph could still 
perform the unaligned checkpoint normally. I think the average delay caused 
would be much lower than with completely aligned checkpoints, but there could 
still be extremely bad cases where the delay is long.

   Declining the RPC-triggered checkpoint would indeed simplify the 
implementation, but since a failed checkpoint currently causes a job failover 
by default, we have some concerns about directly declining the checkpoint.
   As for postponing the JM notification, since the JM currently has no way to 
know whether a task has received all the EndOfPartition events from its 
upstream tasks, we would need to introduce a new RPC to report that state, 
and since the triggering is not atomic, we might also run into synchronization 
issues between JM and TM, which would introduce some complexity.
  Thus another possible option might be to let the upstream task wait until 
all the pending buffers in the result partition have been flushed before it 
finishes. We could do this wait only for the PipelinedResultPartition so it 
won't affect batch jobs. With this waiting, the unaligned checkpoint could 
still trigger the upstream task and skip the buffers in the result partition. 
Since the result partition state would be kept within the operator state of 
the last operator, after a failover we would find that the last operator has 
a non-empty state and we would restart the tasks containing this operator to 
resend the snapshotted buffers. Of course this would also introduce some 
complexity, and since the probability of a long delay would be lower than in 
the completely aligned case, do you think it would be OK for us to view it as 
an optimization and postpone it to future versions? 

 Best,
 Yun


--
From:Khachatryan Roman 
Send Time:2021 Jan. 11 (Mon.) 05:46
To:Yun Gao 
Cc:Arvid Heise ; dev ; user 

Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks a lot for your answers Yun,

> In detail, suppose we have a job with the graph A -> B -> C; suppose in one 
> checkpoint A has reported FINISHED, CheckpointCoordinator would 
> choose B as the new "source" to trigger checkpoint via RPC. For task B, if it 
> received the checkpoint trigger, it would know that all its preceding tasks
> are finished, then it would wait till all the InputChannel received 
> EndOfPartition from the network (namely inputChannel.onBuffer() is called 
> with 
> EndOfPartition) and then taking snapshot for the input channels, as the 
> normal unaligned checkpoints does for the InputChannel side. Then
> we would be able to ensure the finished tasks always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint if 
not all inputs of this task are finished (with 
CHECKPOINT_DECLINED_TASK_NOT_READY).

But I wonder how significantly this waiting for EoP from every input will delay 
performing the first checkpoint by B after becoming a new source. This may in 
turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone JM notification from source until 
it's EoP is consumed?

Regards,
Roman



Re: How should I process a cumulative counter?

2021-01-11 Thread Larry Aspen
Hi Aljoscha,

thank you for your reply.

On 2021/01/08 15:44 Aljoscha Krettek wrote:
>the basic problem for your use case is that window boundaries are
>inclusive for the start timestamp and exclusive for the end timestamp.

That's true. What further complicates matters is that the last value of
the window (which should also be the first value of the next window)
might not have exactly the end timestamp of the one hour window but
could be even days in the future if the sensor is powered off, for
example, over a weekend.

>It's setup like this to ensure that consecutive tumbling windows don't
>overlap. This is only a function of how our `WindowAssigner` works, so
>it could be done differently in a different system.

I have tried to learn a little bit about the `WindowAssigner` system.
I think that if I could assign an element to two windows, I could
process the cumulative counter correctly. The issue I had with a
`WindowAssigner` was that I didn't seem to have a way to discover what
other windows exist. I would've wanted to assign the element to a window
based on the element's timestamp and to the previous window. Here is an
illustration of what I mean:

W1 (16:00 - 17:00)  W2 (09:00 - 10:00)
+---+   +---+
|   a   |  ...sensor off... |   b   |
+---+   +---+

I would like to assign b to windows W2 and W1 so that I can calculate
runtime during W1 as b - a. The value `a` has been recorded when the
sensor started. Because there are no other values, the sensor was
shut down within 15 minutes. The value `b` has been recorded the
following day when the sensor was started the next time. By calculating
b - a I can find out for how many seconds the sensor was running during
window 1 (result would be between 0 and 15 minutes or 0 and 900
seconds).
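
For reference, this is roughly the assigner hook I was experimenting with. It only
sees the element and its timestamp, so it can only add the immediately preceding
hour, never the previous window that actually contains data (which may be days
earlier). The class below is just an illustration of that limitation, not working
code from my job:

import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CurrentAndPreviousHourAssigner extends WindowAssigner<Object, TimeWindow> {

    private static final long HOUR = 60 * 60 * 1000L;

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        long start = TimeWindow.getWindowStartWithOffset(timestamp, 0, HOUR);
        // the element's own hour plus the hour directly before it; there is no
        // way here to look up the last window that actually received an element
        return Arrays.asList(
                new TimeWindow(start, start + HOUR),
                new TimeWindow(start - HOUR, start));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}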

>Have you tried using a sliding window where the `slide` is `size - 1ms`?
>With this, you would ensure that elements that fall exactly on the
>boundary, i.e. your hourly sensor updates would end up in both of the
>consecutive windows. It seems a bit unorthodox but could work in your
>case.

I've only tried a sliding window with a size of two hours and a `slide`
of one hour. My idea was to keep the window start and end aligned with
full hours. If I were to use a `slide` of `size - 1ms` wouldn't that
cause a widening misalignment with full hours?

My issue with a sliding window is that I can't know how far in the
future the value `b` is. Therefore I can't define a window size that is
long enough to include both values `a` and `b`. Here is an illustration
of that situation:

W1 (16:00 - 18:00)  W2 (09:00 - 10:00)
+---+   +
|   a   |   |  ...  |   b   | ...
+---+   +

Here the window size is two hours and the `slide` is one hour. I would
calculate the runtime for the first half of the window (16:00 - 17:00).
Here the problem is that the value `b` is so far in the future that
it isn't assigned to the window W1 and I can't calculate the runtime for
the hour 16:00 - 17:00.

Best regards,
Larry


Re: Roadmap for Execution Mode (Batch/Streaming) and interaction with Table/SQL APIs

2021-01-11 Thread Aljoscha Krettek

Also cc'ing d...@flink.apache.org

On 2021/01/06 09:19, burkaygur wrote:

1) How do these changes impact the Table and SQL APIs? Are they completely
orthogonal or can we get the benefits of the new Batch Mode with Flink SQL
as well?


The answer here is a bit complicated. The Table API/SQL already use 
similar techniques under the hood for BATCH execution. Our recent 
changes just made similar functionality available for the DataStream 
API. So in a way the changes are orthogonal.


However, the changes are relevant when you want to interoperate with the 
DataStream and Table API. There it becomes relevant that both parts can 
do STREAMING/BATCH execution well. We're not 100% there yet on this 
front but we're tracking some work under FLIP-136 [1].


[1] https://cwiki.apache.org/confluence/x/0DN4CQ


2) What is the best ticket to follow the roadmap & track the progress of
this whole project. Specifically the parts about bootstrapping of state. I
would love to help contribute to it.


I would say the best way to follow progress is the dev mailing list and 
the FLIP overview page [2]. That's not super intuitive and can be 
hard to follow for outsiders. Sometimes, people such as myself will 
write blog posts on the Flink website or private blogs that try and 
shine a light on the development so it might help to follow some 
relevant people from the project on Twitter, where such posts are often 
announced.


Specifically about state bootstrapping, we don't have many concrete 
thoughts yet. It would help if you could talk about some of the 
requirements that you would have for this.


Best,
Aljoscha

[2] 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals




Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-11 Thread Aljoscha Krettek

On 2021/01/08 16:55, vinay.raic...@t-systems.com wrote:

Could you also attach the code snippet for `KafkaSource`, `KafkaSourceBuilder`, 
and `OffsetInitializers` that you were referring to in your previous reply, 
to make it clearer for me.


Ah sorry, here I was referring to the Flink code. You can start with 
`KafkaSource` [1], whose Javadocs contain an example block that shows how 
to use it.


[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
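
For the original question of reading only the records between two timestamps, a
rough sketch with the new KafkaSource builder could look like the following. The
server and topic names are placeholders, t1/t2 are your epoch-millisecond bounds,
and method names may differ slightly between Flink versions:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaBetweenTimestamps {
    public static void main(String[] args) throws Exception {
        long t1 = 1610323200000L;   // start of the time span (epoch millis), placeholder
        long t2 = 1610409600000L;   // end of the time span (epoch millis), placeholder

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("my-topic")
                .setStartingOffsets(OffsetsInitializer.timestamp(t1))  // start reading at t1
                .setBounded(OffsetsInitializer.timestamp(t2))          // stop once t2 is reached
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka t1..t2")
                .print();
        env.execute();
    }
}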


Re: Re: Use Flink to process request with list of queries and aggregate

2021-01-11 Thread Yun Gao
Hi Li,

From my view, it would not be easy to use a countWindow if you have a 
different number of records for each key (namely each user in this case). I 
think you may need to use the low-level KeyedProcessFunction [1] to keep some 
state yourself. For example, each request might also carry the total number of 
requests of its user, and in the KeyedProcessFunction you could record the 
number of received requests and the total requests of this user in the state. 
Whenever enough requests have been received for a user, you know that the 
message has been completely processed, and the state of this user can then be 
cleaned up.
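
A rough sketch of this idea (the Request type and its fields are made up for
illustration, they are not part of any Flink API):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// hypothetical input type: each request carries the total number of requests its user sends
class Request {
    String userId;
    int totalRequests;
}

public class PerUserAggregator extends KeyedProcessFunction<String, Request, String> {

    private transient ValueState<Integer> received;

    @Override
    public void open(Configuration parameters) {
        received = getRuntimeContext().getState(
                new ValueStateDescriptor<>("received-requests", Integer.class));
    }

    @Override
    public void processElement(Request request, Context ctx, Collector<String> out) throws Exception {
        // count the requests seen so far for the current user (the key)
        int count = (received.value() == null ? 0 : received.value()) + 1;
        if (count >= request.totalRequests) {
            // all requests of this user have arrived: emit a result and clean the state
            out.collect("user " + ctx.getCurrentKey() + " fully processed");
            received.clear();
        } else {
            received.update(count);
        }
    }
}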


Best,
 Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html#the-keyedprocessfunction

--
Sender:Li Wang
Date:2021/01/11 07:10:27
Recipient:
Theme:Re: Use Flink to process request with list of queries and aggregate

Can I get any suggestion? Thanks a lot.

- Li



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Khachatryan Roman
Hi Yun,

> b)  With unaligned checkpoint enabled, the slower cases might happen if
the downstream task processes very slowly.
I think UC will be the common case with multiple sources each with DoP > 1.
IIUC, waiting for EoP will be needed on each subtask each time one of its
source subtasks finishes.

> But since only the result partition part of the finished upstream need
wait to be processed, the other part of
> the execution graph could  still perform the unaligned checkpoint normally
Yes, but checkpoint completion notification will not be sent until all the
EOPs are processed.

> Declining the RPC-trigger checkpoint would indeed simplify the
implementation, but since currently by default the
> failed checkpoint would cause job failover, thus we might have some
concerns in directly decline the checkpoint.
Not all declines cause job failure, particularly
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.

> Thus another possible option might be let the upstream task to wait till
all the pending buffers in the result partition has been flushed before get
to finish.
This is what I meant by "postpone JM notification from source". Just
blocking the task thread wouldn't add much complexity, though I'm not sure
if it would cause any problems.

> do you think it would be ok for us to view it as an optimization and
postpone it to future versions ?
I think that's a good idea.

Regards,
Roman


On Mon, Jan 11, 2021 at 11:03 AM Yun Gao  wrote:

>   Hi Roman,
>
>   Very thanks for the feedbacks !
>
>
> > Probably it would be simpler to just decline the RPC-triggered
> checkpoint
> > if not all inputs of this task are finished (with
> CHECKPOINT_DECLINED_TASK_NOT_READY).
>
> > But I wonder how significantly this waiting for EoP from every
> input will delay performing the first checkpoint
> > by B after becoming a new source. This may in turn impact
> exactly-once sinks and incremental checkpoints.
> > Maybe a better option would be to postpone JM notification from
> source until it's EoP is consumed?
>
>I also agree with that there would indeed be possible cases that
> the checkpoint get slower since it could not skip
>the data in  the result partition of the finished upstream task:
> a) For aligned checkpoint, the cases would not happen since
> the downstream tasks would always need to
> process the buffers in order.
>b)  With unaligned checkpoint enabled, the slower cases might
> happen if the downstream task processes very
> slowly.
>
>But since only the result partition part of the finished upstream
> need wait to be processed, the other part of
>the execution graph could  still perform the unaligned checkpoint
> normally, I think the average delay caused would
>be much lower than the completely aligned checkpoint, but there
> would still be extremely bad cases that
>the delay is long.
>
>Declining the RPC-trigger checkpoint would indeed simplify the
> implementation, but since currently by default the
>failed checkpoint would cause job failover, thus we might have some
> concerns in directly decline the checkpoint.
>For postpone the notification the JM notification, since current JM
> should not be able to know if the task has
>received all the EndOfPartition from the upstream tasks, we might
> need to introduce new RPC for notifying the
>state and since the triggering is not atomic, we may also met with
> some  synchronization issues between JM and TM,
>which would introduce some complexity.
>
>   Thus another possible option might be let the upstream task to wait
> till all the pending buffers in the result partition has
>   been flushed before get to finish. We could only do the wait for the
> PipelineResultPartition so it won't affect the batch
>   jobs. With the waiting the unaligned checkpoint could continue to
> trigger the upstream task and skip the buffers in
>   the result partition. Since the result partition state would be kept
> within the operator state of the last operator, after failover
>   we would found that the last operator has an non-empty state and we
> would restart the tasks containing this operator to
>   resend the snapshotted buffers. Of course this would also introduce
> some complexity, and since the probability of long delay
>   would be lower than the completely aligned case, do you think it
> would be ok for us to view it as an optimization and
>   postpone it to future versions ?
>
>  Best,
>  Yun
>
>
>
> --
> From:Khachatryan Roman 
> Send Time:2021 Jan. 11 (Mon.) 05:46
> To:Yun Gao 
> Cc:Arvid Heise ; dev ; user <
> user@flink.apache.org>
> Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
> Finished
>
> Thanks a lot for your answers Yun,
>
> > In detail, support

mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Akisaya
Hey there,
Recently I have had to join two streams where one of them may be idle for a long
time. In Flink 1.12, the watermark generator has a method `withIdleness` to
detect whether a stream is idle, so that the operator can still advance
its watermark using the other, active stream; otherwise the state of this
operator keeps growing.

But in Flink 1.10 there's no such withIdleness method.
The Flink 1.10 docs mention a workaround in
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
, but this doesn't work well.

After walking through the code, I found that StreamSourceContexts#getSourceContext
provides an idleTimeout parameter which is hard-coded to -1 in StreamSource#run.

StreamSourceContexts#getSourceContext

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
  TimeCharacteristic timeCharacteristic,
  ProcessingTimeService processingTimeService,
  Object checkpointLock,
  StreamStatusMaintainer streamStatusMaintainer,
  Output<StreamRecord<OUT>> output,
  long watermarkInterval,
  long idleTimeout) {



StreamSource#run

this.ctx = StreamSourceContexts.getSourceContext(
   timeCharacteristic,
   getProcessingTimeService(),
   lockingObject,
   streamStatusMaintainer,
   collector,
   watermarkInterval,
   -1);


After extending a flink KafkaConnector and setting idleness using
reflection, I found it works as I expected!



I'm very curious why Flink does not expose this parameter to users to
determine if a stream is idle, and what the side effects would be.

thx.


Flink 1.12 Kryo Serialization Error

2021-01-11 Thread Yuval Itzchakov
Hi,

I've implemented a KryoSerializer for a specific JSON type in my
application as I have a bunch of UDFs that depend on a RAW('io.circe.Json')
encoder being available. The implementation is rather simple. When I run my
Flink application with Kryo in trace logs, I see that data gets properly
serialized / deserialized using the serializer. However, after about 30
seconds, the application blows up with the following error:

Caused by: java.io.IOException: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741,
length: 733793654, index: 69, offset: 0
at
org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
... 11 more

Or with the following exception:

Caused by: java.lang.NegativeArraySizeException
at
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.fli

Re: Using key.fields in 1.12

2021-01-11 Thread Timo Walther
There are plans to also derive the table schema from Avro schema. But we 
haven't decided on a syntax for this yet. For now, we only support this 
through catalogs such as Confluent schema registry.
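
For reference, a rough sketch of a DDL in the new property design without an
explicit format.avro-schema (table, topic, and field names are made up; the Avro
schema is derived from the column list):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaAvroKeyFields {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE my_table (" +
                "  my_key_field STRING," +
                "  my_value_field STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'my-topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'my-consumer-group'," +
                "  'key.format' = 'avro'," +
                "  'key.fields' = 'my_key_field'," +
                "  'value.format' = 'avro'" +
                ")");
    }
}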


Regards,
Timo


On 07.01.21 21:42, Aeden Jameson wrote:
  Brilliant, thank you. That will come in handy. I was looking through 
docs hoping there was a way to still specify the schema with no luck. 
Does such an option exist?


On Thu, Jan 7, 2021 at 2:33 AM Timo Walther wrote:


Hi Aeden,

`format.avro-schema` is not required anymore in the new design. The Avro
schema is derived entirely from the table's schema.

Regards,
Timo



On 07.01.21 09:41, Aeden Jameson wrote:
 > Hi Timo,
 >
 > Thanks for responding. You're right. So I did update the properties.
 >>From what I can tell the new design you're referring to uses the
 > KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
 > options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
 > support those options. Is that right? So I updated my
configuration to
 >
 > connector    = 'kafka'
 > topic   = 'my-topic'
 > properties.group.id  =
'my-consumer-group'
 > properties.bootstrap.servers = '...'
 > format = 'avro'
 > format.avro-schema = ''
 > key.fields = 'my_key_field'
 >
 > However, the property format.avro-schema doesn't appear to be
 > supported by KafkaDynamicTableFactory. I get this exception.
 >
 > Caused by: org.apache.flink.table.api.ValidationException:
Unsupported
 > options found for connector 'kafka'.
 >
 > Unsupported options:
 >
 > format.avro-schema
 >
 > Supported options:
 >
 > connector
 > format
 > key.fields
 > key.fields-prefix
 > key.format
 > properties.bootstrap.servers
 > properties.group.id 
 > property-version
 > scan.startup.mode
 > scan.startup.specific-offsets
 > scan.startup.timestamp-millis
 > scan.topic-partition-discovery.interval
 > sink.parallelism
 > sink.partitioner
 > sink.semantic
 > topic
 > topic-pattern
 > value.fields-include
 > value.format
 >          at

org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
 >          at

org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
 >          at

org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
 >          at

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
 >          at

org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
 >          ... 21 more
 >
 > FAILURE: Build failed with an exception.
 >
 >
 >
 >
 > The format.avro-schema property was supported it what looks to me the
 > old design in in KafkaTableSourceSinkFactoryBase with this line,
 >
 >      properties.add(FORMAT + ".*");
 >
 >

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160


 >
 > Does format.avro-schema need to be specified differently?
 >
 > Thank you,
 > Aeden
 >
 > On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <twal...@apache.org> wrote:
 >>
 >> Hi Aeden,
 >>
 >> we updated the connector property design in 1.11 [1]. The old
 >> translation layer exists for backwards compatibility and is
indicated by
 >> `connector.type=kafka`.
 >>
 >> However, `connector = kafka` indicates the new property design and
 >> `key.fields` is only available there. Please check all
properties again
 >> when upgrading, they are mentioned here [2].
 >>
 >> Regards,
 >> Timo
 >>
 >>
 >> [1]
 >>

https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory


 >> [2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/

 >>
 >>
 >> On 06.01.21 18:35, Aeden Jameson wrote:
 >>> Yes, I do have that dependency. I see it in the dependency view of
 >>> intellij and directly. in the uber jar. Thanks for responding.
 >>>
 >>> - Aeden
 >>>
 >>> On Wed, Jan 6, 2

Timestamp Issue with OutputTags

2021-01-11 Thread Priyanka Kalra A
Hi Team,

We are generating multiple side-output tags and using the default processing time 
on a non-keyed stream. The class $YYY extends ProcessFunction, and an 
implementation is provided for the processElement method. Upon sending valid data, 
it gives the error "Invalid timestamp: -9223372036854775808. Timestamp should 
always be non-negative or null".


  *   Why is it not able to read the timestamp?
  *   Why is it not taking the system default time as the processing time?

Complete stack trace for reference:
java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. 
Timestamp should always be non-negative or null.
at 
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:70) 
~[kafka-clients-0.11.0.2.jar:?]
at 
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:93) 
~[kafka-clients-0.11.0.2.jar:?]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652)
 ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97)
 ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
com.eee.dd.ccc.aaa.processing.$YYY.processElement(.java:166)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
 ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
 ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765)
 ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757)
 ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]


Your help with this would be deeply appreciated!


Thanks & Regards,
Priyanka Kalra


Re: Timestamp Issue with OutputTags

2021-01-11 Thread Taher Koitawala
Can you please share your code?

On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <
priyanka.a.ka...@ericsson.com> wrote:

> Hi Team,
>
>
>
> We are generating multiple side-output tags and using the default processing
> time on a non-keyed stream. The class $YYY extends *ProcessFunction*, and an
> implementation is provided for the *processElement* method. Upon
> sending valid data, it gives the error "*Invalid timestamp:
> -9223372036854775808. Timestamp should always be non-negative or null*".
>
>
>
>- Why is it not able to read the timestamp?
>- Why is it not taking the system default time as the processing time?
>
>
>
> *Complete stack trace for reference:*
>
> java.lang.IllegalArgumentException: Invalid timestamp:
> -9223372036854775808. Timestamp should always be non-negative or null.
>
> at
> org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:70)
> ~[kafka-clients-0.11.0.2.jar:?]
>
> at
> org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:93)
> ~[kafka-clients-0.11.0.2.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652)
> ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97)
> ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> *at
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>
> *at
> com.eee.dd.ccc.aaa.processing.$YYY.processElement(.java:166)*
>
> *at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151)
> ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765)
> ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757)
> ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.f

Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Chesnay Schepler
The idleTimeout you found is from an earlier attempt at implementing 
idleness, but making it configurable was aborted midway through as there 
were some API issues. The effort was subsumed by a new source interface 
and watermark generators that were introduced in 1.12.


Some more details can be found in FLINK-5018.
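
For reference, the 1.12-style declaration mentioned above looks roughly like this
(the event type and the chosen durations are only placeholders):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdlenessExample {

    // hypothetical event type with an epoch-millisecond timestamp field
    public static class Event {
        public long timestampMillis;
    }

    public static WatermarkStrategy<Event> strategy() {
        return WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestampMillis)
                // mark the stream as idle after 1 minute without records,
                // so downstream operators can advance on the other input
                .withIdleness(Duration.ofMinutes(1));
    }
}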


On 1/11/2021 12:40 PM, Akisaya wrote:

Hey there,
recently I have to join two streams while one of it may be idle for a 
long time, in flink 1.12, the Watermark Generator has a method 
`withIdleness` to detect if a stream is idle or not so that the 
operator can still advance its watermark by another active stream, and 
the state of this operator will continuously grow up.


But in flink 1.10, there's no such withIdleness method
flink 1.10 docs mention a workaround in 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission,but 
this doesn't work well.


After walking through the code,I found 
StreamSourceContexts#getSourceContext provides a param idleness which 
is hard coded to -1 in StreamSource#run.


StreamSourceContexts#getSourceContext
public static  SourceFunction.SourceContext getSourceContext(
   TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, 
Object checkpointLock, StreamStatusMaintainer streamStatusMaintainer, 
Output> output, long watermarkInterval, long idleTimeout) {


StreamSource#run
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic, getProcessingTimeService(), lockingObject, 
streamStatusMaintainer, collector, watermarkInterval, -1);

After extending a flink KafkaConnector and setting idleness using 
reflection, I found it works as I expected!




I'm very curious that why flink does not provide this param to user to 
determine if a stream is idle and what will be the side effect.


thx.








Testing Flink Jobs

2021-01-11 Thread KristoffSC
Hi,
I would like to write few tests that would check the message flow in my
Flink pipeline. 
I would like to base my test on [1].

My StreamJob class, that has the main method has all Sinks and Source
pluggable. The implementations are based also on [1].

In all examples available online I can see that in the actual test method
env.execute() is called, which starts deployment of a job.

However in my case, the deployment of the job takes a significant amount of
time. This is caused by the fact that we need to load some "special" libraries
that should not be mocked for tests. That is why we would like to call it
only once, hence deploy the job on a MiniCluster only once.

My StreamJob.main method contains all pipeline setup plus call to
env.execute().


However when I do that, for example when I initiate my job in a
ClassRule or BeforeClass method, I noticed that the tests hang. The
thread from JUnit is actually waiting on env.execute(), which in my case
never returns. However, the underlying MiniCluster is working fine. 


Questions:
1. What would be the preferred way to set up my tests when I would like to
deploy my StreamJob only once?
2. How can I check whether the cluster used in my tests is ready and the job
deployment has finished?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Akisaya
Thank you @chesnay

I tried in vain to find the issue about the introduction of the new watermark
strategy; can you provide some details about it?

Chesnay Schepler wrote on Mon, Jan 11, 2021 at 9:43 PM:

> The idleTimeout you found is from an earlier attempt at implementing
> idleness, but making it configurable was aborted midway through as there
> were some API issues. The effort was subsumed by a new source interface and
> watermark generators that were introduced in 1.12.
>
> Some more details can be found in FLINK-5018
> .
>
> On 1/11/2021 12:40 PM, Akisaya wrote:
>
> Hey there,
> recently I have to join two streams while one of it may be idle for a long
> time, in flink 1.12, the Watermark Generator has a method `withIdleness`
> to detect if a stream is idle or not so that the operator can still advance
> its watermark by another active stream, and the state of this operator will
> continuously grow up.
>
> But in flink 1.10, there's no such withIdleness method
> flink 1.10 docs mention a workaround in
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
> ,but this doesn't work well.
>
> After walking through the code,I found StreamSourceContexts#getSourceContext
> provides a param idleness which is hard coded to -1 in StreamSource#run.
>
> StreamSourceContexts#getSourceContext
>
> public static  SourceFunction.SourceContext getSourceContext(
>   TimeCharacteristic timeCharacteristic,  ProcessingTimeService 
> processingTimeService,  Object checkpointLock,  
> StreamStatusMaintainer streamStatusMaintainer,  Output> 
> output,  long watermarkInterval,  long idleTimeout) {
>
>
>
> StreamSource#run
>
> this.ctx = StreamSourceContexts.getSourceContext(
>timeCharacteristic,   getProcessingTimeService(),   lockingObject,   
> streamStatusMaintainer,   collector,   watermarkInterval,   -1);
>
>
> After extending a flink KafkaConnector and setting idleness using
> reflection, I found it works as I expected!
>
>
>
> I'm very curious that why flink does not provide this param to user to
> determine if a stream is idle and what will be the side effect.
>
> thx.
>
>
>
>
>
>


Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Chesnay Schepler
This is the parent ticket for the new source interface: FLINK-10740

This is the parent ticket for the reworked watermark generators: FLINK-17653


On 1/11/2021 5:16 PM, Akisaya wrote:

thank you @chesnay

I tried in vain to find the issue about introduction of new watermark 
strategy, can you provide some details about it ?


Chesnay Schepler <ches...@apache.org> wrote on Mon, Jan 11, 2021 at 9:43 PM:


The idleTimeout you found is from an earlier attempt at
implementing idleness, but making it configurable was aborted
midway through as there were some API issues. The effort was
subsumed by a new source interface and watermark generators that
were introduced in 1.12.

Some more details can be found in FLINK-5018
.

On 1/11/2021 12:40 PM, Akisaya wrote:

Hey there,
recently I have to join two streams while one of it may be idle
for a long time, in flink 1.12, the Watermark Generator has a
method `withIdleness` to detect if a stream is idle or not so
that the operator can still advance its watermark by another
active stream, and the state of this operator will
continuously grow up.

But in flink 1.10, there's no such withIdleness method
flink 1.10 docs mention a workaround in

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission,but
this doesn't work well.

After walking through the code,I found
StreamSourceContexts#getSourceContext provides a param idleness
which is hard coded to -1 in StreamSource#run.

StreamSourceContexts#getSourceContext
public static  SourceFunction.SourceContext getSourceContext(
   TimeCharacteristic timeCharacteristic, ProcessingTimeService 
processingTimeService, Object checkpointLock, StreamStatusMaintainer 
streamStatusMaintainer, Output> output, long 
watermarkInterval, long idleTimeout) {


StreamSource#run
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic, getProcessingTimeService(), lockingObject, 
streamStatusMaintainer, collector, watermarkInterval, -1);

After extending a flink KafkaConnector and setting idleness using
reflection, I found it works as I expected!



I'm very curious that why flink does not provide this param to
user to determine if a stream is idle and what will be the side
effect.

thx.










Re: Testing Flink Jobs

2021-01-11 Thread Chesnay Schepler

1)
You can either execute the job in a separate thread, or set 
DeploymentOptions.ATTACHED to false in the MiniCluster configuration.

2)
The cluster not being ready is /usually/ not really an issue. I wouldn't 
worry about it for the time being.
(The reason being that the MiniCluster resource already starts all 
components and at most requires components to connect to each other.)
As for when the job is finished, the MiniClusterWithClientResource 
provides you with a MiniClusterClient, on which you can call 
requestJobResult(JobID). This returns a future that is completed when 
the job has terminated. Alternatively you can poll the job status via 
#getJobStatus.

You can retrieve the JobID via #listJobs().
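
A rough sketch of that second part (the job is assumed to be submitted from a
separate thread via StreamJob.main; the class names follow the Flink testing
utilities, everything else is an assumption):

import java.util.Collection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class StreamJobIT {

    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    // returns the status of the first job known to the cluster
    static JobStatus firstJobStatus() throws Exception {
        ClusterClient<?> client = CLUSTER.getClusterClient();
        Collection<JobStatusMessage> jobs = client.listJobs().get();
        JobID jobId = jobs.iterator().next().getJobId();
        return client.getJobStatus(jobId).get();
    }
}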

On 1/11/2021 4:21 PM, KristoffSC wrote:

Hi,
I would like to write few tests that would check the message flow in my
Flink pipeline.
I would like to base my test on [1].

My StreamJob class, that has the main method has all Sinks and Source
pluggable. The implementations are based also on [1].

In all examples available online I can see that in the actual test method
env.execute() is called, which starts deployment of a job.

However in my case, the deployment of job takes some significant amount of
time. This is caused by fact that we need to load some "special" libraries
that should not be mocked for tests. That is why, we would like to call it
only once, hence deploy the job on a MiniClsuter only once.

My StreamJob.main method contains all pipeline setup plus call to
env.execute().


However when I do that, for example when I initiate my job in another
ClassRule method or BeforeClass method, I noticed that tests hangs. The
thread from Junit is actually waiting on env.execute(). which in my case
never ends. However the underlying minicluster is working fine.


Questions:
1. what would be a preferred way to setup my tests, when I would like to
deploy my StreamJobOnly once
2. how can i check if a cluster, used in my tests is ready, and Job
deployment is finished.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Statefun with RabbitMQ consumes message but does not run statefun

2021-01-11 Thread Stephan Pelikan
Hi,

I am trying to use RabbitMQ as a source. My source consumes messages from the queue, but 
the stateful function is never executed - not even created.

This is my main function:

1 public static void main(String[] args) throws Exception {
2
3 final var env = StreamExecutionEnvironment.getExecutionEnvironment();
4
5 env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
6
7 env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE);
8 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
9 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
10 
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
11
12 final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
13 statefunConfig.setFlinkJobName("test");
14 statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
15
16 final var connectionConfig = new RMQConnectionConfig.Builder()
17 .setHost("localhost")
18 .setUserName("guest")
19 .setPassword("guest")
20 .setPort(5672)
21 .setVirtualHost("test")
22 .setPrefetchCount(5000)
23 .build();
24
25 final var deserializationSchema = new 
TypeInformationSerializationSchema<>(
26 new ProtobufTypeInformation<>(Any.class), env.getConfig());
27 final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, 
true, deserializationSchema);
28
29 final var source = env
30 .addSource(rmqSource, TEST_INGRESS)
31 .setParallelism(1)
32 .map(msg -> {
33 return RoutableMessageBuilder
34 .builder()
35 .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
36 .withMessageBody(msg)
37 .build();
38 });
39
40 StatefulFunctionDataStreamBuilder
41 .builder("test")
42 .withDataStreamAsIngress(source)
43 .withFunctionProvider(MyStatefun.TYPE, unused -> {
44 return new MyStatefun();
45 })
46 .withEgressId(MyStatefun.EGRESS)
47 .withConfiguration(statefunConfig)
48 .build(env)
49 .getDataStreamForEgressId(MyStatefun.EGRESS)
50 .addSink(new PrintSinkFunction<>(true));
51
52 env.execute();
53
54 }

A breakpoint in line 33 shows me the messages being consumed. A breakpoint in line 44 
is never hit. The message is reportedly consumed but never acknowledged or 
processed. Before using RabbitMQ I used a custom SourceFunction to fake input 
data, and it worked well.

To set things up I use a local environment, but the logging does not show any 
errors. Before my current problem I had another error during message 
deserialization and it wasn't reported either. Unfortunately I didn't manage to 
get the exception into the log/stdout; I had to use the debugger to find the 
reason for the former problem. In the current situation the debugger shows no 
thrown or caught exceptions. That's why I'm stuck.

Of course I would like to know what the problem with my code is, but I guess it 
is not obvious. Maybe someone can give me a hint on how to turn on exception 
logging, which might help to get closer to the origin of the phenomenon.

Thanks in advance,
Stephan



Statement Sets

2021-01-11 Thread Aeden Jameson
When using statement sets, if two select queries use the same table
(e.g. Kafka Topic), does each query get its own copy of data?
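
For context, a minimal sketch of what I mean (table names are made up, and the
source/sink tables are assumed to be registered already):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetQuestion {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // two INSERT statements that both read from the same Kafka-backed table
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_a SELECT * FROM kafka_topic WHERE kind = 'a'");
        set.addInsertSql("INSERT INTO sink_b SELECT * FROM kafka_topic WHERE kind = 'b'");
        set.execute();
    }
}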

Thank you,
Aeden


Stateful Functions: Dynamically define and load remote modules

2021-01-11 Thread Ahmad Alkilani
Hi,
I see that you need to tell the Flink Stateful Functions runtime about remote
stateful function modules via a YAML file provided at deploy time. Given that
remote modules and stateful functions are an external deployment concern
anyway, is it possible to dynamically associate remote modules and remote
function endpoints with an existing/already running Flink Stateful Functions
application?

The use case is allowing dynamic composability of functions. The Flink
Stateful Functions application would receive a request to dynamically string
together a new route based on some input data. The new route would be making
calls to, let's say, a newly created stateful function application deployed
remotely in an arbitrary language.

Is this possible? On the roadmap? A bad idea? Why or why not?

Thank you!



Log length

2021-01-11 Thread Rex Fenley
Hello,

We've collected over 150 MiB of log lines in 5 days. Is there a way to tell
Flink to eject log lines after a certain length so we don't eventually run
out of disk?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US



Re: Log length

2021-01-11 Thread Chesnay Schepler
Have a look at RollingFileAppenders.
These have become the default in 1.12.


On 1/12/2021 12:53 AM, Rex Fenley wrote:

Hello,

We've collected over 150 MiB of log lines in 5 days. Is there a way to 
tell Flink to eject log lines after a certain length so we don't 
eventually run out of disk?


Thanks!

--

Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG | FOLLOW US | LIKE US







Re: state reset(lost) on TM recovery

2021-01-11 Thread Chesnay Schepler
Just to double-check: are you aware that ValueState within a 
Keyed*Function is scoped to the key of the input element(s)? I.e., any 
stored value is only accessible if an element with the same key is 
processed?


On 1/10/2021 7:18 PM, Alexey Trenikhun wrote:

Hello,

I'm using Flink 1.11.3, state backend is rocksdn. I have streaming job 
which reads from Kafka, transforms data and output into Kafka, one of 
processing nodes is KeyedCoProcessFunction with ValueState:


 1. generated some input data, I see in log that state.update() is
called and subsequent state.value() return not null
 2. wait for checkpoint
 3. restart taskmanager
 4. state.value() returns null

I've tried to change backend from rocksdb to filesystem - same result, 
after taskmanager restart state.value() returns null


Any ideas, what could cause resetting state to null?

Thanks,
Alexey





Re: Log length

2021-01-11 Thread Rex Fenley
Thanks, I'll check them out. What's the default in 1.11.2?

On Mon, Jan 11, 2021 at 4:26 PM Chesnay Schepler  wrote:

> Have a look at RollingFileAppenders
> .
> These have become the default in 1.12 .
>
> On 1/12/2021 12:53 AM, Rex Fenley wrote:
>
> Hello,
>
> We've collected over 150 MiB of log lines in 5 days. Is there a way to
> tell Flink to eject log lines after a certain length so we don't eventually
> run out of disk?
>
> ThankS1
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
Hi Roman, 

 Many thanks for the feedback and suggestions!

> I think UC will be the common case with multiple sources each with 
DoP > 1.
> IIUC, waiting for EoP will be needed on each subtask each time one of 
its source subtasks finishes.

Yes, waiting for EoP would be required for each input channel if we do 
not specially block the finished upstream task. 

   > Yes, but checkpoint completion notification will not be sent until all 
the EOPs are processed.
  The downstream task that gets triggered must indeed wait for EoPs received 
from all of its input channels. I initially compared it with the completely 
aligned case; the remaining execution graph after the triggered task could 
still take normal unaligned checkpoints (e.g., if A -> B -> C -> D, A finishes 
and B gets triggered, then B -> C -> D could still take normal unaligned 
checkpoints). But it still could not bound the possible maximum delay.

> Not all declines cause job failure, particularly 
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
Sorry for getting the logic wrong here; CHECKPOINT_DECLINED_TASK_NOT_READY 
indeed does not cause a failure.
But since after a failed checkpoint we would have to wait for the checkpoint 
interval before the next checkpoint, I also agree that the following option, 
where we try to complete each checkpoint, would be the better one.

>> Thus another possible option might be let the upstream task to wait till all 
>> the pending buffers in the result partition has been flushed before get to 
>> finish.
> This is what I meant by "postpone JM notification from source". Just blocking 
> the task thread wouldn't add much complexity, though I'm not sure if it would 
> cause any problems.

>> do you think it would be ok for us to view it as an optimization and 
>> postpone it to future versions ? 
> I think that's a good idea.

 And also sorry that I initially misunderstood the proposal; currently 
I do not see explicit problems with waiting for the flush of the pipelined 
result partition either. 
Glad that we have the same viewpoint on this issue. :) 

 Best,
  Yun



--
From:Khachatryan Roman 
Send Time:2021 Jan. 11 (Mon.) 19:14
To:Yun Gao 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

> b)  With unaligned checkpoint enabled, the slower cases might happen if the 
> downstream task processes very slowly. 
I think UC will be the common case with multiple sources each with DoP > 1.
IIUC, waiting for EoP will be needed on each subtask each time one of it's 
source subtask finishes.

> But since only the result partition part of the finished upstream need wait 
> to be processed, the other part of 
> the execution graph could  still perform the unaligned checkpoint normally
Yes, but checkpoint completion notification will not be sent until all the EOPs 
are processed.

> Declining the RPC-trigger checkpoint would indeed simplify the 
> implementation, but since currently by default the
> failed checkpoint would cause job failover, thus we might have some concerns 
> in directly decline the checkpoint.
Not all declines cause job failure, particularly 
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.

> Thus another possible option might be let the upstream task to wait till all 
> the pending buffers in the result partition has been flushed before get to 
> finish.
This is what I meant by "postpone JM notification from source". Just blocking 
the task thread wouldn't add much complexity, though I'm not sure if it would 
cause any problems.

> do you think it would be ok for us to view it as an optimization and postpone 
> it to future versions ? 
I think that's a good idea.

Regards,
Roman



Re: state reset(lost) on TM recovery

2021-01-11 Thread Alexey Trenikhun
Hello,

Yes, I'm aware of that, and I used elements with the same key and logged getCurrentKey() 
to ensure that the key is the same, but you are right in the sense that it is scope 
related. The key is a protobuf object and I specify a custom TypeInformation in 
keyBy(). Today I changed the code to use a Tuple2-derived class instead of 
protobuf and it started to work, but why it is not working with protobuf and the 
custom type information is unclear; I checked serialize/deserialize - it returns an 
equal object - and it works until the TM restarts. Are there any special requirements 
for the TypeSerializer and TypeInformation of key types?


@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  final int serializedSize = t.getSerializedSize();
  dataOutputView.writeInt(serializedSize);
  final byte[] data = new byte[serializedSize];
  final CodedOutputStream codedOut = CodedOutputStream.newInstance(data);
  t.writeTo(codedOut);
  // writeTo() does not flush the stream; flushing is effectively a no-op for an
  // array-backed stream, but it is safer to do it explicitly.
  codedOut.flush();
  dataOutputView.write(data);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
  final int serializedSize = dataInputView.readInt();
  final com.google.protobuf.Parser<T> parser = Unchecked.cast(prototype.getParserForType());
  final byte[] data = new byte[serializedSize];
  // readFully() guarantees the whole record is read; read(byte[]) may return fewer bytes.
  dataInputView.readFully(data);
  return parser.parseFrom(CodedInputStream.newInstance(data));
}



From: Chesnay Schepler 
Sent: Monday, January 11, 2021 4:36 PM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: state reset(lost) on TM recovery

Just to double-check: are you aware that ValueState within a Keyed*Function is 
scoped to the key of the input element(s)? I.e., any stored value is only 
accessible while an element with the same key is being processed.
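
A minimal sketch to illustrate that scoping (the types and names below are just an example, not taken from your job):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// The ValueState below is scoped to the key of the element currently being processed,
// so state.value() only returns what was previously stored for that same key.
public class CountPerKey extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        Long current = count.value();                 // null unless this key was seen before
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(value.f0 + " seen " + updated + " times");
    }
}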

On 1/10/2021 7:18 PM, Alexey Trenikhun wrote:
Hello,

I'm using Flink 1.11.3, the state backend is rocksdb. I have a streaming job which 
reads from Kafka, transforms data and outputs into Kafka; one of the processing 
nodes is a KeyedCoProcessFunction with ValueState:

  1.  generate some input data; I see in the log that state.update() is called and a 
subsequent state.value() returns non-null
  2.  wait for a checkpoint
  3.  restart the taskmanager
  4.  state.value() returns null

I've tried to change the backend from rocksdb to filesystem - same result, after 
the taskmanager restart state.value() returns null

Any ideas, what could cause resetting state to null?

Thanks,
Alexey




Restoring from checkpoint with different parallism

2021-01-11 Thread Rex Fenley
Hello,

When using the TableAPI, is it safe to run a flink job with a different
`-p` parallelism while restoring from a checkpoint (not a savepoint) using
`-s`, without any rescaling of actual machines? I don't seem to find this
documented anywhere.

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Re: Restoring from checkpoint with different parallism

2021-01-11 Thread Yun Tang
 Hi Rex,

I think doc [1] gives some description of this. Rescaling from a previous 
checkpoint is still supported in the current Flink version.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

Best
Yun Tang

From: Rex Fenley 
Sent: Tuesday, January 12, 2021 11:01
To: user 
Cc: Brad Davis 
Subject: Restoring from checkpoint with different parallism

Hello,

When using the TableAPI, is it safe to run a flink job with a different `-p` 
parallelism while restoring from a checkpoint (not a savepoint) using `-s`, 
without any rescaling of actual machines? I don't seem to find this documented 
anywhere.

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend




Re: Restoring from checkpoint with different parallism

2021-01-11 Thread Rex Fenley
Thanks! Looks like you can't with unaligned checkpoints, which seems to
imply that you can with the normal checkpointing mechanism.

On Mon, Jan 11, 2021 at 7:56 PM Yun Tang  wrote:

>  Hi Rex,
>
> I think doc [1] should have given some descriptions. Rescaling from
> previous checkpoint is still supported in current Flink version.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>
> Best
> Yun Tang
> --
> *From:* Rex Fenley 
> *Sent:* Tuesday, January 12, 2021 11:01
> *To:* user 
> *Cc:* Brad Davis 
> *Subject:* Restoring from checkpoint with different parallism
>
> Hello,
>
> When using the TableAPI, is it safe to run a flink job with a different
> `-p` parallelism while restoring from a checkpoint (not a savepoint) using
> `-s`, without any rescaling of actual machines? I don't seem to find this
> documented anywhere.
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





How does at least once checkpointing work

2021-01-11 Thread Rex Fenley
Hello,

We're using the TableAPI and want to optimize for checkpoint alignment
times. We received some advice to possibly use at-least-once. I'd like to
understand how checkpointing works in at-least-once mode so I understand
the caveats and can evaluate whether or not that will work for us.

Thanks!
-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Re: How does at least once checkpointing work

2021-01-11 Thread Yuan Mei
Hey Rex,

You probably will find the link below helpful; it explains how
at-least-once (does not have alignment) is different
from exactly-once(needs alignment). It also explains how the
alignment phase is skipped in the at-least-once mode.

https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#exactly-once-vs-at-least-once

At a high level, in at-least-once mode a task with multiple input channels
1. does NOT block processing to wait for barriers from all inputs, meaning
the task keeps processing data after receiving a barrier even if it has
multiple inputs.
2. but still, a task takes its snapshot only after seeing the checkpoint barrier
from all input channels.

In this way, Snapshot N may contain data changes coming from Epoch N+1;
that's where "at least once" comes from.
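
For reference, switching to this mode is a one-line change on the StreamExecutionEnvironment that the job uses (this also covers a Table API job built via StreamTableEnvironment on top of it); a minimal sketch, with an arbitrary 60s interval:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60s. In AT_LEAST_ONCE mode barriers are not aligned, so a task
        // keeps processing after the first barrier and snapshot N may already include the
        // effects of records that logically belong to epoch N+1.
        env.enableCheckpointing(60_000, CheckpointingMode.AT_LEAST_ONCE);
    }
}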

On Tue, Jan 12, 2021 at 1:03 PM Rex Fenley  wrote:

> Hello,
>
> We're using the TableAPI and want to optimize for checkpoint alignment
> times. We received some advice to possibly use at-least-once. I'd like to
> understand how checkpointing works in at-least-once mode so I understand
> the caveats and can evaluate whether or not that will work for us.
>
> Thanks!
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> 
>


Re: How does at least once checkpointing work

2021-01-11 Thread Rex Fenley
Thanks for the info.

It sounds like any state which does not have some form of uniqueness could
end up being incorrect.

Specifically in my case, all rows passing through the execution graph have
unique ids. However, any operator doing a group-by on a foreign key followed by a
sum/count could end up with an inconsistent count. Normally a retract (-1) followed
by an insert (+1) would keep the count correct, but with "at least once" a
retract (-1) may be from epoch N+1 and therefore be played twice, making the
count less than it should actually be.

Am I understanding this correctly?

Thanks!

On Mon, Jan 11, 2021 at 10:06 PM Yuan Mei  wrote:

> Hey Rex,
>
> You probably will find the link below helpful; it explains how
> at-least-once (does not have alignment) is different
> from exactly-once(needs alignment). It also explains how the
> alignment phase is skipped in the at-least-once mode.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#exactly-once-vs-at-least-once
>
> In a high level, at least once mode for a task with multiple input channels
> 1. does NOT block processing to wait for barriers from all inputs, meaning
> the task keeps processing data after receiving a barrier even if it has
> multiple inputs.
> 2. but still, a task takes a snapshot after seeing the checkpoint barrier
> from all input channels.
>
> In this way, a Snapshot N may contain data change coming from Epoch N+1;
> that's where "at least once" comes from.
>
> On Tue, Jan 12, 2021 at 1:03 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> We're using the TableAPI and want to optimize for checkpoint alignment
>> times. We received some advice to possibly use at-least-once. I'd like to
>> understand how checkpointing works in at-least-once mode so I understand
>> the caveats and can evaluate whether or not that will work for us.
>>
>> Thanks!
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> 
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





RE: Timestamp Issue with OutputTags

2021-01-11 Thread Priyanka Kalra A
Below is the code:

public class OutputTagProcessingFunction extends ProcessFunction<GenericRecord, GenericRecord> {

    private static final long serialVersionUID = 1L;
    private HashMap<String, OutputTag<GenericRecord>> outputMap = new HashMap<>();
    private List<String> tagList;

    public OutputTagProcessingFunction(List<String> tagList) {
        super();
        this.tagList = tagList;
    }

    @Override
    public void processElement(final GenericRecord value, Context ctx,
            Collector<GenericRecord> out) throws Exception {
        Set<String> tagSet = new HashSet<>();
        for (String tag : tagList) {
            List<String> tags = Arrays.asList(tag.split(","));
            tagSet.addAll(tags);
        }

        for (String tag : tagSet) {
            outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag) {});
            ctx.output(outputMap.get(tag), value);
        }
    }
}

The exception comes at the highlighted line, i.e. the ctx.output(...) call.


Regards,
Priyanka
From: Taher Koitawala 
Sent: Monday, January 11, 2021 6:50 PM
To: Priyanka Kalra A 
Cc: user ; Sushil Kumar Singh B 
; Anuj Kumar Jain A 
; Chirag Dewan ; 
Pankaj Kumar Aggarwal 
Subject: Re: Timestamp Issue with OutputTags

Can you please share your code?

On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A 
mailto:priyanka.a.ka...@ericsson.com>> wrote:
Hi Team,

We are generating multiple side-output tags and using the default processing time 
on a non-keyed stream. The class $YYY extends ProcessFunction<I, O> and an 
implementation is provided for the processElement method. Upon sending valid data, 
it gives the error "Invalid timestamp: -9223372036854775808. Timestamp should 
always be non-negative or null".


  *   Why is it not able to read the timestamp?
  *   Why is it not taking the system default time as the processing time?

Complete stack trace for reference:
java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. 
Timestamp should always be non-negative or null.
at 
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:70) 
~[kafka-clients-0.11.0.2.jar:?]
at 
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:93) 
~[kafka-clients-0.11.0.2.jar:?]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652)
 ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97)
 ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
com.eee.dd.ccc.aaa.processing.$YYY.processElement(.java:166)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
 ~[flink-connector-kafka-base_2.11-1.11.2.j

Re: Timestamp Issue with OutputTags

2021-01-11 Thread Taher Koitawala
Hi Priyanka,
  I see that you are generating dynamic output tags. AFAIK, the dynamic
tagging is causing that issue. I don't think we can add tags after the
operators are running.

Can you try with a statically named tag that is defined final, and output data
that way?
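
For example, a minimal sketch of such a static tag (assuming the values are Avro GenericRecords, as in the code you shared; the class and tag names are just examples):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class StaticTagFunction extends ProcessFunction<GenericRecord, GenericRecord> {

    // One tag per side output, declared static and final, created as an anonymous
    // subclass so that the element type information is captured.
    private static final OutputTag<GenericRecord> SIDE_TAG =
            new OutputTag<GenericRecord>("side-output") {};

    @Override
    public void processElement(GenericRecord value, Context ctx, Collector<GenericRecord> out) {
        ctx.output(SIDE_TAG, value);   // route to the side output
        out.collect(value);            // main output
    }
}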

Added Till

On Tue, Jan 12, 2021, 12:09 PM Priyanka Kalra A <
priyanka.a.ka...@ericsson.com> wrote:

> Below is the code:
>
> public class OutputTagProcessingFunction extends ProcessFunction<GenericRecord, GenericRecord> {
>
>     private static final long serialVersionUID = 1L;
>
>     private HashMap<String, OutputTag<GenericRecord>> outputMap = new HashMap<>();
>
>     private List<String> tagList;
>
>     public OutputTagProcessingFunction(List<String> tagList) {
>         super();
>         this.tagList = tagList;
>     }
>
>     @Override
>     public void processElement(final GenericRecord value, Context ctx,
>             Collector<GenericRecord> out) throws Exception {
>         Set<String> tagSet = new HashSet<>();
>         for (String tag : tagList) {
>             List<String> tags = Arrays.asList(tag.split(","));
>             tagSet.addAll(tags);
>         }
>
>         for (String tag : tagSet) {
>             outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag) {});
>             ctx.output(outputMap.get(tag), value);
>         }
>     }
> }
>
>
>
> Exception comes at highlighted line.
>
>
>
>
>
> Regards,
>
> Priyanka
>
> *From:* Taher Koitawala 
> *Sent:* Monday, January 11, 2021 6:50 PM
> *To:* Priyanka Kalra A 
> *Cc:* user ; Sushil Kumar Singh B <
> sushil.kumar.b.si...@ericsson.com>; Anuj Kumar Jain A <
> anuj.kumar.a.j...@ericsson.com>; Chirag Dewan ;
> Pankaj Kumar Aggarwal 
> *Subject:* Re: Timestamp Issue with OutputTags
>
>
>
> Can you please share your code?
>
>
>
> On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <
> priyanka.a.ka...@ericsson.com> wrote:
>
> Hi Team,
>
>
>
> We are generating multiple side-output tags and using default processing
> time on non-keyed stream. The class $YYY extends *ProcessFunction* and implementation is provided for *processElement* method. Upon
> sending valid data, it gives error "*Invalid timestamp:
> -9223372036854775808. Timestamp should always be non-negative or null*".
>
>
>
>- Why is it not able to read timestamp?
>- Why is not taking system default time as processing time?
>
>
>
> *Complete stack trace for reference:*
>
> java.lang.IllegalArgumentException: Invalid timestamp:
> -9223372036854775808. Timestamp should always be non-negative or null.
>
> at
> org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:70)
> ~[kafka-clients-0.11.0.2.jar:?]
>
> at
> org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:93)
> ~[kafka-clients-0.11.0.2.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652)
> ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97)
> ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> *at
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>
> *at
> com.eee.dd.ccc.aaa.processing.$YYY.processElement(.java:166)*
>
> *at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[f

Question on getting the last succesfuly externalized checkpoint path for crashed jobs

2021-01-11 Thread DONG, Weike
Hi community,

We are currently using *externalized checkpoints* to guard against abrupt YARN
application failures, as they save a "_metadata" file within the checkpoint
folder which is essential for the job's cold recovery.

As it is designed in Flink, the completed checkpoint paths are like
*hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1895*,
*hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1896*,
*hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1897* ...
and so on.

Originally, we deployed a tool that periodically queries the REST API of
all running Flink jobs to get the latest completed checkpoint paths, and
saves them in a database for later use. However, the periodic scan frequency
might be lower than the pace at which checkpoints are deleted, so in case of
recovery the saved directory might have already been deleted and replaced
by newer ones.

Here we wonder if we could just list the parent checkpoint
folder (say *hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/*)
and choose the "chk-" directory with the highest number as the
checkpoint for job recovery.

But there is one concern to address: how can we make sure that the highest
"chk-" folder is indeed a complete one? After digging through the code
of the *FsCheckpointMetadataOutputStream* class, there seems to be a chance that
the file is only halfway uploaded but later fails to be removed due to
exceptions or sudden JVM crashes.
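
For illustration, a sketch of that listing approach with the Hadoop FileSystem API (the path is the example directory above; as discussed, the presence of "_metadata" is treated only as a hint, not a hard guarantee of completeness):

import java.util.Arrays;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LatestCheckpointFinder {
    public static void main(String[] args) throws Exception {
        Path jobDir = new Path("hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/");
        FileSystem fs = jobDir.getFileSystem(new Configuration());

        // List the chk-* directories and sort them by checkpoint id, highest first.
        FileStatus[] chks = fs.listStatus(jobDir, path -> path.getName().startsWith("chk-"));
        Arrays.sort(chks, Comparator.comparingLong(
                (FileStatus s) -> Long.parseLong(s.getPath().getName().substring("chk-".length()))).reversed());

        for (FileStatus chk : chks) {
            // Only consider a checkpoint directory that contains a _metadata file; as noted
            // above, this does not fully rule out a half-written metadata file.
            if (fs.exists(new Path(chk.getPath(), "_metadata"))) {
                System.out.println("Candidate restore path: " + chk.getPath());
                return;
            }
        }
        System.out.println("No completed checkpoint found under " + jobDir);
    }
}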

Another approach we came up with is to write a callback that notifies us
once a checkpoint is completed, via CheckpointListeners like
*AcknowledgeOnCheckpoint*; however, it is also possible that the
notification message never reaches the server before the JVM crashes.

What do you think is the right and idiomatic way to get the last
successfully completed externalized checkpoint path so that we can survive
sudden JVM crashes? Thank you very much :)


Re: Statefun with RabbitMQ consumes message but does not run statefun

2021-01-11 Thread Stephan Pelikan
I found the reason: There is a class incompatibility because I changed from
Statefun 2.2.1 + Flink 1.11.1
to
Statefun 2.2.1 + Flink 1.12.0

But even the newest Statefun version, 2.2.2, refers to Flink 1.11.3.

Is there a way to use the newest version of Flink in combination with 
the newest version of Statefun? I'm wondering why there is no Statefun version 
matching the current stable version of Flink.

Stephan


From: Stephan Pelikan 
Sent: Monday, January 11, 2021 19:37
To: user@flink.apache.org
Subject: Statefun with RabbitMQ consumes message but does not run statefun

Hi,

I am trying to use RabbitMQ as a source. My source consumes messages from the queue, but 
the stateful function is never executed, not even created.

This is my main function:

1 public static void main(String[] args) throws Exception {
2
3 final var env = StreamExecutionEnvironment.getExecutionEnvironment();
4
5 env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
6
7 env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE);
8 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
9 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
10 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
11
12 final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
13 statefunConfig.setFlinkJobName("test");
14 statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
15
16 final var connectionConfig = new RMQConnectionConfig.Builder()
17 .setHost("localhost")
18 .setUserName("guest")
19 .setPassword("guest")
20 .setPort(5672)
21 .setVirtualHost("test")
22 .setPrefetchCount(5000)
23 .build();
24
25 final var deserializationSchema = new TypeInformationSerializationSchema<>(
26 new ProtobufTypeInformation<>(Any.class), env.getConfig());
27 final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, true, deserializationSchema);
28
29 final var source = env
30 .addSource(rmqSource, TEST_INGRESS)
31 .setParallelism(1)
32 .map(msg -> {
33 return RoutableMessageBuilder
34 .builder()
35 .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
36 .withMessageBody(msg)
37 .build();
38 });
39
40 StatefulFunctionDataStreamBuilder
41 .builder("test")
42 .withDataStreamAsIngress(source)
43 .withFunctionProvider(MyStatefun.TYPE, unused -> {
44 return new MyStatefun();
45 })
46 .withEgressId(MyStatefun.EGRESS)
47 .withConfiguration(statefunConfig)
48 .build(env)
49 .getDataStreamForEgressId(MyStatefun.EGRESS)
50 .addSink(new PrintSinkFunction<>(true));
51
52 env.execute();
53
54 }

A breakpoint in line 33 shows me the messages being consumed. A breakpoint in line 44 
is never hit. The messages are reportedly consumed but never acknowledged or 
processed. Before using RabbitMQ I used a custom SourceFunction to fake input 
data, and it worked well.

To set things up I use a local environment, but the logging does not show any 
errors. Before my current problem I had another error during message 
deserialization and it wasn't reported either; unfortunately I didn't manage to 
get that exception into the log/stdout and had to use the debugger to find the 
cause of the former problem. In the current situation the debugger shows no 
thrown or caught exceptions. That's why I'm stuck.

Of course I would like to know what the problem with my code is. But I guess it 
is not obvious. Maybe someone can give me a hint on how to turn on exception 
logging, which might help me get closer to the origin of the phenomenon.

Thanks in advance,
Stephan



Pushing Down Filters

2021-01-11 Thread Satyam Shekhar
Hello,

I am using Flink 1.11.2 as the execution engine for an alerting
application. Our application builds atop Flink's SQL API to run streaming
and batch jobs on a proprietary storage engine. We have a custom
StreamTableSource implementation that connects to our storage engine. The
connector currently implements the ProjectableTableSource interface. I now
wish to extend the connector to push down filters to the source for
improved performance. I have run into multiple issues in that effort -

1. The optimizer does not use both ProjectableTableSource and FilterableTableSource
in a single query, even if the source implements both interfaces. Each
interface works correctly if implemented independently (see the sketch at the
end of this message).

2. Implementations of FilterableTableSource fail inside the optimizer for a
few TPC-DS queries in batch mode.

Stacktrace:

java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16,
_UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
   at org.apache.calcite.rel.core.Filter.(Filter.java:74)
   at
org.apache.calcite.rel.logical.LogicalFilter.(LogicalFilter.java:68)
   at
org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
   at
org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
   at
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
   at
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
   at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
   ...
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
   ...
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
   ...
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org
$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
   at scala.collection.immutable.List.foreach(List.scala:392)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
   at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
   at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
   ...
   at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)


Config:
  var settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build();
  TableEnvironment.create(settings);

3. And finally, filter expressions containing the current timestamp (and now)
functions are not resolved to constant values during the predicate pushdown
optimization. Take the following SQL query as an example: select count(*)
from T0 where T0.C2 >= current_timestamp. Here, the applyPredicate method of
FilterableTableSource receives the predicate as a CallExpression of the form
greaterThanOrEqual(C2, currentTimestamp()). I would have expected currentTimestamp
to be resolved to a constant value that is identical across all usages of
currentTimestamp in the query.

Regards,
Satyam