Does the data actually fit in memory? Check the web UI. If it doesn't, caching is not going to help you.
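A quick way to check this from PySpark itself, as a minimal sketch: it assumes a Spark 1.6-style SQLContext named sqlContext and that the data has been registered as a temp table called "data" (both names are illustrative; adjust them to your session).

# Caching is lazy, so mark the table and run an action to materialize it.
sqlContext.cacheTable("data")
sqlContext.table("data").count()

# Then open the driver web UI (default http://<driver-host>:4040) and look at
# the Storage tab: "Fraction Cached" and "Size in Memory". If the fraction is
# below 100%, part of the table is recomputed or read from disk on every
# query, which can easily make the "cached" run slower than the uncached one.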
On Tue, Apr 12, 2016 at 9:00 AM, Imran Akbar <skunkw...@gmail.com> wrote:

> thanks Michael,
>
> That worked.
> But what's puzzling is if I take the exact same code and run it off a temp
> table created from parquet, vs. a cached table - it runs much slower: 5-10
> seconds uncached vs. 47-60 seconds cached.
>
> Any ideas why?
>
> Here's my code snippet:
> df = data.select("customer_id", struct('dt', 'product').alias("vs"))\
>     .groupBy("customer_id")\
>     .agg(min("vs").alias("final"))\
>     .select("customer_id", "final.dt", "final.product")
> df.head()
>
> My log from the non-cached run:
> http://pastebin.com/F88sSv1B
>
> Log from the cached run:
> http://pastebin.com/Pmmfea3d
>
> thanks,
> imran
>
> On Fri, Apr 8, 2016 at 12:33 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> You need to use the struct function
>> <https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.functions.struct>
>> (which creates an actual struct); you are trying to use the struct datatype
>> (which just represents the schema of a struct).
>>
>> On Thu, Apr 7, 2016 at 3:48 PM, Imran Akbar <skunkw...@gmail.com> wrote:
>>
>>> thanks Michael,
>>>
>>> I'm trying to implement the code in pyspark like so (where my dataframe
>>> has 3 columns - customer_id, dt, and product):
>>>
>>> st = StructType().add("dt", DateType(), True).add("product",
>>> StringType(), True)
>>>
>>> top = data.select("customer_id", st.alias('vs'))
>>>     .groupBy("customer_id")
>>>     .agg(max("dt").alias("vs"))
>>>     .select("customer_id", "vs.dt", "vs.product")
>>>
>>> But I get an error saying:
>>>
>>> AttributeError: 'StructType' object has no attribute 'alias'
>>>
>>> Can I do this without aliasing the struct? Or am I doing something
>>> incorrectly?
>>>
>>> regards,
>>> imran
>>>
>>> On Wed, Apr 6, 2016 at 4:16 PM, Michael Armbrust <mich...@databricks.com>
>>> wrote:
>>>
>>>>> Ordering for a struct goes in order of the fields. So the max struct
>>>>> is the one with the highest TotalValue (and then the highest category
>>>>> if there are multiple entries with the same hour and total value).
>>>>>
>>>>> Is this due to "InterpretedOrdering" in StructType?
>>>>
>>>> That is one implementation, but the code-generated ordering also
>>>> follows the same contract.
>>>>
>>>>> 4) Is it faster doing it this way than doing a join or window
>>>>> function in Spark SQL?
>>>>>
>>>>> Way faster. This is a very efficient way to calculate argmax.
>>>>>
>>>>> Can you explain how this is way faster than a window function? I can
>>>>> understand a join doesn't make sense in this case. But to calculate the
>>>>> grouping max, you just have to shuffle the data by the grouping keys. You
>>>>> maybe can do a combiner on the mapper side before shuffling, but that is
>>>>> it. Do you mean a windowing function in Spark SQL won't do any map-side
>>>>> combining, even if it is for max?
>>>>
>>>> Windowing can't do partial aggregation and will have to collect all the
>>>> data for a group so that it can be sorted before applying the function. In
>>>> contrast, a max aggregation will do partial aggregation (map-side
>>>> combining) and can be calculated in a streaming fashion.
>>>>
>>>> Also, aggregation is more common and thus has seen more optimization
>>>> beyond the theoretical limits described above.
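To make the comparison in the quoted thread concrete, here is a short self-contained sketch of the two approaches. The dataframe and column names (data, customer_id, dt, product) follow the thread; the window-function variant is an illustrative equivalent I've added for contrast, not code from the thread, and it uses Spark 1.6+ function names.

from pyspark.sql import Window
from pyspark.sql.functions import struct, col, row_number
from pyspark.sql.functions import min as min_

# data has columns: customer_id, dt, product

# 1) Struct-based argmin, as suggested in the thread: min() over a struct
#    orders by the struct's fields in order, so putting "dt" first picks the
#    row with the earliest dt per customer. This benefits from map-side
#    partial aggregation.
earliest = (data
    .select("customer_id", struct("dt", "product").alias("vs"))
    .groupBy("customer_id")
    .agg(min_("vs").alias("final"))
    .select("customer_id", "final.dt", "final.product"))

# 2) Window-function equivalent. Every customer's rows must be collected and
#    sorted before row_number() can be applied, so nothing is combined on the
#    map side before the shuffle.
w = Window.partitionBy("customer_id").orderBy("dt")
earliest_w = (data
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .drop("rn"))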