Hello all,

I have been trying to read stock data as a stream and perform outlier
detection on it.
My problem is mainly that the DataStream API has no 'withBroadcastSet()',
so I used a global variable and DataStreamUtils to read the variable *loop*.
But I get cast exceptions and other errors. Could you please suggest another
way to perform the operation shown in the code without using *loop* as a
global variable?
Below is a snippet of my code, followed by the exception messages.

private static ConnectedIterativeStreams<Centroid, Centroid> loop;
.........
main(){
    DataStream<Point> points = newDataStream.map(new getPoints());
    DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
    loop = centroids.iterate(10).withFeedbackType(Centroid.class);
    DataStream<Tuple2<String, Point>> newCentroids = points.map(new SelectNearestCenter());
}

@ForwardedFields("*->1")
public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {
    private Collection<Centroid> centroids;

    @Override
    public void open(Configuration parameters) throws Exception {
        Iterator<Centroid> iter = DataStreamUtils.collect(loop.getFirstInput());
        this.centroids = Lists.newArrayList(iter);
    }

    @Override
    public Tuple2<String, Point> map(Point p) throws Exception {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";

        // check all cluster centers
        for (Centroid centroid : centroids) {
            // compute distance
            double distance = p.euclideanDistance(centroid);

            // update nearest cluster if necessary
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = centroid.id;
            }
        }

        // emit a new record with the center id and the data point.
        return new Tuple2<String, Point>(closestCentroidId, p);
    }
}
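
To check the nearest-centroid logic on its own, outside Flink, the map step above can be sketched in plain Java. This is only a sketch under assumptions: `SimplePoint`, `SimpleCentroid` and `NearestCenterSketch` are hypothetical stand-ins for the `Point` and `Centroid` types (which are not shown here), they use two coordinates only, and the sample values are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Point type used above (two coordinates only).
class SimplePoint {
    final double x;
    final double y;

    SimplePoint(double x, double y) {
        this.x = x;
        this.y = y;
    }

    // Euclidean distance to another point.
    double euclideanDistance(SimplePoint other) {
        double dx = x - other.x;
        double dy = y - other.y;
        return Math.sqrt(dx * dx + dy * dy);
    }
}

// Hypothetical stand-in for the Centroid type: a point with a string id.
class SimpleCentroid extends SimplePoint {
    final String id;

    SimpleCentroid(String id, double x, double y) {
        super(x, y);
        this.id = id;
    }
}

class NearestCenterSketch {
    // Returns the id of the closest centroid, or "-1" if the list is empty.
    static String selectNearest(SimplePoint p, List<SimpleCentroid> centroids) {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        for (SimpleCentroid centroid : centroids) {
            double distance = p.euclideanDistance(centroid);
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = centroid.id;
            }
        }
        return closestCentroidId;
    }

    public static void main(String[] args) {
        List<SimpleCentroid> centroids = Arrays.asList(
                new SimpleCentroid("a", 0.0, 0.0),
                new SimpleCentroid("b", 10.0, 10.0));
        // (9, 8) is closer to centroid "b" than to centroid "a".
        System.out.println(selectNearest(new SimplePoint(9.0, 8.0), centroids)); // prints "b"
    }
}
```

Because the centroids are passed in as an ordinary argument here, the selection logic itself does not depend on any global variable; only the way the centroids reach the function differs from the streaming version.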


Could you please explain the following exception?

14:23:56,269 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while processing timer.
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:322)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:730)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
    at org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:319)
    ... 10 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:1)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
    ... 13 more
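
The innermost cause in the trace above is a `ClassCastException` raised inside a serializer's `serialize` call. As a minimal, Flink-independent sketch of that general kind of failure: a serializer typed for one element class fails as soon as it is handed an element of another class through an unchecked call. All names below (`ElementSerializer`, `FakeRecord`, `FakeWatermark`, `RecordOnlySerializer`, `CastSketch`) are hypothetical stand-ins; this does not reproduce Flink's actual implementation and is not a diagnosis of the job above.

```java
// Generic serializer interface. After type erasure the compiler adds a
// bridge method serialize(Object) to implementations, which casts the
// argument to the concrete element type.
interface ElementSerializer<T> {
    String serialize(T element);
}

// Hypothetical stand-in for a data record.
class FakeRecord {
    final String value;

    FakeRecord(String value) {
        this.value = value;
    }
}

// Hypothetical stand-in for a non-record element such as a watermark.
class FakeWatermark {
    final long timestamp;

    FakeWatermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

// A serializer that only knows how to handle FakeRecord.
class RecordOnlySerializer implements ElementSerializer<FakeRecord> {
    @Override
    public String serialize(FakeRecord element) {
        return "record:" + element.value;
    }
}

class CastSketch {
    // Calls the serializer through its raw type, so the compiler cannot
    // check that the element has the type the serializer expects.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String serializeUnchecked(ElementSerializer serializer, Object element) {
        return serializer.serialize(element);
    }

    public static void main(String[] args) {
        // A matching element type is serialized normally.
        System.out.println(serializeUnchecked(new RecordOnlySerializer(), new FakeRecord("tick")));
        try {
            // A non-record element fails in the cast inside the bridge method.
            serializeUnchecked(new RecordOnlySerializer(), new FakeWatermark(42L));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException for FakeWatermark");
        }
    }
}
```

In this sketch the failure only appears at runtime, when the first non-record element arrives, which is the same point in time at which the trace above reports it (on `emitWatermark`).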

As a result, I also get the error messages below:
14:23:58,310 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
java.lang.NullPointerException
    at org.apache.flink.contrib.streaming.CollectSink.closeConnection(CollectSink.java:100)
    at org.apache.flink.contrib.streaming.CollectSink.close(CollectSink.java:131)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:347)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:294)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.lang.Thread.run(Thread.java:745)

Sink: Unnamed (1/1) switched to FAILED with exception.
java.lang.RuntimeException: java.net.ConnectException: Connection refused
    at org.apache.flink.contrib.streaming.CollectSink.initializeConnection(CollectSink.java:73)
    at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:123)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:244)
    at org.apache.flink.contrib.streaming.CollectSink.initializeConnection(CollectSink.java:69)


Best Regards,
Subash Basnet
