Flink drops messages?

2017-11-12 Thread AndreaKinn
Hi, I'm running a Flink application where data are retrieved from a Kafka broker and forwarded to a Cassandra sink. I've implemented the following watermark emitter: public class CustomTimestampExtractor implements AssignerWithPeriodicWatermarks>{ private final long maxOutOfOrderness = 800;

Re: Flink memory usage

2017-11-04 Thread AndreaKinn
Anyway, If I understood how system metrics works (the results seems to be showed in browser) I can't use it because my cluster is accessible only with terminal via ssh -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink memory usage

2017-11-04 Thread AndreaKinn
I have used sysstat linux tool. On the node the only one application running is Flink. The outcomes measured with metric system could be different? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Flink memory usage

2017-11-03 Thread AndreaKinn
Hi, I would like to share some considerations about Flink memory consumption. I have a cluster composed of three nodes: 1 used both as JM and TM and other 2 TM. I ran two identical applications (in different moments) on it. The only difference is that on the second one I doubled every operators, e

Re: StreamTransformation object

2017-10-29 Thread AndreaKinn
Thanks for your help, I solved the issue refactoring HTMStream adding new api's -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: StreamTransformation object

2017-10-27 Thread AndreaKinn
ster/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L148 > [2] > https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L97 > > 2017-10-26 17:36 GMT

Re: StreamTransformation object

2017-10-26 Thread AndreaKinn
Can you be clearer about this part? I'm really appreciating your help Tony Wei wrote > you need to refactor `HTMStream` to expose > `InferenceStreamBuilder.build()`. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: StreamTransformation object

2017-10-26 Thread AndreaKinn
Mmm looks good. This solution would be great. In this way am I setting a slotSharing group for both learn and select method and not only on select? I believed I need to call slotSharingGroup exactly on the return type of learn. -- Sent from: http://apache-flink-user-mailing-list-archive.233605

Re: StreamTransformation object

2017-10-26 Thread AndreaKinn
Sorry Tony it is my fault, I was wrong the first post. Actually now my situation is the following: DataStream> LCxAccResult = HTM.learn(LCxAccStream, new Harness.AnomalyNetwork()) .select(new InferenceSelectFunction>() {...} so actually the return value of "Lear

StreamTransformation object

2017-10-25 Thread AndreaKinn
Hi, I'm using an external library with Flink I'm trying to implement slotSharingGroup(String) method on it. To do it I looked at SingleOutputStreamOperator Flink's class to see how the method slotSharingGroup(String) is implemented. An abstract: /public class SingleOutputStreamOperator extends Da

Set heap size

2017-10-19 Thread AndreaKinn
About task manager heap size Flink doc says: ... If the cluster is exclusively running Flink, the total amount of available memory per machine minus some memory for the operating system (maybe 1-2 GB) is a good value But my nodes have 2GB of ram each. There isn't an empirical count to set ram

Re: Unbalanced job scheduling

2017-10-17 Thread AndreaKinn
I'm in contact with the founder of the library to deal with the problem. I'm trying also to understand how implement myself slotSharingGroups -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Unbalanced job scheduling

2017-10-17 Thread AndreaKinn
Yes, I considered them but unfortunately I can't call setSlotSharingGroup method on LEARN and SELECT operators. I can call it on the other operators but this means that the two LEARN method will be constrained in the same "unnamed" slot. -- Sent from: http://apache-flink-user-mailing-list-archi

Unbalanced job scheduling

2017-10-16 Thread AndreaKinn
Hi all, I want to expose you my program flow. I have the following operators: kafka-source -> timestamp-extractor -> map -> keyBy -> window -> apply -> LEARN -> SELECT -> process -> cassandra-sink the LEARN and SELECT operators belong to an external library supported by flink. LEARN is a very he

Re: Problems with window function

2017-10-15 Thread AndreaKinn
KeySelector was exactly what I need. Thank you a lot. I modified my code in this way and now it works: DataStream LCxAccStream = env .addSource(new FlinkKafkaConsumer010<>("LCacc", new CustomDeserializer(), properties)).setParallelism(4)

Problems with window function

2017-10-14 Thread AndreaKinn
Hi all, I'm trying to implement a time ordering inside a stream using window function. Then my purposes is to order the element inside a tumbling window. This is my code (written following the doc): DataStream LCxAccStream = env .addSource(new FlinkKafkaConsumer010

Doubts about parallelism

2017-10-14 Thread AndreaKinn
Hi, I read the doc about parallelism, parallel execution and job scheduling but however I have some doubts about parallelism. 1. In my first try I unset parallelism in my code and commented parallelism.default key in link-conf file. In this case I supposed the parallelism was set by Flink automat

Re: NoResourceAvailable exception

2017-10-11 Thread AndreaKinn
the program is composed by: 6 Kafka /source/ connector with custom timestamp and watermark /extractor/ and /map/ function each. then I use 6 instance of an external library called flink-htm (quite heavy) moreover I have 6 /process/ method and 2 /union/ method to merge result streams. Finally I hav

Re: Load distribution through the cluster

2017-09-19 Thread AndreaKinn
If I apply a sharing slot as in the example: DataStream LTzAccStream = env .addSource(new FlinkKafkaConsumer010<>("topic", new CustomDeserializer(), properties)) .assignTimestampsAndWatermarks(new CustomTimestampExtractor())

Re: NoResourceAvailable exception

2017-09-19 Thread AndreaKinn
Thank you, unfortunately it had no effects. As I add more load on the computation appears the error taskmanager killed on the node on use, without calling other nodes to sustain the computation. I also increased akka.watch.heartbeat.interval akka.watch.heartbeat.pause akka.transport.heartbeat.i

Re: Load distribution through the cluster

2017-09-19 Thread AndreaKinn
So Flink use the other nodes just if one is completely "full" ? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Load distribution through the cluster

2017-09-18 Thread AndreaKinn
Hi, I'm experimenting a bit with the cluster. I didn't set any options about sharing slots and chains hoping that Flink decided autonomously how to balance the load through the nodes of the cluster. My cluster is composed by one job and task manager and two task manager. I noted that every time

Re: NoResourceAvailable exception

2017-09-15 Thread AndreaKinn
Update: Following other discussions I even tried to reduce memory.fraction to 10% without success. How can I set G1 as garbage collector? the key is env.java.opts but the value? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: JobManager shows TaskManager was lost/killed while TaskManger Process is still running and the network is OK.

2017-09-15 Thread AndreaKinn
Hi, sorry for re-vive this old conversation. I have exactly the same problem, can you provide more details about your solution? Have you used another garbage collector as G1? How can I set it? I've seen on configuration guideline I have to set the option: env.java.opts but I don't know which is th

Re: NoResourceAvailable exception

2017-09-15 Thread AndreaKinn
I tried also to set the only job manager on the first node and reconfiguring the cluster admitting just two task manager. In this way I obtain immediately a NoResourceAvailable error -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: NoResourceAvailable exception

2017-09-15 Thread AndreaKinn
I investigated the semantics of cpu percentage on top. I have to correct my sentence: When I start the program it has a peak at 160% (max is 200%), but after a second it falls down until the 4%. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: NoResourceAvailable exception

2017-09-15 Thread AndreaKinn
the job manager log probably is more interesting: 2017-09-15 12:47:45,420 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2017-09-15 12:47:45,650 INFO org.apache.flink.runt

Re: NoResourceAvailable exception

2017-09-15 Thread AndreaKinn
This is the log: 2017-09-15 12:47:49,143 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classe$ 2017-09-15 12:47:49,257 INFO org.apache.flink.runtime.taskmanager.TaskManager - ---

Re: NoResourceAvailable exception

2017-09-14 Thread AndreaKinn
P.S.: I tried on my laptop with the same configuration of the job-task manager (ram, slots, parallelism etc...) and it works perfectly. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: NoResourceAvailable exception

2017-09-14 Thread AndreaKinn
Update. the previous error probably was caused because I didn't restart the cluster before a re-execution. (maybe) Then, I tried to execute the program on a cluster of one node on my laptop and, after solved some little issues, everything works fine. Now I'm trying to deploy the same jar on the r

Re: Cassandra Connector Problem (Possible Guava Conflict?)

2017-09-14 Thread AndreaKinn
Hi, I have the same problem but trying your solution so substituting this: org.apache.maven.plugins maven-shade-plugin 2.4.1

NoResourceAvailable exception

2017-09-14 Thread AndreaKinn
Hi, I'm executing a program on a flink cluster. I tried the same on a local node with Eclipse and it worked fine. To start, following Flink recommendations on the cluster I set numberOfTaskSlots equals to the Cpu cores (2) while I set parallelism to 1. Unfortunately when I try to execute I obtain

Re: Can't start cluster

2017-09-14 Thread AndreaKinn
SOLVED using binaries. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Can't start cluster

2017-09-14 Thread AndreaKinn
Just a question: >From download page I have to download binaries or source package? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Can't start cluster

2017-09-13 Thread AndreaKinn
_setup.html > https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html > > > On Wed, Sep 13, 2017 at 6:57 AM, AndreaKinn < > kinn6aer@ > > wrote: > >> I printed also /flink-bin/bin folder: >> >> > >> root@

Re: Can't start cluster

2017-09-13 Thread AndreaKinn
I printed also /flink-bin/bin folder: > root@giordano-2-2-100-1:~/flink-1.3.2/flink-dist/src/main/flink-bin/bin# > ls > config.sh flink-console.sh jobmanager.sh start-cluster.sh > start-zookeeper-quorum.sh stop-zookeeper-quorum.sh flink flink-daemon.sh pyflink.batstart-local.bat

Can't start cluster

2017-09-13 Thread AndreaKinn
Hi, I'm trying to deploy on a flink cluster the jar of my program. Unfortunately I have a problem when I call on the first node: > root@giordano-2-2-100-1:~# sudo > ./flink-1.3.2/flink-dist/src/main/flink-bin/bin/start-cluster.sh > Starting cluster. > find: ‘/home/giordano/flink-1.3.2/flink-dist/s

Jobmanager and Taskmanager

2017-09-11 Thread AndreaKinn
Hi, I'm configuring a cluster composed by just three nodes. Looking at cluster setup guide I'm setting in the jobmanager the addresses of the workers. A jobmanager can be used also as an additional taskmanager? i.

Re: Java heap size

2017-09-11 Thread AndreaKinn
UPDATE: I also tried using rocksdb and increasing heap size for job manager and task manager to 3072 mb from 2014. Anyway no good news. 14:07:33,973 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1505131653970 14:07:40,027 INFO org.apache.flink.run

Best way to deriving streams from another one

2017-09-10 Thread AndreaKinn
Hi, I have a data stream resulting from an operation executed on a data stream of data. Essentially I want to obtain two different streams from that one to send their to different cassandra tables. I.e.: datastream 0 composed by Tuple3 I want to have: a datastream 1 composed by every triple o

Java heap size

2017-09-10 Thread AndreaKinn
Hi, I developed a program with Flink using OS X. Following the doc so I put in VMArguments of "Run configuration" in eclipse the value: -Xmx800m in order to increase heap memory. I'm using an external lib on flink but all worked perfectly... until now. I modified something in the use of this lib a

Re: Assigning operators to slots

2017-09-08 Thread AndreaKinn
UPDATE: I'm trying to implement the version with one node and two task slots on my laptop. I have also in configured flink-conf.yaml the key: taskmanager.numberOfTaskSlots: 2 but when I execute my program in the IDE: /org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:

Re: Assigning operators to slots

2017-09-08 Thread AndreaKinn
Nice, thank you for reply. So if I call slotSharedGroup(groupname) on the last operator as here: DataStream stream = env .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties)) .assignTimestampsAndWatermarks(new CustomTimestampExtractor()) .map(...) .slotSha

