Hi Gyula,
I tried something like the following in the two flatMap functions, but I am
not getting the desired results, and I am still confused about how the
concept you put forward would work:
public static final class MyCoFlatmap implements CoFlatMapFunction<Point, Centroid, Centroid> {

    // flag, id, numofMC and timestamp are defined outside this snippet.
    Centroid[] centroids;

    @Override
    public void flatMap1(Point in, Collector<Centroid> out) throws Exception {
        // Allocate the centroid array on the first call.
        if (flag) {
            centroids = new Centroid[numofMC];
            flag = false;
        }
        if (id < numofMC) {
            // The first numofMC points each seed a new centroid.
            System.out.println(id);
            Centroid generatedMC = CentroidCreator.generateCentroid(id, timestamp, in);
            centroids[id] = generatedMC;
            out.collect(generatedMC);
            id++;
        } else {
            // Find the centroid closest to the incoming point.
            Centroid closestMC = null;
            double minDistance = Double.MAX_VALUE;
            for (Centroid mc : centroids) {
                double distance = distance(in.pt, mc.getCenter());
                if (distance < minDistance) {
                    closestMC = mc;
                    minDistance = distance;
                }
            }
            // Absorb the point only if it falls within the closest centroid's radius.
            double radius = getRadius(closestMC, centroids);
            if (minDistance < radius) {
                closestMC.insert(in.pt, timestamp);
            }
            out.collect(closestMC);
        }
    }

    @Override
    public void flatMap2(Centroid in, Collector<Centroid> out) throws Exception {
        // Overwrite the local copy with the centroid received on the second input.
        // This relies on flatMap1 having already allocated the array.
        centroids[in.id] = in;
        System.out.println("MC: " + in.toString());
    }
}
As mentioned in my previous reply, I understand that each of the map
functions in the co-flatMap receives one tuple at a time. So if I have a
DataStream of centroids, they would arrive one at a time on the partitions,
and that would defeat the purpose, because I need all of the centroids to
compare the distances against.
I tried storing the centroids in an array of Centroid, but again I don't
understand how I can push all of the changes back.
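To make the array idea concrete, here is a minimal sketch in plain Java, without Flink. Everything in it is a hypothetical stand-in: the Centroid class, the NearestCentroid class, the 1-D double points, and the List used in place of a Collector. It only shows that state kept inside one operator instance accumulates centroids that arrive one at a time, so the point-side callback can compare against all centroids received so far. It assumes a single instance and says nothing about how updated centroids would be sent back to the other partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch; none of these types are Flink classes.
class CoFlatMapStateSketch {

    // Hypothetical stand-in for the Centroid type, reduced to an id and a 1-D center.
    static final class Centroid {
        final int id;
        final double center;

        Centroid(int id, double center) {
            this.id = id;
            this.center = center;
        }
    }

    // Mimics the two callbacks of a co-flat-map: one per input stream.
    static final class NearestCentroid {
        // Local state that survives between calls on the same instance.
        private final Map<Integer, Centroid> centroids = new TreeMap<>();

        // Point input: called once per point; emits the id of the nearest known centroid.
        void flatMap1(double point, List<Integer> out) {
            Centroid closest = null;
            double minDistance = Double.MAX_VALUE;
            for (Centroid c : centroids.values()) {
                double distance = Math.abs(point - c.center);
                if (distance < minDistance) {
                    minDistance = distance;
                    closest = c;
                }
            }
            // Emit nothing if no centroid has arrived yet.
            if (closest != null) {
                out.add(closest.id);
            }
        }

        // Centroid input: called once per centroid; only updates the local state.
        void flatMap2(Centroid in, List<Integer> out) {
            centroids.put(in.id, in);
        }
    }

    public static void main(String[] args) {
        NearestCentroid op = new NearestCentroid();
        List<Integer> out = new ArrayList<>();
        op.flatMap2(new Centroid(0, 0.0), out);   // centroids arrive one at a time
        op.flatMap2(new Centroid(1, 10.0), out);
        op.flatMap1(8.0, out);                    // nearest is centroid 1
        op.flatMap2(new Centroid(0, 7.5), out);   // centroid 0 is replaced by an update
        op.flatMap1(8.0, out);                    // nearest is now centroid 0
        System.out.println(out);                  // prints [1, 0]
    }
}
```

A map keyed by centroid id is used here instead of an array so that an update can arrive before any point does, which the array version above cannot handle.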
A small example or code snippet would really be helpful.
Thanks a lot
Regards
Biplob
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6816.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.