Re: How to proper hashCode() for keys.

2022-02-02 Thread Francesco Guardiani
Hi,
your hashCode and equals seem correct. Can you post a minimal stream
pipeline reproducer using this class?
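
For anyone hitting the same KeyGroupRange error, a minimal reproducer of the
kind asked for above could look roughly like the sketch below. It is only a
sketch: the sample elements, job name and the process function are
illustrative assumptions, not taken from the original job.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyGroupReproducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<MyEventCountKey> events = env.fromElements(
                new MyEventCountKey("2022-02-01T10:00", "domainA", "click"),
                new MyEventCountKey("2022-02-01T10:00", "domainA", "click"));

        events
                // keyBy relies on the key's hashCode()/equals() being stable across JVMs
                .keyBy(key -> key)
                .process(new KeyedProcessFunction<MyEventCountKey, MyEventCountKey, String>() {
                    @Override
                    public void processElement(MyEventCountKey value, Context ctx, Collector<String> out) {
                        out.collect(value.toString());
                    }
                })
                .print();

        env.execute("key-group reproducer");
    }
}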

FG

On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:

> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
> using low level state access APIs, this is most likely caused by
> non-deterministic shuffle key (hashCode and equals implementation).
>
> This is my class, is my hashCode deterministic?
>
> public final class MyEventCountKey {
>     private final String countDateTime;
>     private final String domain;
>     private final String event;
>
>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>         this.countDateTime = countDateTime;
>         this.domain = domain;
>         this.event = event;
>     }
>
>     public String getCountDateTime() {
>         return countDateTime;
>     }
>
>     public String getDomain() {
>         return domain;
>     }
>
>     public String getEvent() {
>         return event;
>     }
>
>     @Override
>     public String toString() {
>         return countDateTime + "|" + domain + "|" + event;
>     }
>
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         MyEventCountKey that = (MyEventCountKey) o;
>         return countDateTime.equals(that.countDateTime) &&
>                 domain.equals(that.domain) &&
>                 event.equals(that.event);
>     }
>
>     @Override
>     public int hashCode() {
>         final int prime = 31;
>         int result = 1;
>         result = prime * result + countDateTime.hashCode();
>         result = prime * result + domain.hashCode();
>         result = prime * result + event.hashCode();
>         return result;
>     }
> }
>
>


Re: Future support for custom FileEnumerator in FileSource?

2022-02-02 Thread Francesco Guardiani
Hi,
From what I see here
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/connector/file/src/AbstractFileSource.AbstractFileSourceBuilder.html#setFileEnumerator-org.apache.flink.connector.file.src.enumerate.FileEnumerator.Provider-
the file enumerator can be setup with the FileSourceBuilder:

fileSourceBuilder.setFileEnumerator(new FileEnumerator.Provider() {
    @Override
    public FileEnumerator create() {
        // Do something
        return null;
    }
});
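
As a side note, FileEnumerator.Provider has a single create() method, so a
method reference should also work. A minimal sketch, assuming the
org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator
that ships with the file connector (verify the class and its constructor
against your Flink version):

fileSourceBuilder.setFileEnumerator(NonSplittingRecursiveEnumerator::new);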


Hope it helps,
FG


Read Avro type records from kafka using Python - Datastream API

2022-02-02 Thread Hussein El Ghoul
Hello,

I am currently working on a program that uses Flink to read Avro records
from Kafka.
I have the Avro schema of the records I want to read in a file, but I looked all
over GitHub, the documentation and Stack Overflow for examples of how to use
AvroRowDeserializationSchema to deserialize a Kafka topic with Avro records and
could not find any.
Could you please provide an example using Python that is compatible with the
DataStream API?

Best Regards,
Hussein
Quiqup - Data Engineer

Re: Read Avro type records from kafka using Python - Datastream API

2022-02-02 Thread Martijn Visser
Hi Hussein,

Thanks for your question. I see that you've posted this multiple times in
the past couple of days; please leave some time for people to reply. The
user mailing list is set up for asynchronous replies, and sending the same
question multiple times in such a short time frame makes it seem pushy. People
try to help each other out on a best-effort basis.

I'm looping in Dian and Xingbo since they are more familiar with the Python
stack in Flink. However, since this week Chinese New Year is celebrated, it
could be that it takes them longer to respond.

Best regards,

Martijn


On Wed, 2 Feb 2022 at 09:43, Hussein El Ghoul  wrote:

> Hello,
>
> I am currently working on a program that uses Flink to read Avro records
> from Kafka.
> I have the Avro schema of the records I want to read in a file, but I
> looked all over GitHub, the documentation and Stack Overflow for examples
> of how to use *AvroRowDeserializationSchema* to deserialize a Kafka topic
> with Avro records but could not find any.
> Could you please provide an example using *Python* that is compatible
> with the *DataStream API*?
>
> Best Regards,
> Hussein
> Quiqup - Data Engineer
>


Re: ParquetColumnarRowInputFormat - parameter description

2022-02-02 Thread Krzysztof Chmielewski
Thank you Fabian,

I have one followup question.

You wrote:

*isUtcTimestamp denotes whether timestamps should be represented as SQL UTC
timestamps.*

Question: so, if *isUtcTimestamp* is set to false, how are timestamps represented?

Regards,
Krzysztof Chmielewski

On Tue, 25 Jan 2022 at 11:56, Fabian Paul  wrote:

> Hi Krzysztof,
>
> sorry for the late reply. The community is very busy at the moment
> with the final two weeks of Flink 1.15.
>
> The parameters you have mentioned are mostly relevant for the internal
> conversion or representation from Parquet types to Flink's SQL type
> system.
>
> - isUtcTimestamp denotes whether timestamps should be represented as
> SQL UTC timestamps
> - batchSize is an internal number of how many rows are put into one
> vector. Vectors are used internally in Flink SQL for performance
> reasons to enable faster execution on batches, e.g. for Hive we use the
> following default value [1]
> - isCaseSensitive is used to map the field/column names from parquet
> and match them to columns in Flink
>
> I have also included @jingsongl...@gmail.com who is more familiar with
> the parquet format.
>
> Best,
> Fabian
>
> [1]
> https://github.com/apache/flink/blob/d8a031c2b7d7b73fe38a3f894913d3dcaa5a4111/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/VectorizedColumnBatch.java#L46
>
> On Mon, Jan 24, 2022 at 4:32 PM Krzysztof Chmielewski
>  wrote:
> >
> > Hi,
> > I would like to bump this up a little bit.
> >
> > The isCaseSensitive is rather clear: if this is false, then the column read
> > from the Parquet file is case-insensitive.
> > batchSize - how many records we read from the Parquet file before
> > passing them to the upper classes, right?
> >
> > Could someone describe what the timestamp flag does, with some examples?
> >
> > Regards,
> > Krzysztof Chmielewski
> >
> >
> > On Mon, 10 Jan 2022 at 14:59, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> >>
> >> Hi,
> >> I would like to ask for some more details regarding three
> >> ParquetColumnarRowInputFormat construction parameters.
> >>
> >> The parameters are:
> >> batchSize,
> >> isUtcTimestamp,
> >> isCaseSensitive
> >>
> >> The parameter names give some hint about their purpose, but there is no
> >> description in the docs (Javadoc, Flink website).
> >>
> >> Could you provide me some information about the batching process and
> other two boolean flags?
> >>
> >> Regards,
> >> Krzysztof Chmielewski
>
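
For context, the three parameters above are passed when constructing the
format. A rough sketch is shown below; the exact constructor signature and
generics differ between Flink releases, so treat the argument list as an
assumption and check the ParquetColumnarRowInputFormat javadoc of your version:

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class ParquetFormatSketch {
    public static ParquetColumnarRowInputFormat<FileSourceSplit> build() {
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        RowType producedRowType = RowType.of(new IntType(), new VarCharType()); // projected columns to read

        int batchSize = 2048;           // rows buffered into one internal vectorized batch
        boolean isUtcTimestamp = true;  // represent timestamps as SQL UTC timestamps
        boolean isCaseSensitive = true; // match Parquet column names against Flink columns case-sensitively

        return new ParquetColumnarRowInputFormat<>(
                hadoopConf, producedRowType, batchSize, isUtcTimestamp, isCaseSensitive);
    }
}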


Re: GenericType problem in KeyedCoProcessFunction after disableGenericTypes()

2022-02-02 Thread Deniz Koçak
Hi Yoel,

Thank you for answering my question, really appreciated. However, that
brings more questions to my mind.

1) As far as I understood from the Flink built-in serialization, if a
class is a POJO type wrt the items defined in the POJOs section here [1],
and an object is an instance of such a POJO type class, there is no need to
do anything extra, and serialization can be handled natively via
`PojoSerializer`? In our case with `java.util.UUID`, that class has no
no-argument public constructor, which causes the fallback to Kryo I guess?

2) In your suggestion `you could provide a supplier of UUIDGenerator`,
I am not sure what exactly you are referring to here. Something like a lambda
which is capable of creating an instance of `UUIDGenerator` on demand,
which can be called within the open() method?

3) `java.util.UUID` also implements `java.io.Serializable`, but as far as
I understood, that does not mean a class implementing `java.io.Serializable`
will always be able to be serialized via Flink? At least with the built-in
serializers (not referring to Kryo)? The UUID class is Java Serializable,
yet it still seems to be a generic type because it does not fall into any
of the categories mentioned in [1]?

4) When Kryo fallback is enabled, that means fallback is the last
resort if the object can be serialized with


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/

On Wed, Feb 2, 2022 at 6:27 AM Yoel Benharrous
 wrote:
>
> Hi Deniz,
>
> You could declare UUIDGenerator as a transient field and instantiate it in
> the open function.
>
> Or, if you want to inject any UUIDGenerator, you could provide a supplier of
> UUIDGenerator that implements Serializable and invoke it in the open
> function.
>
>
>
>
> On Tue, Feb 1, 2022, 10:01 PM Deniz Koçak  wrote:
>>
>> Hi All,
>>
>> We have a function extending `KeyedCoProcessFunction`, and within that
>> function implementation I wanted to keep a class object as a field
>> which is simply responsible for generating a UUID.
>>
>> We disabled Kryo fallback for generic types via
>> `env.getConfig().disableGenericTypes()`.
>> I am receiving the error below when I pass an instance of
>> `UUIDGenerator` to the `KeyedCoProcessFunction` and try to run the
>> job.
>>
>> ***
>> Exception in thread "main" java.lang.UnsupportedOperationException:
>> Generic types have been disabled in the ExecutionConfig and type
>> java.util.UUID is treated as a generic type.
>> ***
>>
>> At that point I wonder how I can/should pass an instance of
>> `UUIDGenerator` to the `ModelRequestProcessor` function.
>>
>>
>> ModelRequestProcessor Class:
>> 
>> public class ModelRequestProcessor extends
>> KeyedCoProcessFunction<..., ModelRequest> {
>>
>>     protected final UUIDGenerator uuidGenerator;
>>
>>     public ModelRequestProcessor(UUIDGenerator generator) {
>>         this.uuidGenerator = generator;
>>     }
>> }
>> 
>>
>> UUIDGenerator Class:
>> 
>> public class UUIDGenerator implements Serializable {
>>
>> private static final long serialVersionUID = 42L;
>>
>> public java.util.UUID generate() {
>> return java.util.UUID.randomUUID();
>> }
>> }
>> 
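
A minimal sketch of what the two suggestions above could look like when
applied to the classes quoted in this thread. The key and stream type
parameters of KeyedCoProcessFunction and the supplier interface are
illustrative assumptions; only the transient/open() pattern itself is the point:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// A small serializable factory so any UUIDGenerator can be injected (hypothetical helper).
interface SerializableSupplier<T> extends java.util.function.Supplier<T>, java.io.Serializable {}

public class ModelRequestProcessorSketch
        extends KeyedCoProcessFunction<String, ModelRequest, ModelRequest, ModelRequest> {

    // Serialized with the function: only the lightweight factory.
    private final SerializableSupplier<UUIDGenerator> generatorFactory; // e.g. UUIDGenerator::new

    // Not serialized with the function; recreated per task in open().
    private transient UUIDGenerator uuidGenerator;

    public ModelRequestProcessorSketch(SerializableSupplier<UUIDGenerator> generatorFactory) {
        this.generatorFactory = generatorFactory;
    }

    @Override
    public void open(Configuration parameters) {
        // Variant 1: simply "this.uuidGenerator = new UUIDGenerator();"
        // Variant 2: use the injected serializable factory.
        this.uuidGenerator = generatorFactory.get();
    }

    @Override
    public void processElement1(ModelRequest value, Context ctx, Collector<ModelRequest> out) {
        out.collect(value); // business logic omitted
    }

    @Override
    public void processElement2(ModelRequest value, Context ctx, Collector<ModelRequest> out) {
        out.collect(value); // business logic omitted
    }
}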


Re: GenericType problem in KeyedCoProcessFunction after disableGenericTypes()

2022-02-02 Thread Deniz Koçak
Sorry for not finishing the last - 4th - question

4) When Kryo fallback is enabled, that means fallback is the last
resort if the object can be serialized with any other serializer like
`PojoSerializer`?
Can we still benefit from the performance of Flink's built-in serializers
without sacrificing the flexibility of Kryo?

On Wed, Feb 2, 2022 at 11:22 AM Deniz Koçak  wrote:
>
> Hi Yoel,
>
> Thank you for answering my question, really appreciated. However, that
> brings more questions to my mind.
>
> 1) As far as I understood from the Flink built-in serialization, if a
> class is a POJO type wrt the items defined in the POJOs section here [1],
> and an object is an instance of such a POJO type class, there is no need to
> do anything extra, and serialization can be handled natively via
> `PojoSerializer`? In our case with `java.util.UUID`, that class has no
> no-argument public constructor, which causes the fallback to Kryo I guess?
>
> 2) In your suggestion `you could provide a supplier of UUIDGenerator`,
> I am not sure what exactly you are referring to here. Something like a lambda
> which is capable of creating an instance of `UUIDGenerator` on demand,
> which can be called within the open() method?
>
> 3) `java.util.UUID` also implements `java.io.Serializable`, but as far as
> I understood, that does not mean a class implementing `java.io.Serializable`
> will always be able to be serialized via Flink? At least with the built-in
> serializers (not referring to Kryo)? The UUID class is Java Serializable,
> yet it still seems to be a generic type because it does not fall into any
> of the categories mentioned in [1]?
>
> 4) When Kryo fallback is enabled, that means fallback is the last
> resort if the object can be serialized with
>
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/
>
> On Wed, Feb 2, 2022 at 6:27 AM Yoel Benharrous
>  wrote:
> >
> > Hi Deniz,
> >
> > You could declare UUIDGenerator as a transient field and instantiate it in
> > the open function.
> >
> > Or, if you want to inject any UUIDGenerator, you could provide a supplier of
> > UUIDGenerator that implements Serializable and invoke it in the open
> > function.
> >
> >
> >
> >
> > On Tue, Feb 1, 2022, 10:01 PM Deniz Koçak  wrote:
> >>
> >> Hi All,
> >>
> >> We have a function extending `KeyedCoProcessFunction`, and within that
> >> function implementation I wanted to keep a class object as a field
> >> which is simply responsible for generating a UUID.
> >>
> >> We disabled Kryo fallback for generic types via
> >> `env.getConfig().disableGenericTypes()`.
> >> I am receiving the error below when I pass an instance of
> >> `UUIDGenerator` to the `KeyedCoProcessFunction` and try to run the
> >> job.
> >>
> >> ***
> >> Exception in thread "main" java.lang.UnsupportedOperationException:
> >> Generic types have been disabled in the ExecutionConfig and type
> >> java.util.UUID is treated as a generic type.
> >> ***
> >>
> >> At that point I wonder how I can/should pass an instance of
> >> `UUIDGenerator` to the `ModelRequestProcessor` function.
> >>
> >>
> >> ModelRequestProcessor Class:
> >> 
> >> public class ModelRequestProcessor extends
> >> KeyedCoProcessFunction<..., ModelRequest> {
> >>
> >>     protected final UUIDGenerator uuidGenerator;
> >>
> >>     public ModelRequestProcessor(UUIDGenerator generator) {
> >>         this.uuidGenerator = generator;
> >>     }
> >> }
> >> 
> >>
> >> UUIDGenerator Class:
> >> 
> >> public class UUIDGenerator implements Serializable {
> >>
> >> private static final long serialVersionUID = 42L;
> >>
> >> public java.util.UUID generate() {
> >> return java.util.UUID.randomUUID();
> >> }
> >> }
> >> 


RE: Determinism of interval joins

2022-02-02 Thread Alexis Sarda-Espinosa
Well, for those who might be interested in the semantics I mentioned, I 
implemented a custom operator that seems to achieve what I want by mostly 
ignoring the actual timestamps from the side stream's watermarks. However, it 
kind of depends on the fact that my main stream comes from a previous window 
and is watermarked with "windowEnd - 1" (thus "timestamp1 + 1" below).

public class PrioritizedWatermarkStreamIntervalJoinOperator extends IntervalJoinOperator<...> {
    private static final long serialVersionUID = 1L;

    private long maxTimestamp1 = Long.MIN_VALUE;
    private long maxTimestamp2 = Long.MIN_VALUE;

    public PrioritizedWatermarkStreamIntervalJoinOperator(...) {
        super(...);
    }

    @Override
    public void processWatermark1(Watermark mark) throws Exception {
        if (mark.getTimestamp() > maxTimestamp1) {
            maxTimestamp1 = mark.getTimestamp();
        }
        super.processWatermark1(mark);
        maybeProcessWatermark2(mark, mark.getTimestamp(), maxTimestamp2);
    }

    private void maybeProcessWatermark2(Watermark mark, long timestamp1, long maxTimestampForComparison) throws Exception {
        if (mark.equals(Watermark.MAX_WATERMARK) && maxTimestampForComparison == Watermark.MAX_WATERMARK.getTimestamp()) {
            super.processWatermark2(Watermark.MAX_WATERMARK);
        } else if (maxTimestamp2 > maxTimestamp1) {
            if (timestamp1 == Long.MAX_VALUE) {
                LOG.warn("Trying to bump timestamp1 would result in overflow, skipping.");
                return;
            }
            super.processWatermark2(new Watermark(timestamp1 + 1L));
        }
    }

    @Override
    public void processWatermark2(Watermark mark) throws Exception {
        if (mark.getTimestamp() > maxTimestamp2) {
            maxTimestamp2 = mark.getTimestamp();
        }
        maybeProcessWatermark2(mark, maxTimestamp1, maxTimestamp1);
    }
}

Regards,
Alexis.

From: Alexis Sarda-Espinosa 
Sent: Saturday, 29 January 2022 13:47
To: Robert Metzger 
Cc: user@flink.apache.org
Subject: RE: Determinism of interval joins

I think I spoke too soon when I said my watermark strategies were like the
included ones. My generators mark themselves as idle when they start, and stay
like that as long as they don't see any event at all. In the tests, I presume
a variable number of events (and watermarks) from stream1 were consumed before
anything from stream2 was, so by the time stream2 emitted a watermark to mark
itself as not idle, it was already too late and everything was dropped; I
debugged some of the operators and could see that a lot of inputs were
considered late, since they were processed when the internal watermark service
already had Long.MAX_VALUE as the current watermark. If I change this idleness
behavior, I do see changes in the test's output.

When running in real time, I definitely need to mark some streams as idle after
some time, because I don't expect all of them to receive data constantly.
However, the non-real-time scenario is also relevant for me, and not just for
testing: if something crashes in the system and suddenly the pipeline needs to
process backlog, it would be nice if semantics were well defined. Ideally, this
would mean, for two-input operators in general I imagine, that when an operator
knows that all streams from one input have passed a certain watermark (based on
slide/tumble time), it would switch and consume from the other stream to check
whether it's idle or not. I suppose this wouldn't be a definite guarantee
either, since the data from the different streams may take some time to reach
the different operators (latency and whatnot), but it would still be useful.

I imagine the details are more complex and I'm oversimplifying a bit (I don't 
know how the network stack works), but I would think this kind of semantics are 
commonly expected when handling multiple streams that need joins and so on. 
What do you think?

Regards,
Alexis.

From: Robert Metzger <metrob...@gmail.com>
Sent: Friday, 28 January 2022 14:49
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Determinism of interval joins

Instead of using `reinterpretAsKeyedStream`, can you use keyBy and see if the
behavior becomes deterministic?

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
I'm not sure if the issue in [1] is relevant, since it mentions the Table API,
but it could be. Since stream1 and stream2 in my example have a long chain of
operators behind them, I presume they might "run" at very different paces.

Oh and, in the context of my unit tests, watermarks should be deterministic, 
the input file is sorted, and the watermark strategies should essentially 
behave like the monotonous generator.

[1] https://issues.apache.org/jira/browse/FLINK-24466

Regards,
Alexis.



Pojo State Migration - NPE with field deletion

2022-02-02 Thread bastien dine
Hello,

I have some trouble restoring a (POJO) state after deleting a field.
According to the documentation, this should not be a problem with POJOs:
*"Fields can be removed. Once removed, the previous value for the removed
field will be dropped in future checkpoints and savepoints."*

Here is a short stack trace (the full trace is below):

Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)


After some debugging, it seems that the deleted POJO field still has a field
serializer in the corresponding PojoSerializer "fieldSerializers" array,
but it is not present in "fields", where we have a gap of one index (for
example 0-1-3-4).
So when the serializer reaches index 2, we get this NPE.

Why is the deleted field serializer still present? It should have been
dropped when resolving schema compatibility, right?
I cannot find anything on this matter; could someone help me with it?
Reproduced in Flink 1.13 & 1.14; I cannot find any related JIRA either.

Best Regards,
Bastien

Full stack trace :

2022-02-02 15:44:20
java.io.IOException: Could not perform checkpoint 2737490 for operator
OperatorXXX
at org.apache.flink.streaming.runtime.tasks.StreamTask
.triggerCheckpointOnBarrier(StreamTask.java:1274)
at org.apache.flink.streaming.runtime.io.checkpointing.
CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.triggerCheckpoint(
SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler
.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(
SingleCheckpointBarrierHandler.java:493)
at org.apache.flink.streaming.runtime.io.checkpointing.
AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(
AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.
AbstractAlignedBarrierHandlerState.barrierReceived(
AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.lambda$processBarrier$2(
SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(
SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.
SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler
.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.
CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.
CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput
.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:761)
at org.apache.flink.runtime.taskmanager.Task
.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:
937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
not complete snapshot 2737490 for operator OperatorXXX
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
.snapshotState(StreamOperatorStateHandler.java:265)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
.snapshotState(StreamOperatorStateHandler.java:170)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain
.checkpointStreamOperator(RegularOperato

RE: Pojo State Migration - NPE with field deletion

2022-02-02 Thread Alexis Sarda-Espinosa
Hello,

Happened to me too, here’s the JIRA ticket: 
https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Alexis.

From: bastien dine 
Sent: Wednesday, 2 February 2022 16:01
To: user 
Subject: Pojo State Migration - NPE with field deletion

Hello,

I have some trouble restoring a (POJO) state after deleting a field.
According to the documentation, this should not be a problem with POJOs:
"Fields can be removed. Once removed, the previous value for the removed field
will be dropped in future checkpoints and savepoints."

Here is a short stack trace (the full trace is below):

Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)


After some debugging, it seems that the deleted POJO field still has a field
serializer in the corresponding PojoSerializer "fieldSerializers" array,
but it is not present in "fields", where we have a gap of one index (for
example 0-1-3-4).
So when the serializer reaches index 2, we get this NPE.

Why is the deleted field serializer still present? It should have been
dropped when resolving schema compatibility, right?
I cannot find anything on this matter; could someone help me with it?
Reproduced in Flink 1.13 & 1.14; I cannot find any related JIRA either.

Best Regards,
Bastien

Full stack trace :
2022-02-02 15:44:20
java.io.IOException: Could not perform checkpoint 2737490 for 
operator OperatorXXX
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
complete snapshot 2737490 for operator OperatorXXX
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
at

AbstractMethodError when running StateFun job as jar

2022-02-02 Thread Christopher Gustafson
Hi everyone,


I am trying to run a StateFun job as a jar in an existing cluster, but I run
into a long error caused by the following:

Caused by: java.lang.AbstractMethodError: Method
org/apache/flink/statefun/flink/core/functions/FunctionGroupDispatchFactory
.setMailboxExecutor(Lorg/apache/flink/runtime/mailbox/MailboxExecutor;)V is 
abstract


Any ideas what could be causing this issue? The job I am trying to run is the 
shopping-cart from the StateFun playground. I am currently running it by doing 
the following:


  1.  Add flink-statefun-distribution as a dependency to the maven project
  2.  Add the module.yaml file to the resources folder
  3.  Compile with mvn clean package
  4.  Submit the jar including the dependencies to my cluster using:
./bin/flink run -c org.apache.flink.statefun.flink.core.StatefulFunctionsJob 
~/Workspace/flink-statefun-playground/java/shopping-cart/target/shopping-cart-3.1.1-jar-with-dependencies.jar

Thanks in advance,
Christopher


Creating Flink SQL Row with named fields

2022-02-02 Thread Vladislav Keda
Hi,

I'm trying to create Row(..) using Flink SQL, but I can't assign names to
its fields.


For example:

Input table1 structure: (id INT, some_name STRING)
Query: select *, ROW(id, some_name) as row1 from table1
Output result structure: (id INT, some_name STRING, row1 ROW (EXPR$0 INT, EXPR$1 STRING))

Each nested field gets a name like EXPR$, which does not satisfy me.

If I write, for example:

Input table1 structure: (id INT, some_name STRING)
Query: select *, ROW(id as nested_id, some_name as nested_some_name) as row1 from table1

I will get an exception like:

Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "as" at line 1, column 20.
Was expecting one of:
    ")" ...
    "," ...
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
at ru.glowbyte.streaming.core.operators.internal.sql.SqlDrivenOperator.sqlQuery(SqlDrivenOperator.java:159)
... 59 more

How can I set the name for the field?

Flink version - 1.13.3.

---

Best Regards,
Vladislav Keda


Re: Future support for custom FileEnumerator in FileSource?

2022-02-02 Thread Kevin Lam
Hi,

Totally missed that setFileEnumerator method. That definitely helps; I
checked it out and it does what we were looking for.

Thanks FG!

On Wed, Feb 2, 2022 at 3:07 AM Francesco Guardiani 
wrote:

> Hi,
> From what I see here
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/connector/file/src/AbstractFileSource.AbstractFileSourceBuilder.html#setFileEnumerator-org.apache.flink.connector.file.src.enumerate.FileEnumerator.Provider-
> the file enumerator can be setup with the FileSourceBuilder:
>
> fileSourceBuilder.setFileEnumerator(new FileEnumerator.Provider() {
>     @Override
>     public FileEnumerator create() {
>         // Do something
>         return null;
>     }
> });
>
>
> Hope it helps,
> FG
>


Re: AbstractMethodError when running StateFun job as jar

2022-02-02 Thread Igal Shilman
Hello Christopher,

It seems like a version mismatch. Which StateFun version are
you using, and what is the Flink version of the cluster that you are trying
to submit to?

StateFun 3.1.1 was built with Flink 1.13.5
StateFun 3.2.0 was built with Flink 1.14.3

The version needs to match, since the StateFun job is built on top of
internal APIs, and unfortunately these APIs are not guaranteed to remain
compatible across versions.
This is one of the reasons, where it is generally recommended to use the
community provided Docker images, as Flink and Statefun versions are
compatible in a single image.

Kind regards,
Igal.


On Wed, Feb 2, 2022 at 4:50 PM Christopher Gustafson  wrote:

> Hi everyone,
>
>
> I am trying to run a StateFun job as a jar in an existing cluster. But I
> run into a large error which is caused by the following:
>
> Caused by: java.lang.AbstractMethodError: Method
> org/apache/flink/statefun/flink/core/functions/
> FunctionGroupDispatchFactory
> .setMailboxExecutor(Lorg/apache/flink/runtime/mailbox/MailboxExecutor;)V
> is abstract
>
>
> Any ideas what could be causing this issue? The job I am trying to run is
> the shopping-cart from the StateFun playground. I am currently running it
> by doing the following:
>
>
>
>1. Add flink-statefun-distribution as a dependency to the maven project
>2. Add the module.yaml file to the resources folder
>3. Compile with mvn clean package
>4. Submit the jar including the dependencies to my cluster using:
>./bin/flink run -c
>org.apache.flink.statefun.flink.core.StatefulFunctionsJob
>
> ~/Workspace/flink-statefun-playground/java/shopping-cart/target/shopping-cart-3.1.1-jar-with-dependencies.jar
>
>
> Thanks in advance,
> Christopher
>
>


Re: IllegalArgumentException: URI is not hierarchical error when initializating jobmanager in cluster

2022-02-02 Thread Javier Vegas
Thanks, Robert!

I tried the classloader.resolve.order: parent-first option but ran into
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" errors
(because I use Logback, so I followed
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/logging/#configuring-logback
and removed log4j-slf4j-impl from the classpath). Putting all my classes
in lib/ instead of usrlib/ fixed that problem, and everything now runs
fine. Thanks!

On Fri, 28 Jan 2022 at 6:11, Robert Metzger () wrote:

> Hi Javier,
>
> I suspect that TwitterServer is using some classloading / dependency
> injection / service loading "magic" that is causing this.
> I would try to find out, either by attaching a remote debugger (should be
> possible when executing in cluster mode locally) or by adding log
> statements in the code, what the URI it's trying to load looks like.
>
> On the cluster, Flink is using separate classloaders for the base flink
> system, and the user code (as opposed to executing in the IDE, where
> everything is loaded from the same loader). Check out this page and try out
> the config arguments:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>
>
>
> On Wed, Jan 26, 2022 at 4:13 AM Javier Vegas  wrote:
>
>> I am porting a Scala service to Flink in order to make it more scalable
>> by running it in a cluster. All my Scala services extend a base Service
>> class that extends TwitterServer (
>> https://github.com/twitter/twitter-server/blob/develop/server/src/main/scala/com/twitter/server/TwitterServer.scala)
>> and that base class contains a lot of logic about resource initialization,
>> logging, stats and error handling, monitoring, etc that I want to keep
>> using in my class. I ported my logic to Flink sources and sinks, and
>> everything worked fine when I ran my class in single JVM mode either from
>> sbt or my IDE, Flink's jobmanager and taskmanagers start and run my app.
>> But when I try to run my application in cluster mode, when launching my
>> class with "./bin/standalone-job.sh start --job-classname" the
>> jobmanager runs into a "IllegalArgumentException: URI is not hierarchical"
>> error on initialization, apparently because TwitterServer is trying to load
>> something from the class path (see attached full log).
>>
>> Is there anything I can do to run a class that extends TwitterServer in a
>> Flink cluster? I have tried making my class not extend it and it worked
>> fine, but I really want to keep using all the common infrastructure logic
>> that I have in my base class that extends TwitterServer.
>>
>> Thanks!
>>
>


Flink 1.12.1 and KafkaSource

2022-02-02 Thread Marco Villalobos
The Flink 1.12 documentation (
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html)
states to use FlinkKafkaConsumer when consuming from Kafka.

However, I noticed that the newer API uses KafkaSource, which uses
KafkaSourceBuilder and OffsetsInitializer.

Although I am on the Flink 1.12 codebase, I preemptively decided to use
KafkaSource instead in order to use the more advanced offsets feature. It
worked, until I deployed it to EMR and had to connect to AWS Kafka (MSK).

The logs show a few suspicious things.

1) The ConsumerConfig logs these properties:

security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

but I actually passed the following:

security.protocol = SSL
ssl.truststore.location = /etc/alternatives/jre/lib/security/cacerts
ssl.truststore.password = changeit
ssl.truststore.type = JKS

2) The job fails and this exception is thrown:

2022-02-03 00:40:57,239 ERROR
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] -
Exception while handling result from async call in
SourceCoordinator-Source: kafka sensor tags -> Sink: s3 sink. Triggering
job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition
splits change due to
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:223)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
[flink-dist_2.12-1.12.1.jar:1.12.1]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_312]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_312]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_312]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
~[?:1.8.0_312]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
~[?:1.8.0_312]
... 3 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException:
Call(callName=describeTopics, deadlineMs=1643848916823) timed out at
9223372036854775807 after 1 attempt(s)
at
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196)
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_312]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
~[?:1.8.0_312]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
~[?:1.8.0_312]
... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException:
Call(callName=describeTopics, deadlineMs=1643848916823) timed out at
9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient
thread has exited.
2022-02-0
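
Regarding point 1 above: extra Kafka client settings such as the SSL options
have to be handed to the KafkaSource builder so that they reach the
ConsumerConfig. A minimal sketch follows; it only shows the property handling,
the builder method names follow the KafkaSource API introduced in Flink 1.12,
and the rest of the chain (bootstrap servers, topics, group id, offsets,
deserializer) is assumed to stay as in the original job:

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;

// ...

KafkaSourceBuilder<String> builder = KafkaSource.<String>builder();
// Each setProperty call is forwarded to the underlying Kafka consumer/admin client.
builder.setProperty("security.protocol", "SSL");
builder.setProperty("ssl.truststore.location", "/etc/alternatives/jre/lib/security/cacerts");
builder.setProperty("ssl.truststore.password", "changeit");
builder.setProperty("ssl.truststore.type", "JKS");
// ... bootstrap servers, topics, group id, OffsetsInitializer and deserializer as before ...
KafkaSource<String> source = builder.build();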