I would try to do the outlier compuation with the DataSet API instead of
fetching the results to the client with collect().
If you do that, you can directly use writeAsCsv because the result is still
a DataSet.

What you have to do, is to translate your findOutliers method into DataSet
API code.

Best, Fabian

2016-02-10 18:29 GMT+01:00 subash basnet <yasub...@gmail.com>:

> Hello Fabian,
>
> As written before code:
>
>
>
> *DataSet<Tuple3> fElements =
> env.fromCollection(findOutliers(clusteredPoints,
> finalCentroids));fElements.writeAsCsv(outputPath, "\n", "
> ");env.execute("KMeans Example");*
> I am very new to flink so not so clear about what you suggested, by
> option(1) you meant that I write my own FileWriter here rather than using
> *writeAsCsv()* method. And option(2) I couldn't understand where to
> compute the outlier. I would want to use the *writeAsCsv() *method but
> currently it doesn't perform the write operation and unable to understand
> why.
>
> An interesting thing I found is, when I run the *outlierDetection* class
> from eclipse a single file *result* gets written within the kmeans
> folder, whereas in case of default *KMeans* class it writes a result
> folder within the kmeans folder and the files with points are written
> inside the result folder.
> I give the necessary path in the arguments while running.
> Eg: file:///home/softwares/flink-0.10.0/kmeans/points
> file:///home/softwares/flink-0.10.0/kmeans/centers
> file:///home/softwares/flink-0.10.0/kmeans/result 10
>
> Now, after I create the runnable jar file for KMeans and outlierDetection
> class,  when I upload it to *flink web submission client *it works fine
> for *KMeans.jar*, the folder and files get created. But incase of
> *outlierDetection.jar* no file or folder get's written inside kmeans.
>
> How is it that outlier class is able to write file via eclipse but outlier
> jar not able to write via flink web submission client.
>
>
> Best Regards,
> Subash Basnet
>
> On Wed, Feb 10, 2016 at 1:58 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Subash,
>>
>> I would not fetch the data to the client, do the computation there, and
>> send it back, just for the purpose of writing it to a file.
>>
>> Either 1) pull the results to the client and write the file from there or
>> 2) compute the outliers in the cluster.
>> I did not study your code completely, but the two nested loops and the
>> condition are a join for example.
>>
>> I would go for option 2, if possible.
>>
>> Best, Fabian
>>
>>
>> 2016-02-10 13:07 GMT+01:00 subash basnet <yasub...@gmail.com>:
>>
>>> Hello Fabian,
>>>
>>> I use the collect() method to get the elements locally and perform
>>> operations on that and return the result as a collection. The collection
>>> result is converted to the DataSet in the calling method.
>>> Below is the code of *findOutliers *method:
>>>
>>> public static List<Tuple3> findOutliers(DataSet<Tuple2<Integer, Point>>
>>> clusteredPoints,
>>> DataSet<Centroid> centroids) throws Exception {
>>> List<Tuple3> finalElements = new ArrayList<Tuple3>();
>>> *List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();*
>>> * List<Centroid> centroidList = centroids.collect();*
>>> List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>
>>> elementsWithDistance = new ArrayList<Tuple3<Centroid,
>>> Tuple2<Integer, Point>, Double>>();
>>> for (Centroid centroid : centroidList) {
>>> elementsWithDistance = new ArrayList<Tuple3<Centroid, Tuple2<Integer,
>>> Point>, Double>>();
>>> double totalDistance = 0;
>>> int elementsCount = 0;
>>> for (Tuple2<Integer, Point> e : elements) {
>>> // compute distance
>>> if (e.f0 == centroid.id) {
>>> Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement = new
>>> Tuple3<Centroid,
>>>     Tuple2<Integer, Point>, Double>();
>>> double distance = e.f1.euclideanDistance(centroid);
>>> totalDistance += distance;
>>> newElement.setFields(centroid, e, distance);
>>> elementsWithDistance.add(newElement);
>>> elementsCount++;
>>> }
>>> }
>>> // finding mean
>>> double mean = totalDistance / elementsCount;
>>> double sdTotalDistanceSquare = 0;
>>> for (Tuple3<Centroid, Tuple2<Integer, Point>, Double>
>>> elementWithDistance : elementsWithDistance) {
>>> double distanceSquare = Math.pow(mean - elementWithDistance.f2, 2);
>>> sdTotalDistanceSquare += distanceSquare;
>>> }
>>> double sd = Math.sqrt(sdTotalDistanceSquare / elementsCount);
>>> double upperlimit = mean + 2 * sd;
>>> double lowerlimit = mean - 2 * sd;
>>> Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point,
>>> Boolean>();// true
>>> // =
>>> // outlier
>>> for (Tuple3<Centroid, Tuple2<Integer, Point>, Double>
>>> elementWithDistance : elementsWithDistance) {
>>> newElement = new Tuple3<Integer, Point, Boolean>();
>>> if (elementWithDistance.f2 < lowerlimit || elementWithDistance.f2 >
>>> upperlimit) {
>>> // set as outlier
>>> newElement.setFields(elementWithDistance.f1.f0,
>>> elementWithDistance.f1.f1, true);
>>> } else {
>>> newElement.setFields(elementWithDistance.f1.f0,
>>> elementWithDistance.f1.f1, false);
>>> }
>>> finalElements.add(newElement);
>>> }
>>> }
>>> return finalElements;
>>> }
>>>
>>> I have attached herewith the screenshot of my project structure and
>>> KMeansOutlierDetection.java file for more clarity.
>>>
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>> On Wed, Feb 10, 2016 at 12:26 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> [image: Boxbe] <https://www.boxbe.com/overview> This message is
>>>> eligible for Automatic Cleanup! (fhue...@gmail.com) Add cleanup rule
>>>> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Ftoken%3D8hdIOJf0i4083WeIB%252BQUXfS8djluXs0JekXPLuRpitIZdx1%252FAH%252BEFK2XXNXEBi6cnglpq9HBimim9%252FKCQ7UDLHnqGh6CGYAGY7zzpc82QTIAjEQM22%252FkBmdko8aAcxcD2P3ax587Jik%253D%26key%3DNO%252B%252BpxOTI6yOrzMtJK8863zNLUnk0hGhdxHIyLoWxck%253D&tc_serial=24326433442&tc_rand=1135108797&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>>> | More info
>>>> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=24326433442&tc_rand=1135108797&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>>>
>>>> Hi Subash,
>>>>
>>>> how is findOutliers implemented?
>>>>
>>>> It might be that you mix-up local and cluster computation. All DataSets
>>>> are processed in the cluster. Please note the following:
>>>> - ExecutionEnvironment.fromCollection() transforms a client local
>>>> connection into a DataSet by serializing it and sending it to the cluster.
>>>> - DataSet.collect() transforms a DataSet into a collection and ships it
>>>> back to the client.
>>>>
>>>> So, does findOutliers operate on the cluster or on the local client,
>>>> i.e., does it work with DataSet and send the result back as a collection or
>>>> does it first collect the results as collection and operate on these?
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-02-10 12:13 GMT+01:00 subash basnet <yasub...@gmail.com>:
>>>>
>>>>> Hello Stefano,
>>>>>
>>>>> Yeah the type casting worked, thank you. But not able to print the
>>>>> Dataset to the file.
>>>>>
>>>>> The default below code which writes the KMeans points along with their
>>>>> centroid numbers to the file works fine:
>>>>>                 // feed new centroids back into next iteration
>>>>> DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>>>> DataSet<Tuple2<Integer, Point>> clusteredPoints = points
>>>>> // assign points to final clusters
>>>>> .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids,
>>>>> "centroids");
>>>>>               if (fileOutput) {
>>>>> clusteredPoints.writeAsCsv(outputPath, "\n", " ");
>>>>> // since file sinks are lazy, we trigger the execution explicitly
>>>>> env.execute("KMeans Example");
>>>>> }
>>>>>
>>>>> But my modified code below to find outlier:
>>>>> // feed new centroids back into next iteration
>>>>> DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>>>> DataSet<Tuple2<Integer, Point>> clusteredPoints = points
>>>>> // assign points to final clusters
>>>>> .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids,
>>>>> "centroids");
>>>>>                *DataSet<Tuple3> fElements =
>>>>> env.fromCollection(findOutliers(clusteredPoints, finalCentroids));*
>>>>>                if (fileOutput) {
>>>>> *fElements.writeAsCsv(outputPath, "\n", " ");*
>>>>> // since file sinks are lazy, we trigger the execution explicitly
>>>>> env.execute("KMeans Example");
>>>>> }
>>>>>
>>>>> It's not writing to the file, the *result *folder does not get
>>>>> created inside kmeans folder where my centers, points file are located. I
>>>>> am only able to print it to the console via *fElements.print();*
>>>>>
>>>>> Does it have something to do with *env.exectue("")*, which must be
>>>>> set somewhere in the previous case but not in my case.
>>>>>
>>>>>
>>>>>
>>>>> Best Regards,
>>>>> Subash Basnet
>>>>>
>>>>>
>>>>> On Tue, Feb 9, 2016 at 6:29 PM, Stefano Baghino <
>>>>> stefano.bagh...@radicalbit.io> wrote:
>>>>>
>>>>>> [image: Boxbe] <https://www.boxbe.com/overview> This message is
>>>>>> eligible for Automatic Cleanup! (stefano.bagh...@radicalbit.io) Add
>>>>>> cleanup rule
>>>>>> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Ftoken%3D1lghJuQA8DeL%252BeQeu%252BXtjFS3Ln6XzfMngdzEeoXhxNL9D%252Fev2KZxlYVTG7zXzAOKqyTfuHHhyjFCeIEJhsuw2xSonJ%252Fz9ELZJGQHf2k5wgw88cHdkws1iTkY3LXpay0T6G30GCRRcKpUcUeyr6wyDBlPBPj1idLV%26key%3DzNLvLvtrNviObgkHecr87NQBUPN5j9wZMWIyBsSzzNM%253D&tc_serial=24317106750&tc_rand=543304732&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>>>>> | More info
>>>>>> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=24317106750&tc_rand=543304732&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>>>>>
>>>>>> Assuming your EnvironmentContext is named `env` Simply call:
>>>>>>
>>>>>> DataSet<Tuple3<Integer, Point, Boolean>> fElements = env.
>>>>>> *fromCollection*(finalElements);
>>>>>>
>>>>>> Does this help?
>>>>>>
>>>>>> On Tue, Feb 9, 2016 at 6:06 PM, subash basnet <yasub...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I have performed a modification in KMeans code to detect outliers. I
>>>>>>> have printed the output in the console but I am not able to write it to 
>>>>>>> the
>>>>>>> file using the given 'writeAsCsv' method.
>>>>>>> The problem is I generate a list of tuples.
>>>>>>> My List is:
>>>>>>> List<Tuple3> finalElements = new ArrayList<Tuple3>();
>>>>>>> Following is the datatype of the elements added to the list:
>>>>>>> Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer,
>>>>>>> Point, Boolean>();
>>>>>>> finalElements.add(newElement);
>>>>>>> Now I am stuck on how to convert this 'finalElements' to
>>>>>>> DataSet<Tuple3<Integer, Point, Boolean>> fElements,
>>>>>>> so that I could use
>>>>>>> fElements.writeAsCsv(outputPath, "\n"," ");
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Subash Basnet
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> BR,
>>>>>> Stefano Baghino
>>>>>>
>>>>>> Software Engineer @ Radicalbit
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to