Re: Idiomatic way to split pipeline

2019-12-01 Thread Avi Levi
Thanks Arvid,
The problem is that I will get an exception for a non-unique uid on the
*stream*.
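
For reference, here is a minimal sketch (not from this thread) of the two-subgraph approach with a distinct uid per branch, assuming the exception comes from reusing the same uid string on both branches; the source, window sizes and print() sinks are illustrative:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FanOutExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // stand-in source of (key, value) pairs
    val stream: DataStream[(String, Long)] = env.fromElements(("a", 1L), ("b", 2L))

    // branch 1: short window, with its own uid
    stream
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .uid("short-window-sum")
      .print()

    // branch 2: longer window, with a different uid
    stream
      .keyBy(_._1)
      .timeWindow(Time.minutes(1))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .uid("long-window-sum")
      .print()

    env.execute("fan-out example")
  }
}
```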

On Thu, Nov 28, 2019 at 2:45 PM Arvid Heise  wrote:

> Hi Avi,
>
> it seems to me that you do not really need any split feature. As far as I
> can see from your picture, you want to apply two different windows to the
> same input data.
>
> In that case, you can simply use two different subgraphs.
>
> stream = ...
>
> stream1 = stream.window(...).addSink()
>
> stream2 = stream.window(...).addSink()
>
> In Flink, you can compose arbitrary directed acyclic graphs, so consuming
> the output of one operator on several downstream operators is completely
> normal.
>
> Best,
>
> Arvid
>
> On Mon, Nov 25, 2019 at 10:50 AM Avi Levi  wrote:
>
>> Thanks, I'll check it out.
>>
>> On Mon, Nov 25, 2019 at 11:46 AM vino yang  wrote:
>>
>>> *This Message originated outside your organization.*
>>> --
>>> Hi Avi,
>>>
>>> The side output provides a superset of split's functionality, so
>>> anything that can be implemented via split can also be implemented via
>>> side output. [1]
>>>
>>> Best,
>>> Vino
>>>
>>> [1]:
>>> https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data
>>> 
>>>
>>> Avi Levi wrote on Mon, Nov 25, 2019 at 5:32 PM:
>>>
 Thank you for your quick reply, I appreciate it. But this is not exactly a
 "side output" per se; it is simple splitting. IIUC, the side output is more
 for splitting the records by something that differentiates them (lateness,
 value, etc.). I thought there was something more idiomatic, but if this is
 it, then I will go with that.

 On Mon, Nov 25, 2019 at 10:42 AM vino yang 
 wrote:

> Hi Avi,
>
> As the documentation of DataStream#split says, you can use the "side
> output" feature to replace it. [1]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
> 
>
> Best,
> Vino
>
> Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:
>
>> Hi,
>> I want to split the output of one of the operators into two pipelines.
>> Since the *split* method is deprecated, what is the idiomatic way to
>> do that without duplicating the operator?
>>
>> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>>
>>
>>


[ANNOUNCE] Weekly Community Update 2019/48

2019-12-01 Thread Konstantin Knauf
Dear community,

happy to share a short community update this week. With one week to go until
the planned feature freeze for Flink 1.10, and with Flink Forward Asia in
Beijing, the dev@ mailing list has been pretty quiet these days.

Flink Development
==

* [releases] Hequn has started the vote on RC1 for Flink 1.8.3, which
unfortunately has already received a -1 due to wrong/missing license
information. Expecting a new RC soon. [1]

* [sql] In the past, timestamp fields in Flink SQL were internally
represented as longs, and it was recommended to use longs directly in
user-defined functions. With the introduction of the new TimestampType the
situation has changed, and the conversion between long and TIMESTAMP will be
disabled. [2]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-3-release-candidate-1-tp35401p35407.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Disable-conversion-between-TIMESTAMP-and-Long-in-parameters-and-results-of-UDXs-tp35269p35271.html

Notable Bugs
==

* [FLINK-14930] [1.9.1] The OSS filesystem did not allow the configuration
of additional credentials providers due to a shading-related bug. Resolved
for 1.9.2. [3]

[3] https://issues.apache.org/jira/browse/FLINK-14930

Events, Blog Posts, Misc
===

* Flink Forward Asia, organized by Alibaba, took place this week at the
National Congress Center in Beijing. There were talks by Ververica, Tencent,
Baidu, Alibaba, Dell, Lyft, Netflix, Cloudera and many other members of the
Chinese Apache Flink community, and more than 1,500 attendees as far as I
heard. Impressive! [4]

* At Flink Forward Asia, Alibaba announced that it has open-sourced Alink, a
machine learning library on top of Apache Flink. [5,6]

* Upcoming Meetups
* The first meetup of the Apache Flink Meetup Chicago on December 5th
comes with four talks(!) highlighting different deployment methods of
Apache Flink (AWS EMR, AWS Kinesis Analytics, Ververica Platform, IBM
Kubernetes). Talks by *Trevor Grant*, *Seth Wiesman*, *Joe Olson* and
*Margarita Uk*. [7]
* On December 17th there will be the second Apache Flink meetup in
Seoul. Maybe Dongwon can share the list of speakers in this thread, my
Korean is a bit rusty. [8]

[4] https://m.aliyun.com/markets/aliyun/developer/ffa2019
[5]
https://technode.com/2019/11/28/alibaba-cloud-machine-learning-platform-open-source/
[6] https://github.com/alibaba/Alink/blob/master/README.en-US.md
[7]
https://www.meetup.com/Chicago-Apache-Flink-Meetup-CHAF/events/266609828/
[8] https://www.meetup.com/Seoul-Apache-Flink-Meetup/events/266824815/

Cheers,

Konstantin (@snntrable)

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Follow us @VervericaData Ververica 


--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng


Re: Read multiline JSON/XML

2019-12-01 Thread vino yang
Also, sorry to Flavio!

Best,
Vino

vino yang wrote on Mon, Dec 2, 2019 at 10:29 AM:

> Hi Chesnay,
>
> Sorry, yes, I missed the "like" keyword. I mistakenly thought he wanted to
> ask how to use Spark to accomplish this job.
>
> Best,
> Vino
>
> Chesnay Schepler wrote on Fri, Nov 29, 2019 at 10:01 PM:
>
>> Why vino?
>>
>> He's specifically asking whether Flink offers something _like_ Spark.
>>
>> On 29/11/2019 14:39, vino yang wrote:
>>
>> Hi Flavio,
>>
>> IMO, it would be more effective to ask this question on the Spark user
>> mailing list.
>>
>> WDYT?
>>
>> Best,
>> Vino
>>
>> Flavio Pompermaier wrote on Fri, Nov 29, 2019 at 7:09 PM:
>>
>>> Hi to all,
>>> is there any out-of-the-box option to read multiline JSON or XML like in
>>> Spark?
>>> It would be awesome to have something like
>>>
>>> spark.read.option("multiline", true).json("/path/to/user.json")
>>>
>>> Best,
>>> Flavio
>>>
>>
>>


Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

2019-12-01 Thread Congxian Qiu
Hi

From the exception `No key set. This method should not be called outside of
a keyed context.`, it means that the current key passed in is null. In my
opinion, something is wrong here if an exception is thrown when no data has
arrived. Could you please share the whole stack trace and a minimal
reproducible job for this issue?

Best,
Congxian


Salva Alcántara wrote on Sun, Dec 1, 2019 at 3:01 PM:

> Given:
>
>
> ```scala
> class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
>   with CheckpointedFunction {
>
>   // To hold loaded models
>   @transient private var models: HashMap[(String, String), Model] = _
>
>   // For serialization purposes
>   @transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _
>
>   ...
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {
>     modelsBytes.clear() // This raises an exception when there is no active key set
>     for ((k, model) <- models) {
>       modelsBytes.put(k, model.toBytes(v))
>     }
>   }
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     modelsBytes = context.getKeyedStateStore.getMapState[String, String](
>       new MapStateDescriptor("modelsBytes", classOf[String], classOf[String])
>     )
>
>     if (context.isRestored) {
>       // restore models from modelsBytes
>     }
>   }
>
> }
> ```
>
> It happens that `modelsBytes.clear()` raises an exception when there is no
> active key. This happens when I start the application from scratch without
> any data on the input streams. So, when the time for a checkpoint comes, I
> get this error:
>
> `java.lang.NullPointerException: No key set. This method should not be
> called outside of a keyed context.`
>
> However, when the input stream contains data, checkpoints work just fine.
> I am a bit confused about this because `snapshotState` does not provide a
> keyed context (contrary to `processElement1` and `processElement2`, where
> the current key is accessible by doing `ctx.getCurrentKey`) so it seems to
> me that the calls to `clear` and `put` within `snapshotState` should fail
> always since they're supposed to work only within a keyed context. Can
> anyone clarify if this is the expected behaviour actually?
>
>


Re: [DISCUSSION] Using `DataStreamUtils.reinterpretAsKeyedStream` on a pre-partitioned Kafka topic

2019-12-01 Thread Congxian Qiu
Hi

From the doc [1], the DataStream MUST already be pre-partitioned in EXACTLY
the same way Flink's keyBy would partition the data in a shuffle w.r.t.
key-group assignment.
You should make sure that each key lands in the right key group, and that
each key group lands on the right parallel subtask. You can refer to
KeyGroupRangeAssignment [2] for more information.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.html
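
As a quick illustration (a hedged sketch, with assumed maxParallelism/parallelism values), the static helpers in that class can be used to check where a given key would land:

```scala
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyPlacementCheck {
  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // Flink's default maxParallelism, assumed here
    val parallelism    = 4   // illustrative job parallelism

    val sessionKey = "some-session-key"

    // key -> key group, then key group -> parallel subtask index
    val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(sessionKey, maxParallelism)
    val subtask  = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
      maxParallelism, parallelism, keyGroup)
    println(s"key '$sessionKey' -> key group $keyGroup -> subtask $subtask")

    // the same mapping in a single call
    val direct = KeyGroupRangeAssignment.assignKeyToParallelOperator(
      sessionKey, maxParallelism, parallelism)
    assert(direct == subtask)
  }
}
```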
Best,
Congxian


Robin Cassan wrote on Sat, Nov 30, 2019 at 12:17 AM:

> Hi all!
>
> We are trying to build a Flink job that consumes a Kafka topic, groups
> the incoming events in Session Windows according to a String that can
> be generated by parsing the message (we call it `SessionKey`) and does
> some processing on the windows before sending them to another Kafka
> topic.
> Our first implementation used a `keyBy` operator on the incoming
> messages before creating the window, but we realized that we could
> pre-partition our data by `SessionKey` when we insert it into the input
> Kafka topic with a custom component. This would avoid having to
> shuffle data around in Flink, since, for a given `SessionKey`, we would
> ensure that all messages with this key will end-up in the same Kafka
> partition and thus be read by the same subtask, on a single
> TaskManager. This means that we should be able to create a keyed-stream
> from the incoming data without having to transfer data between
> TaskManagers.
>
> To achieve that, we have used the `reinterpretAsKeyedStream` method
> instead of the previous `keyBy`. This got rid of the shuffling step,
> but we are wondering if this is the right way of using this feature and
> whether Flink can manage to match the distribution of Keys from Kafka
> with the ones assigned to each TaskManager?
> We have observed that, while attempting to trigger a savepoint, we
> would encounter exceptions that seem to indicate that the TaskManagers
> received data whose `SessionKey` didn't match their assigned Keys.
> Here is one of the stack traces we saw while savepointing:
>
> ```
> java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
> ```
> We are currently using Flink 1.8.2 on Kubernetes, savepointing to
> Amazon S3.
>
> Is our observation about Flink not being able to match the Kafka
> partitioning with the TaskManager's assigned KeyGroups correct?
> And if so, do you have any pointers on how we could pre-partition our
> data in Kafka so that Flink can avoid shuffling data before creating
> the Session Windows?
>
> Cheers,
>
> Robin
>
> --
>
>
> 
>
> Robin CASSAN
>
> Data Engineer
> +33 6 50 77 88 36
> 5 boulevard de la Madeleine - 75001 Paris
> 
> 
> 
>
>
> 
>


Is it possible to recover from a checkpoint after modify program?

2019-12-01 Thread tison
Hi,

Here is our case: a job that reads data from Kafka, does some processing,
and writes to HDFS has been running for quite a while and has completed
checkpoints. Now we'd like to add a new phase to the processing and recover
from a checkpoint taken before the change. The new phase may or may not be
stateful. Is it possible to do the recovery in each respective case?

Best,
tison.
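
In case it helps frame the question, here is a hedged sketch of the usual approach (operator names and flags are illustrative, not a confirmed answer to this thread): give each stateful operator a stable uid(), resume from the checkpoint with `flink run -s <path>`, and pass --allowNonRestoredState only if the snapshot holds state for operators that no longer exist; a newly added operator with no matching uid simply starts with empty state.

```scala
import org.apache.flink.streaming.api.scala._

object KafkaToHdfsJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // stand-in for the Kafka source; the real source would also carry .uid("kafka-source")
    val source: DataStream[String] = env.fromElements("a", "b")

    val processed = source
      .map(_.toUpperCase).uid("existing-process") // existing phase: state restores by uid
      .filter(_.nonEmpty).uid("new-phase")         // newly added phase: no state in the old
                                                   // snapshot, so it starts empty

    processed.print() // stand-in for the HDFS sink

    env.execute("kafka-to-hdfs")
    // Restore from an existing snapshot (illustrative CLI):
    //   flink run -s <checkpoint-or-savepoint-path> [-n|--allowNonRestoredState] job.jar
  }
}
```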


Re: [DISCUSS] Disable conversion between TIMESTAMP and Long in parameters and results of UDXs

2019-12-01 Thread Zhenghua Gao
Since it is unanimously agreed that we should disable the conversion between
Timestamp and long in parameters and results of UDXs, in PR [1] we will
disable it in the Blink planner. We will also add a release note to
FLINK-14599 [2] about this incompatible modification.



[1] https://github.com/apache/flink/pull/10268
[2] https://issues.apache.org/jira/browse/FLINK-14599

*Best Regards,*
*Zhenghua Gao*
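
To make the user-facing impact concrete, here is a hedged sketch (class names are illustrative, not taken from the PR) of the obsolete long-based UDF signature versus a Timestamp-based one that stays compatible once the conversion is disabled:

```scala
import java.sql.Timestamp
import org.apache.flink.table.functions.ScalarFunction

// Obsolete style: relies on the long <-> TIMESTAMP conversion that is being disabled.
class HourOfEpochMillis extends ScalarFunction {
  def eval(epochMillis: Long): Long = (epochMillis / 3600000L) % 24
}

// Recommended style: take the SQL TIMESTAMP as a Timestamp object.
class HourOfTimestamp extends ScalarFunction {
  def eval(ts: Timestamp): Int = ts.toLocalDateTime.getHour
}
```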


On Sun, Nov 24, 2019 at 8:44 PM Jark Wu  wrote:

> Hi,
>
> +1 to disable it in 1.10. I think it's time to disable and correct the
> behavior now.
>
> Also cc'ed user mailing list to have broader audiences.
>
> Best,
> Jark
>
> On Sat, 23 Nov 2019 at 16:59, Timo Walther  wrote:
>
>> Hi,
>>
>> +1 for disabling it in the Blink planner. Once FLIP-65 is implemented
>> and a UDX is registered with the new
>> TableEnvironment.createTemporaryFunction() we will also have the
>> possibility to be fully compliant with the new type system because we
>> can advertise a new UDF stack with new behavior.
>>
>> Also the mentioned documentation page will be updated as part of FLIP-65.
>>
>> Regards,
>> Timo
>>
>>
>> On 22.11.19 11:08, Jingsong Li wrote:
>> > +1 to disable, It is already introduced by new type system in
>> TimestampType.
>> > I think it is time to update document too.
>> >
>> > Best,
>> > Jingsong Lee
>> >
>> > On Fri, Nov 22, 2019 at 6:05 PM Kurt Young  wrote:
>> >
>> >> +1 to disable, we also need to highlight this in 1.10 release notes.
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >>
>> >> On Fri, Nov 22, 2019 at 5:56 PM Zhenghua Gao  wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> I wanted to bring up a discussion about disabling the conversion between
>> >>> TIMESTAMP and Long in parameters and results of UDXs.
>> >>>
>> >>> Since FLINK-12253 [1] introduces the new TimestampType and the conversion
>> >>> from and to long is not supported, UDXs with Long parameters should not
>> >>> receive TIMESTAMP fields, and vice versa.
>> >>>
>> >>> The current situation is that we use long as the internal representation
>> >>> of TIMESTAMP, and the legacy planner and Blink planner DO NOT DISABLE this
>> >>> conversion. Now FLINK-14599 [2] would introduce a new internal
>> >>> representation of TIMESTAMP, and it is time to make a decision to DISABLE it.
>> >>>
>> >>> In addition, our documentation [3] recommends that UDX users use long as
>> >>> the representation of SQL_TIMESTAMP, which is obsolete too.
>> >>>
>> >>> Please let me know what you think!
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-12253
>> >>> [2] https://issues.apache.org/jira/browse/FLINK-14599
>> >>> [3]
>> >>>
>> >>>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html#best-practices-for-implementing-udfs
>> >>>
>> >>> *Best Regards,*
>> >>> *Zhenghua Gao*
>> >>>
>> >>
>> >
>> >
>>
>>


Re: Temporary failure in name resolution on JobManager

2019-12-01 Thread Yang Wang
Hi David,

Do you mean that when the JobManager starts, DNS has some problem and the
service cannot be resolved, and that even after DNS returns to normal, the
JobManager JVM still cannot look up the name?
I think it may be because of the JVM DNS cache. You could set the TTLs and
give it a try:
sun.net.inetaddr.ttl
sun.net.inetaddr.negative.ttl
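
For example, one way to pass these as JVM options is via env.java.opts in flink-conf.yaml (an illustrative sketch; the TTL values are assumptions, not recommendations):

```yaml
# flink-conf.yaml: shorten the JVM's DNS cache TTLs (values in seconds, illustrative)
env.java.opts: "-Dsun.net.inetaddr.ttl=60 -Dsun.net.inetaddr.negative.ttl=10"
```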


Best,
Yang

David Maddison wrote on Fri, Nov 29, 2019 at 6:41 PM:

> I have a Flink 1.7 cluster using the "flink:1.7.2" (OpenJDK build
> 1.8.0_222-b10) image on Kubernetes.
>
> As part of a MasterRestoreHook (for checkpointing) the JobManager needs to
> communicate with an external security service.  This all works well until
> there's a DNS lookup failure (due to network issues) at which point the
> JobManager JVM seems unable to ever successfully look up the name again,
> even when it's confirmed DNS service has been restored.  The weird thing is
> that I can use kubectl to exec into the JobManager POD and successfully
> perform a lookup even while the JobManager JVM is still failing to lookup.
>
> Has anybody seen an issue like this before, or have any suggestions?  As
> far as I'm aware Flink doesn't install a SecurityManager and therefore the
> JVM should only cache invalid name requests for 10 seconds.
>
> Restarting the JobManager JVM does successfully recover the Job, but I'd
> like to avoid having to do that if possible.
>
> Caused by: java.net.UnknownHostException: <>.com: Temporary
> failure in name resolution
> at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
> at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
> at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
> at java.net.InetAddress.getAllByName(InetAddress.java:1193)
> at java.net.InetAddress.getAllByName(InetAddress.java:1127)
>
> Thanks in advance,
>
> David
>


Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

2019-12-01 Thread Yun Tang
Hi Salva

The root cause is that you have not quite grasped the difference between keyed
state and operator state.

There is no ‘currentKey’ in operator state, which means
PartitionableListState#clear() will clear the whole state. However, there is
always a ‘currentKey’ in keyed state, which means ‘state#clear()’ only removes
the entry scoped to the current runtime key. In your example code, the state to
clear is a MapState (not a list state) and therefore must be a keyed state. If
your job has not processed any record, there is no ‘currentKey’ set [1] for
that ‘modelsBytes’ state, which leads to the NPE when calling ‘state#clear()’.

Moreover, the ‘snapshotState’ and ‘initializeState’ interfaces are mainly used
to snapshot and initialize operator state.

Last but not least, even if you could ensure that at least one record is
processed before ‘snapshotState’ is called, it would not make your program
logic clear: you cannot control which entry in your state gets cleared, since
you cannot control the current key, which is set while processing records.

You could refer to TwoPhaseCommitSinkFunction [2] to figure out what state can
be cleared during snapshotState.

[1] 
https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java#L136
[2] 
https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L324
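
For what it's worth, here is a minimal sketch of the operator-state variant described above (the Model stub, names, and the co-process logic are illustrative assumptions, not the poster's actual job); ListState obtained from the operator state store needs no current key, so clear()/add() are safe inside snapshotState():

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// trivial stand-in for the poster's Model class
case class Model(bytes: Array[Byte])

class ModelHoldingOperator extends CoProcessFunction[Array[Byte], String, String]
    with CheckpointedFunction {

  @transient private var models: Map[String, Model] = _
  @transient private var modelState: ListState[(String, Array[Byte])] = _

  override def processElement1(modelBytes: Array[Byte],
                               ctx: CoProcessFunction[Array[Byte], String, String]#Context,
                               out: Collector[String]): Unit =
    models += ("default" -> Model(modelBytes)) // keep the latest model in memory

  override def processElement2(value: String,
                               ctx: CoProcessFunction[Array[Byte], String, String]#Context,
                               out: Collector[String]): Unit =
    out.collect(s"scored '$value' with ${models.size} model(s)")

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelState.clear() // operator state: no keyed context is required, so no NPE here
    for ((name, model) <- models) modelState.add((name, model.bytes))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    models = Map.empty
    modelState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor("models", createTypeInformation[(String, Array[Byte])]))
    if (context.isRestored) {
      models = modelState.get().asScala.map { case (name, bytes) => name -> Model(bytes) }.toMap
    }
  }
}
```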

Best
Yun Tang


From: Congxian Qiu 
Date: Monday, December 2, 2019 at 10:41 AM
To: Salva Alcántara 
Cc: user 
Subject: Re: Using MapState clear, put methods in snapshotState within 
KeyedCoProcessFunction, valid or not?

Hi

From the exception `No key set. This method should not be called outside of a
keyed context.`, it means that the current key passed in is null. In my opinion,
something is wrong here if an exception is thrown when no data has arrived.
Could you please share the whole stack trace and a minimal reproducible job for
this issue?

Best,
Congxian


Salva Alcántara <salcantara...@gmail.com> wrote on Sun, Dec 1, 2019 at 3:01 PM:
Given:


```scala
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState[String, String](
      new MapStateDescriptor("modelsBytes", classOf[String], classOf[String])
    )

    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }

}
```

It happens that `modelsBytes.clear()` raises an exception when there is no 
active key. This happens when I start the application from scratch without any 
data on the input streams. So, when the time for a checkpoint comes, I get this 
error:

`java.lang.NullPointerException: No key set. This method should not be called 
outside of a keyed context.`

However, when the input stream contains data, checkpoints work just fine. I am 
a bit confused about this because `snapshotState` does not provide a keyed 
context (contrary to `processElement1` and `processElement2`, where the current 
key is accessible by doing `ctx.getCurrentKey`) so it seems to me that the 
calls to `clear` and `put` within `snapshotState` should fail always since 
they're supposed to work only within a keyed context. Can anyone clarify if 
this is the expected behaviour actually?


Table/SQL API to read and parse JSON, Java.

2019-12-01 Thread srikanth flink
Hi there,

I'm following the link

to read JSON data from Kafka and convert it to a table programmatically. I
did try it declaratively using the SQL client and succeeded.

My JSON data is nested, like: {a:1, b:2, c:{x:1, y:2}}.
Code:

> String schema = "{type: 'object', properties: {'message': {type:
> 'string'},'@timestamp': {type: 'string'}}}";
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(6, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().getCheckpointTimeout();
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> tableEnv.connect(new
> Kafka().version("universal").topic("recon-data").startFromEarliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092"))
> .withFormat(new
> Json().failOnMissingField(false).jsonSchema(schema).deriveSchema())
> .withSchema(new Schema().field("message",
> Types.STRING()).field("@timestamp", Types.LOCAL_DATE_TIME()))
> .inAppendMode().registerTableSource("reconTableS");
>
> Table t = tableEnv.sqlQuery("select * from reconTableS");
> DataStream<Row> out = tableEnv.toAppendStream(t, Row.class);
> out.print();
>
> try {
> env.execute("Flink Example Json");
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>

pom.xml:

> <properties>
> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> <flink.version>1.9.0</flink.version>
> <java.version>1.8</java.version>
> <scala.binary.version>2.11</scala.binary.version>
> <maven.compiler.source>${java.version}</maven.compiler.source>
> <maven.compiler.target>${java.version}</maven.compiler.target>
> </properties>
>

> <dependencies>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-scala_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-common</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kafka_2.12</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-json</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-cep_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>mysql</groupId>
> <artifactId>mysql-connector-java</artifactId>
> <version>5.1.39</version>
> </dependency>
> </dependencies>
>

The code threw the following error:

> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: findAndCreateTableSource failed.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
> at
> kafka.flink.stream.list.match.ExampleJsonParser.main(ExampleJsonParser.java:31)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> ... 12 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=localhost:2181
> connector.properties.1.key=bootstrap.servers
> con

Re: [DISCUSSION] Using `DataStreamUtils.reinterpretAsKeyedStream` on a pre-partitioned Kafka topic

2019-12-01 Thread Gyula Fóra
Hi!

As far as I know, even if you pre-partition the data in Kafka in exactly the
same way as the key groups, you have no guarantee that the Kafka consumer
source would pick up the right partitions.

It might work if you have exactly as many Kafka partitions as key groups /
max parallelism, partitioned correctly, but even then you might have to use a
custom source to get the correct partition assignment for the subtasks.

Long story short, I believe the built-in Kafka source doesn't support what
you want, but it should be possible to adapt it to do so.

Cheers
Gyula

On Mon, Dec 2, 2019, 03:49 Congxian Qiu  wrote:

> Hi
>
> From the doc [1], the DataStream MUST already be pre-partitioned in EXACTLY
> the same way Flink's keyBy would partition the data in a shuffle w.r.t.
> key-group assignment.
> You should make sure that each key lands in the right key group, and that
> each key group lands on the right parallel subtask. You can refer to
> KeyGroupRangeAssignment [2] for more information.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.html
> Best,
> Congxian
>
>
> Robin Cassan wrote on Sat, Nov 30, 2019 at 12:17 AM:
>
>> Hi all!
>>
>> We are trying to build a Flink job that consumes a Kafka topic, groups
>> the incoming events in Session Windows according to a String that can
>> be generated by parsing the message (we call it `SessionKey`) and does
>> some processing on the windows before sending them to another Kafka
>> topic.
>> Our first implementation used a `keyBy` operator on the incoming
>> messages before creating the window, but we realized that we could
>> pre-partition our data by `SessionKey` when we insert it into the input
>> Kafka topic with a custom component. This would avoid having to
>> shuffle data around in Flink, since, for a given `SessionKey`, we would
>> ensure that all messages with this key will end-up in the same Kafka
>> partition and thus be read by the same subtask, on a single
>> TaskManager. This means that we should be able to create a keyed-stream
>> from the incoming data without having to transfer data between
>> TaskManagers.
>>
>> To achieve that, we have used the `reinterpretAsKeyedStream` method
>> instead of the previous `keyBy`. This got rid of the shuffling step,
>> but we are wondering if this is the right way of using this feature and
>> whether Flink can manage to match the distribution of Keys from Kafka
>> with the ones assigned to each TaskManager?
>> We have observed that, while attempting to trigger a savepoint, we
>> would encounter exceptions that seem to indicate that the TaskManagers
>> received data whose `SessionKey` didn't match their assigned Keys.
>> Here is one of the stack traces we saw while savepointing:
>>
>> ```
>> java.lang.IllegalArgumentException: Key group 0 is not in
>> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>> at
>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>> at
>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>> at
>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
>> at
>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>> at
>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
>> at
>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
>> at
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
>> ```
>> We are currently using Flink 1.8.2 on Kubernetes, savepointing to
>> Amazon S3.
>>
>> Is our observation about Flink not being able to match the Kafka
>> partitioning with the TaskManager's assigned KeyGroups correct?
>> And if so, do you have any pointers on how we could pre-partition our
>> data in Kafka so that Flink can avoid shuffling data before creating
>> the Session Windows?
>>
>> Cheers,
>>
>> Robin
>>
>> --
>>
>>
>> 
>>
>> Robin CASSAN
>>
>> Data Engineer
>> +33 6 50 77 88 36
>> 5 boulevard de la Madeleine - 75001 Paris
>>
>> 
>> 
>> 
>>
>>
>> 
>>
>