It seems you are right. It works on the current 1.0-SNAPSHOT version, which has a different signature:
> def writeToSocket(
>     hostname: String,
>     port: Integer,
>     schema: SerializationSchema[T]): DataStreamSink[T] = {
>   javaStream.writeToSocket(hostname, port, schema)
> }
instead of 0.10.1:
> def writeToSocket(
>     hostname: String,
>     port: Integer,
>     schema: SerializationSchema[T, Array[Byte]]): DataStreamSink[T] = {
>   javaStream.writeToSocket(hostname, port, schema)
> }
I guess you can still implement your own SerializationSchema for 0.10.1
to make it work.
-Matthias
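
A minimal sketch of such a custom schema for 0.10.1 could look like the
following. It assumes the two-parameter interface has a single method named
serialize; the trait below is only a local stand-in so the snippet compiles
without Flink, and the class name StringToBytesSchema is hypothetical.

```scala
import java.nio.charset.StandardCharsets

// Stand-in for the two-parameter interface that Flink 0.10.1 exposes as
// org.apache.flink.streaming.util.serialization.SerializationSchema.
// Assumption: its single method is named `serialize`. With Flink on the
// classpath, delete this trait and import the real interface instead.
trait SerializationSchema[T, R] extends Serializable {
  def serialize(element: T): R
}

// Hypothetical schema that turns each String record into UTF-8 bytes, which
// matches the SerializationSchema[String, Array[Byte]] type that the 0.10.1
// writeToSocket signature asks for.
class StringToBytesSchema extends SerializationSchema[String, Array[Byte]] {
  override def serialize(element: String): Array[Byte] =
    element.getBytes(StandardCharsets.UTF_8)
}

object StringToBytesSchemaDemo {
  def main(args: Array[String]): Unit = {
    val bytes = new StringToBytesSchema().serialize("hello")
    // Decode again to show the round trip.
    println(new String(bytes, StandardCharsets.UTF_8))
  }
}
```

With the real interface imported, the call from the thread would presumably
become s.writeToSocket(host, port.toInt, new StringToBytesSchema()).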
On 01/19/2016 04:27 PM, Saiph Kappa wrote:
> I think this is a bug in the scala API.
>
> def writeToSocket(hostname : scala.Predef.String, port : java.lang.Integer,
>     schema : org.apache.flink.streaming.util.serialization.SerializationSchema[T, scala.Array[scala.Byte]]) :
>     org.apache.flink.streaming.api.datastream.DataStreamSink[T] = { /* compiled code */ }
>
>
>
> On Tue, Jan 19, 2016 at 2:16 PM, Matthias J. Sax wrote:
>
> It should work.
>
> Your error message indicates that your DataStream is of type
> [String,Array[Byte]] and not of type [String].
>
> > Type mismatch, expected: SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema
>
> Can you maybe share your code?
>
> -Matthias
>
> On 01/19/2016 01:57 PM, Saiph Kappa wrote:
> > It's DataStream[String]. So it seems that SimpleStringSchema cannot be
> > used in writeToSocket regardless of the type of the DataStream. Right?
> >
> > On Tue, Jan 19, 2016 at 1:32 PM, Matthias J. Sax wrote:
> >
> > What type is your DataStream? It must be DataStream[String] to work with
> > SimpleStringSchema.
> >
> > If you have a different type, just implement a customized
> > SerializationSchema.
> >
> > -Matthias
> >
> >
> > On 01/19/2016 11:26 AM, Saiph Kappa wrote:
> > > When I use SimpleStringSchema I get the error: Type mismatch, expected:
> > > SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema. I
> > > think SimpleStringSchema extends SerializationSchema[String], and
> > > therefore cannot be used as argument of writeToSocket. Can you confirm
> > > this please?
> > >
> > > s.writeToSocket(host, port.toInt, new SimpleStringSchema())
> > >
> > >
> > > Thanks.
> > >
> > > On Tue, Jan 19, 2016 at 10:34 AM, Matthias J. Sax wrote:
> > >
> > > There is SimpleStringSchema.
> > >
> > > -Matthias
> > >
> > > On 01/18/2016 11:21 PM, Saiph Kappa wrote:
> > > > Hi Matthias,
> > > >
> > > > Thanks for your response. The method .writeToSocket seems to be what I
> > > > was looking for. Can you tell me what kind of serialization schema I
> > > > should use, assuming my socket server receives strings? I have
> > > > something like this in Scala:
> > > >
> > > > val server = new ServerSocket(9999)
> > > > while (true) {
> > > >   val s = server.accept()
> > > >   val in = new BufferedSource(s.getInputStream()).getLines()
> > > >   println(in.next())
> > > >   s.close()
> > > > }
> > > >
> > > > Thanks
> > > >
> > > >
> > > >
> > > > On Mon, Jan 18, 2016 at 8:46 PM, Matthias J. Sax wrote:
> > > >
> > > > Hi Saiph,
> > > >
> > > > you can use AllWindowFunction via .apply(...) to get a .collect method:
> > > >
> > > > From:
> > > > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
> > > >
> > > > > // applying an AllWindowFunction on non-keyed window stream
> > > > > allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
> > > > >     public void apply (Window window,
> > > > >             Iterable<Tuple2<String, Integer>> values,
> > > > >             Collector<Integer> out) throws Exception {
> > > > >         int sum = 0;
> > > > >         // sum the Integer field (f1) of every tuple in the window
> > > > >         for (Tuple2<String, Integer> t: values) {
> > > > >             sum += t.f1;
> > > > >         }
> > > > >         out.collect (new Integer(sum));
> > > > >     }
> > > > > });
> > > >
> > > > If you consume all those values via a sink, the sink will run on the
> > > > cluster. You can use .writeToSocket(...) as a sink:
> > > >
> > > > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#data-sinks
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 01/18/2016 06:30 PM, Saiph Kappa wrote:
> > > > > Hi,
> > > > >
> > > > > After performing a windowAll() on a DataStream[String], is there any
> > > > > method to collect and return an array with all Strings within a window
> > > > > (similar to .collect in Spark)?
> > > > >
> > > > > I basically want to ship all strings in a window to a remote server
> > > > > through a socket, and want to use the same socket connection for all
> > > > > strings that I send. The method .addSink iterates over all records, but
> > > > > does the provided function run on the Flink client or on the server?
> > > > >
> > > > > Thanks.
