Hi Subash,

I would not fetch the data to the client, do the computation there, and send it back to the cluster just for the purpose of writing it to a file.
Either 1) pull the results to the client and write the file from there, or 2) compute the outliers in the cluster. I did not study your code completely, but the two nested loops with the `e.f0 == centroid.id` condition, for example, are a join on the cluster ID. I would go for option 2, if possible.

Best, Fabian

2016-02-10 13:07 GMT+01:00 subash basnet <[email protected]>:

> Hello Fabian,
>
> I use the collect() method to get the elements locally, perform
> operations on them, and return the result as a collection. The collection
> is converted to a DataSet in the calling method.
> Below is the code of the *findOutliers* method:
>
> public static List<Tuple3> findOutliers(DataSet<Tuple2<Integer, Point>> clusteredPoints,
>         DataSet<Centroid> centroids) throws Exception {
>     List<Tuple3> finalElements = new ArrayList<Tuple3>();
>     // collect() ships both DataSets to the client
>     List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();
>     List<Centroid> centroidList = centroids.collect();
>     List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
>             new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
>     for (Centroid centroid : centroidList) {
>         elementsWithDistance = new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
>         double totalDistance = 0;
>         int elementsCount = 0;
>         for (Tuple2<Integer, Point> e : elements) {
>             // compute distance
>             if (e.f0 == centroid.id) {
>                 Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement =
>                         new Tuple3<Centroid, Tuple2<Integer, Point>, Double>();
>                 double distance = e.f1.euclideanDistance(centroid);
>                 totalDistance += distance;
>                 newElement.setFields(centroid, e, distance);
>                 elementsWithDistance.add(newElement);
>                 elementsCount++;
>             }
>         }
>         // finding mean
>         double mean = totalDistance / elementsCount;
>         double sdTotalDistanceSquare = 0;
>         for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
>             double distanceSquare = Math.pow(mean - elementWithDistance.f2, 2);
>             sdTotalDistanceSquare += distanceSquare;
>         }
>         double sd = Math.sqrt(sdTotalDistanceSquare / elementsCount);
>         double upperlimit = mean + 2 * sd;
>         double lowerlimit = mean - 2 * sd;
>         Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>(); // true = outlier
>         for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
>             newElement = new Tuple3<Integer, Point, Boolean>();
>             if (elementWithDistance.f2 < lowerlimit || elementWithDistance.f2 > upperlimit) {
>                 // set as outlier
>                 newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, true);
>             } else {
>                 newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, false);
>             }
>             finalElements.add(newElement);
>         }
>     }
>     return finalElements;
> }
>
> I have attached a screenshot of my project structure and the
> KMeansOutlierDetection.java file for more clarity.
>
> Best Regards,
> Subash Basnet
>
> On Wed, Feb 10, 2016 at 12:26 PM, Fabian Hueske <[email protected]> wrote:
>
>> Hi Subash,
>>
>> how is findOutliers implemented?
>>
>> It might be that you are mixing up local and cluster computation. All
>> DataSets are processed in the cluster.
>> Please note the following:
>> - ExecutionEnvironment.fromCollection() transforms a client-local
>> collection into a DataSet by serializing it and sending it to the cluster.
>> - DataSet.collect() transforms a DataSet into a collection and ships it
>> back to the client.
>>
>> So, does findOutliers operate on the cluster or on the local client,
>> i.e., does it work on DataSets and send the result back as a collection, or
>> does it first collect the results as a collection and operate on those?
>>
>> Best, Fabian
>>
>> 2016-02-10 12:13 GMT+01:00 subash basnet <[email protected]>:
>>
>>> Hello Stefano,
>>>
>>> Yes, the type casting worked, thank you. But I am not able to write the
>>> DataSet to the file.
>>>
>>> The default code below, which writes the KMeans points along with their
>>> centroid numbers to the file, works fine:
>>>
>>> // feed new centroids back into next iteration
>>> DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>> DataSet<Tuple2<Integer, Point>> clusteredPoints = points
>>>         // assign points to final clusters
>>>         .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>> if (fileOutput) {
>>>     clusteredPoints.writeAsCsv(outputPath, "\n", " ");
>>>     // since file sinks are lazy, we trigger the execution explicitly
>>>     env.execute("KMeans Example");
>>> }
>>>
>>> But my modified code below, which finds the outliers, does not work:
>>>
>>> // feed new centroids back into next iteration
>>> DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>> DataSet<Tuple2<Integer, Point>> clusteredPoints = points
>>>         // assign points to final clusters
>>>         .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>> DataSet<Tuple3> fElements = env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
>>> if (fileOutput) {
>>>     fElements.writeAsCsv(outputPath, "\n", " ");
>>>     // since file sinks are lazy, we trigger the execution explicitly
>>>     env.execute("KMeans Example");
>>> }
>>>
>>> It does not write to the file: the *result* folder does not get created
>>> inside the kmeans folder where my centers and points files are located. I
>>> am only able to print it to the console via fElements.print();
>>>
>>> Does it have something to do with env.execute(""), which must be set
>>> somewhere in the previous case but not in my case?
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>> On Tue, Feb 9, 2016 at 6:29 PM, Stefano Baghino <
>>> [email protected]> wrote:
>>>
>>>> Assuming your ExecutionEnvironment is named `env`, simply call:
>>>>
>>>> DataSet<Tuple3<Integer, Point, Boolean>> fElements = env.fromCollection(finalElements);
>>>>
>>>> Does this help?
>>>>
>>>> On Tue, Feb 9, 2016 at 6:06 PM, subash basnet <[email protected]>
>>>> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I have modified the KMeans code to detect outliers. I can print the
>>>>> output to the console, but I am not able to write it to a file using
>>>>> the given 'writeAsCsv' method.
>>>>> The problem is that I generate a list of tuples.
>>>>> My list is:
>>>>> List<Tuple3> finalElements = new ArrayList<Tuple3>();
>>>>> The following is the data type of the elements added to the list:
>>>>> Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>();
>>>>> finalElements.add(newElement);
>>>>> Now I am stuck on how to convert this 'finalElements' list to
>>>>> DataSet<Tuple3<Integer, Point, Boolean>> fElements,
>>>>> so that I could use
>>>>> fElements.writeAsCsv(outputPath, "\n", " ");
>>>>>
>>>>> Best Regards,
>>>>> Subash Basnet
>>>>
>>>> --
>>>> BR,
>>>> Stefano Baghino
>>>>
>>>> Software Engineer @ Radicalbit
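Option 1 from Fabian's reply (pull the results to the client and write the file from there) needs no Flink sink once findOutliers has returned its list: env.fromCollection(), writeAsCsv() and env.execute() drop out entirely. Below is a minimal plain-Java sketch, assuming the collected rows have already been flattened into a hypothetical Row class (cluster ID, x, y, outlier flag). The space/newline layout only imitates writeAsCsv(outputPath, "\n", " "); it is not claimed to be Flink's exact output format.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Plain-Java sketch of option 1: write already-collected results on the client.
// Row is a hypothetical flattened stand-in for Tuple3<Integer, Point, Boolean>.
final class ClientSideWriter {

    static final class Row {
        final int clusterId;
        final double x;
        final double y;
        final boolean outlier; // true = outlier, as in findOutliers

        Row(int clusterId, double x, double y, boolean outlier) {
            this.clusterId = clusterId;
            this.x = x;
            this.y = y;
            this.outlier = outlier;
        }
    }

    // Writes one "\n"-terminated line per row, fields separated by " ".
    // No ExecutionEnvironment is involved, so no env.execute() call is needed.
    static void writeRows(Path file, List<Row> rows) throws IOException {
        // Files.newBufferedWriter does not create missing parent directories.
        Path parent = file.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        try (BufferedWriter out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            for (Row r : rows) {
                out.write(r.clusterId + " " + r.x + " " + r.y + " " + r.outlier);
                out.write("\n");
            }
        }
    }
}
```

The parent directory (for example a *result* folder) is created explicitly, because opening a writer on a path whose directory does not exist fails with an exception rather than creating it.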

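Option 2 from Fabian's reply replaces the nested centroid/point loops with a keyed lookup plus per-cluster aggregation. The sketch below is plain, single-process Java, not Flink code; it only shows the dataflow shape. In Flink's DataSet API the lookup would plausibly become a join on the cluster ID and the per-cluster sums a groupBy followed by a reduce or group-reduce, but that mapping is an assumption, not a tested port. Point, Assigned and Flagged are hypothetical stand-ins for the thread's Point, Tuple2<Integer, Point> and Tuple3<Integer, Point, Boolean>. The outlier rule (distance outside the cluster mean plus or minus two population standard deviations) is the one used in the quoted findOutliers method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not Flink code): "join by cluster ID, aggregate per key, flag".
final class OutlierSketch {

    // Hypothetical 2-D stand-in for the thread's Point type.
    static final class Point {
        final double x;
        final double y;
        Point(double x, double y) { this.x = x; this.y = y; }
        double euclideanDistance(Point o) {
            double dx = x - o.x;
            double dy = y - o.y;
            return Math.sqrt(dx * dx + dy * dy);
        }
    }

    // A point with its assigned cluster ID, like Tuple2<Integer, Point>.
    static final class Assigned {
        final int clusterId;
        final Point point;
        Assigned(int clusterId, Point point) { this.clusterId = clusterId; this.point = point; }
    }

    // Result row, like Tuple3<Integer, Point, Boolean>; true = outlier.
    static final class Flagged {
        final int clusterId;
        final Point point;
        final boolean outlier;
        Flagged(int clusterId, Point point, boolean outlier) {
            this.clusterId = clusterId;
            this.point = point;
            this.outlier = outlier;
        }
    }

    static List<Flagged> flagOutliers(Map<Integer, Point> centroids, List<Assigned> points) {
        List<Assigned> matched = new ArrayList<>();
        List<Double> distances = new ArrayList<>();
        Map<Integer, double[]> stats = new HashMap<>(); // per cluster: {count, sum, sumSqDev}

        // Pass 1 ("join"): look up each point's centroid by ID instead of scanning
        // all centroids; points without a centroid are dropped, like an inner join.
        for (Assigned a : points) {
            Point c = centroids.get(a.clusterId);
            if (c == null) continue;
            double d = a.point.euclideanDistance(c);
            matched.add(a);
            distances.add(d);
            double[] s = stats.computeIfAbsent(a.clusterId, k -> new double[3]);
            s[0] += 1;
            s[1] += d;
        }
        // Pass 2 ("aggregate"): per-cluster sum of squared deviations from the mean.
        for (int i = 0; i < matched.size(); i++) {
            double[] s = stats.get(matched.get(i).clusterId);
            double dev = distances.get(i) - s[1] / s[0];
            s[2] += dev * dev;
        }
        // Pass 3 ("flag"): outside mean +/- 2 * population standard deviation.
        List<Flagged> result = new ArrayList<>();
        for (int i = 0; i < matched.size(); i++) {
            Assigned a = matched.get(i);
            double[] s = stats.get(a.clusterId);
            double mean = s[1] / s[0];
            double sd = Math.sqrt(s[2] / s[0]);
            double d = distances.get(i);
            result.add(new Flagged(a.clusterId, a.point, d < mean - 2 * sd || d > mean + 2 * sd));
        }
        return result;
    }
}
```

Each pass touches every point once, so the work grows with the number of points rather than with points times centroids, and every pass is keyed by cluster ID, which is what makes it a natural fit for a distributed join and grouping.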