Assigning operators to slots

2017-09-08 Thread AndreaKinn
Hi, firstly excuse me for the long post. I already read the documentation about parallelism, slots and the API about it but I still have some doubts about practical implementations of them. My program is composed essentially by three operations: - get data from a kafka source - perform a machine l

Re: Handle event time

2017-09-08 Thread AndreaKinn
Thank you, effectively I developed also a simple custom solution for watermark looking at flink doc but anyway I see unordered printed streams. I have a doubt about flink behaviour: if I understand, flink doesn't perform automatically reordering of records in a stream, so if for instance a record a

Handle event time

2017-09-07 Thread AndreaKinn
Hi, I'm getting sensor data from a kafka source and I absolutely need they are ordered on time data generation basis. I've implemented a custom deserialiser and employed an AscendingTimestampExtractor to handle event time. Obviously I set EventTime as streamTimeCharacteristics. Unfortunately when I

Re: How to fill flink's datastream

2017-09-04 Thread AndreaKinn
ify what is going on from the code you posted. > Are you sure the program is executed, i.e., did you call env.execute()? > Are all parts of the program connected? > Are you sure that the input stream of the Map operator emits records? > > Best, Fabian > > > 2017-09-02 19:23

How to fill flink's datastream

2017-09-02 Thread AndreaKinn
Hi, Excuse me for the unclear title but I don't know how to summarise the question. I'm using an external library integrated with Flink called Flink-HTM. It is still a prototype. Internally, it performs everything I want but I have a problem returning evaluated values in a printable datastream. I

Re: datastream.print() doesn't works

2017-08-31 Thread AndreaKinn
I verified I use just one environment. Unfortunately, (also without using start-local.sh) /callingcreateLocalEnvironmentWithWebUI()/ and run the program from the IDE anyway no one running jobs is listed in the dashboard at /http://localhost:8081/#/overview/. In the ide it is correctly executed me

Re: datastream.print() doesn't works

2017-08-31 Thread AndreaKinn
ther any of these operations >> actually return records. >> >> On 29.08.2017 13:19, AndreaKinn wrote: >>> Hi, >>> I have a simple datastream of a Tuple2. Unfortunately when I call the >>> print() method. No one output is showed although no errors or

Re: datastream.print() doesn't works

2017-08-30 Thread AndreaKinn
Hi, in the night uninstalling and re-installing maven and flink I solved my issue. I started the web dashboard using start-local.sh script and used /createLocalEnvironmentWithWebUI(new Configuration())/ as you suggested. Anyway when I start it in eclipse in the ui dashboard no running jobs are sho

Re: datastream.print() doesn't works

2017-08-29 Thread AndreaKinn
Using mvn clean now I obtain: Error: Could not find or load main class org.apache.flink.mainProgram.StreamingJob -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/datastream-print-doesn-t-works-tp15223p15239.html Sent from the Apache Flink

Re: datastream.print() doesn't works

2017-08-29 Thread AndreaKinn
er any of these operations > actually return records. > > On 29.08.2017 13:19, AndreaKinn wrote: >> Hi, >> I have a simple datastream of a Tuple2. Unfortunately when I call the >> print() method. No one output is showed although no errors or exceptions >> are >> r

datastream.print() doesn't works

2017-08-29 Thread AndreaKinn
Hi, I have a simple datastream of a Tuple2. Unfortunately when I call the print() method. No one output is showed although no errors or exceptions are raised. I want to highlight that I have also other data streams which are correctly printed. This is the stream: /DataStream> result = HTM.learn(

Re: Flink-HTM integration

2017-08-27 Thread AndreaKinn
Sure. Firstly I followed the steps showed here to build the project: flink-htm github In my project I want to perform anomaly detection of values in a stream. I have a Kafka broker as source: /DataStream> stream = env

Re: Flink-HTM integration

2017-08-27 Thread AndreaKinn
I think this is not a good idea, I don't know if it's a bug or a my fault. I believe I integrated correctly flink-HTM in my project (but I'm not sure, Flink-HTM is still an embryonal phase I think) and simply I can't see any output after the HTM elaboration, even there are no errors on console.

Flink-HTM integration

2017-08-24 Thread AndreaKinn
Hi, Is there here someone who used Flink-HTM library https://github.com/htm-community/flink-htm ? I'm trying to implement it in my project but I have some fundamental question to complete my thesis work. Regards, Andrea -- View this message in con

Re: TypeInformation in Custom Deserializer

2017-08-13 Thread AndreaKinn
ate, > String, String, Double>>(){}); > > >> On Aug 13, 2017, at 10:31 AM, AndreaKinn < > kinn6aer@ > > wrote: >> >> Hi, >> I'm trying to implement a custom deserialiser to deserialise data from a >> kafka sink. >> So I'm implem

Re: Writing on Cassandra

2017-08-13 Thread AndreaKinn
Ok, this is my situation: I have a stream of Tuple2> the cassandra code: CassandraSink.addSink(stream) .setQuery("INSERT INTO keyspace_local.values_by_sensors_users" + " (user, sensor, timestamp, json_ld, observed_value, value)" + " VALUES (?, ?,

Re: TypeInformation in Custom Deserializer

2017-08-13 Thread AndreaKinn
But I'm using Java primitive type like String, Double plus Date types. Flink doesn't know how to handle them? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861p14863.html Sent from the Apache Flink

TypeInformation in Custom Deserializer

2017-08-13 Thread AndreaKinn
Hi, I'm trying to implement a custom deserialiser to deserialise data from a kafka sink. So I'm implementing a KeyedDeserializedSchema> which ask me to override the method: @Override public TypeInformation> getProducedType() { //to do } Honestly I investigated in link

Re: Error during Kafka connection

2017-08-12 Thread AndreaKinn
It is solvable? I'm not an expert of this stuff and the cluster is managed by the lab responsible. Maybe I can ask him to do something in order to solve. -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-tp14822p14

Re: Error during Kafka connection

2017-08-11 Thread AndreaKinn
I just tried to use telnet to public ip:port from outside and it works. -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-tp14822p14829.html Sent from the Apache Flink User Mailing List archive. mailing list archiv

Re: Error during Kafka connection

2017-08-11 Thread AndreaKinn
I tried running console consumer-producer from the localhost on the cluster: this say me that the broker is currently active. To reach the cluster from outside I use a redirect from a public (ip, port), because the ip of the kafka broker is private... I suspect the problem can be there. -- View

Re: Error during Kafka connection

2017-08-11 Thread AndreaKinn
the kafka version I use is the latest (0.11.0.0). But to be honestly, also locally I use 0.11.0.0 and in that case it works correctly. Anyway the last kafka connector on flink is designed for kafka 0.10.x.x I use OS X locally and Ubuntu on the cluster. It has importance? -- View this message in

Error during Kafka connection

2017-08-11 Thread AndreaKinn
Hi, In the last week I have correctly deployed a flink program which get data from a kafka broker on my local machine. Now I'm trying to produce the same thing but moving the kafka broker on a cluster. I didn't change any line of code, I report it here: DataStream> stream = env

Re: Writing on Cassandra

2017-08-08 Thread AndreaKinn
I probably solved import issue, but still need help to find some examples of use. Please let me know if someone has experience with Flink and Cassandra together -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-on-Cassandra-tp14744p1474

Writing on Cassandra

2017-08-08 Thread AndreaKinn
Hi, I'm trying to integrate a Cassandra sink in my project but honestly I'm a bit confused because I don't find any examples of use. I want just to populate a table and query it on a single node instance of Cassandra. The only one link I found is: https://ci.apache.org/projects/flink/flink-docs-

Re: mirror links don't work

2017-07-04 Thread AndreaKinn
Yes, I found it googling. Ufuk Celebi wrote > Thanks for reporting this. Did you find these pages by Googling for > the Flink docs? They are definitely very outdated versions of Flink. > > On Tue, Jul 4, 2017 at 4:46 PM, AndreaKinn < > kinn6aer@ > > wrote: &

Re: mirror links don't work

2017-07-04 Thread AndreaKinn
I found it clicking on "download flink for hadoop 1.2" button: https://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/mirror-links-don-t-work-tp14114p14116.html Sent f

mirror links don't work

2017-07-04 Thread AndreaKinn
Hi, I tried to download apache flink from the page: http://www.apache.org/dyn/closer.cgi/flink/flink-0.8.1/flink-0.8.1-bin-hadoop1.tgz but all links lead to a 404 error. Can you fix it please? Thank you Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336

Re: About nodes number on Flink

2017-06-23 Thread AndreaKinn
Hi Timo, thanks for your answer. I think my elaboration are not too much heavy so I imagine I will have no advantages to "parallelize" streams. In my mind I have this pipeline: An

About nodes number on Flink

2017-06-22 Thread AndreaKinn
Hello, I'm developing a Flink toy-application on my local machine before to deploy the real one on a real cluster. Now I have to determine how many nodes I need to set the cluster. I already read these documents: jobs and scheduling

Re: How choose between YARN/Mesos/StandAlone Flink

2017-06-19 Thread AndreaKinn
Ok I understand standalone mode it will be sufficient, but for my thesis I would like to setup a well performed ready-to-use infrastructure. My workload it's not heavy, about 35 millions of messages a day (35 gb) but it should be easily expandable and running for many days... due to this I would li

How choose between YARN/Mesos/StandAlone Flink

2017-06-16 Thread AndreaKinn
Hi, I browsed Flink documentation but I don't find a deep comparison between the feature of Flink in standalone deployment/YARN/Mesos except technical guides to setup them. I'm a newbie in cluster computing so I have never used YARN or Mesos. I've just learned something about their functionalitie

Re: How to divide streams on key basis and deliver them

2017-06-15 Thread AndreaKinn
Thank you a lot Carst, Flink runs at an higher level than I imagined. I will try with some experiments! -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-divide-streams-on-key-basis-and-deliver-them-tp13743p13755.html Sent from the Apa

How to divide streams on key basis and deliver them

2017-06-14 Thread AndreaKinn
Hi, this is my project purpose using Kafka and Flink: In kafka topics there are streams representing sensor lectures of different subjects. Each topic is reserved for a different sensor. Every messag

Re: Can't get keyed messages from Kafka

2017-06-14 Thread AndreaKinn
Thank's that works! -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-keyed-messages-from-Kafka-tp13687p13725.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Can't get keyed messages from Kafka

2017-06-13 Thread AndreaKinn
Can I ask you to help me? I trying to implement a CustomDeserializer My kafka messages are composed by KeyedMessages where key and messages are strings. I created a new class named CustomObject to manage the message string because it's more complex then a simple string. public class CustomDeseria

Re: Can't get keyed messages from Kafka

2017-06-13 Thread AndreaKinn
But KeyedDeserializationSchema has just 2 implementations: TypeInformationKeyValueSerializationSchema JSONKeyValueDeserializationSchema The first give me this error: 06/12/2017 02:09:12 Source: Custom Source(4/4) switched to FAILED java.io.EOFException at org.apache.flink.runtime.util.DataInput

Can't get keyed messages from Kafka

2017-06-13 Thread AndreaKinn
Hi, I already spent two days trying to get simple messages from Kafka without success. I have a Kafka producer written in javascript: KeyedMessage = kafka.KeyedMessage; keyed_message = new KeyedMessage(key, string_to_sent); payload = [{topics: topic, messages: keyed_message }]; And I want to re