@Justin, it's fixed by https://github.com/apache/spark/pull/12057
On Thu, Feb 11, 2016 at 11:26 AM, Davies Liu <dav...@databricks.com> wrote:
> Had a quick look at your commit; I think that makes sense. Could you
> send a PR for that, then we can review it.
>
> In order to support 2), we need to change the serialized Python
> function from `f(iter)` to `f(x)`, processing one row at a time (not a
> partition); then we can easily combine them together.
>
> For f1(f2(x)) and g1(g2(x)), we can do this in Python:
>
>     for row in reading_stream:
>         x1, x2 = row
>         y1 = f1(f2(x1))
>         y2 = g1(g2(x2))
>         yield (y1, y2)
>
> For RDDs, we still need to use `f(iter)`, but for SQL UDFs, use `f(x)`.
>
> On Sun, Jan 31, 2016 at 1:37 PM, Justin Uang <justin.u...@gmail.com> wrote:
>> Hey guys,
>>
>> BLUF: sorry for the length of this email. I'm trying to figure out how to
>> batch Python UDF executions, and since this is my first time messing with
>> Catalyst, I would appreciate any feedback.
>>
>> My team is starting to use PySpark UDFs quite heavily, and performance is
>> a huge blocker. The extra roundtrip serialization from Java to Python is
>> not a huge concern if we only incur it ~once per column for most
>> workflows, since it'll be in the same order of magnitude as reading files
>> from disk. However, right now each Python UDF incurs its own roundtrip.
>> There is definitely a lot we can do regarding this:
>>
>> (all the prototyping code is here:
>> https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc)
>>
>> 1. We can't chain Python UDFs.
>>
>>     df.select(python_times_2(python_times_2("col1")))
>>
>> throws an exception saying that the inner expression isn't evaluable. The
>> workaround is to do
>>
>>     df.select(python_times_2("col1").alias("tmp")).select(python_times_2("tmp"))
>>
>> This can be solved in ExtractPythonUDFs by always extracting the innermost
>> Python UDF first.
>>
>>     // Pick the UDF we are going to evaluate (TODO: Support evaluating
>>     // multiple UDFs at a time)
>>     // If there is more than one, we will add another evaluation
>>     // operator in a subsequent pass.
>>     -  udfs.find(_.resolved) match {
>>     +  udfs.find { udf =>
>>     +    udf.resolved && udf.children.map { child: Expression =>
>>     +      child.find { // really hacky way to find if a child of a udf has the PythonUDF node
>>     +        case p: PythonUDF => true
>>     +        case _ => false
>>     +      }.isEmpty
>>     +    }.reduce((x, y) => x && y)
>>     +  } match {
>>          case Some(udf) =>
>>            var evaluation: EvaluatePython = null
>>
>> 2. If we have a Python UDF applied to many different columns, where they
>> don't depend on each other, we can optimize them by collapsing them down
>> into a single Python worker. Although we have to serialize and send the
>> same amount of data to the Python interpreter, in the case where I am
>> applying the same function to 20 columns, the overhead/context switches
>> of having 20 interpreters running at the same time cause huge performance
>> hits. I have confirmed this by manually taking the 20 columns, converting
>> them to a struct, and then writing a UDF that processes the whole struct
>> at once; the speed difference is 2x. My approach to adding this to
>> Catalyst is basically to write an optimizer rule called CombinePython,
>> which joins adjacent EvaluatePython nodes that don't depend on each
>> other's variables, and then having BatchPythonEvaluation run multiple
>> lambdas at once. I would also like to be able to handle the case
>>
>>     df.select(python_times_2("col1").alias("col1x2"))
>>       .select(F.col("col1x2"), python_times_2("col1x2").alias("col1x4"))
>>
>> To get around that, I add a PushDownPythonEvaluation optimizer rule that
>> pushes the evaluation through a select/project, so that the CombinePython
>> rule can join the two.
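For illustration, here is a minimal PySpark sketch of the struct workaround
described in 2) above. The DataFrame `df`, the column names c0..c19, and the
trivial doubling logic are hypothetical stand-ins, not code from the prototype
branch; columns are assumed to be numeric (double).

    # Hypothetical illustration of the struct workaround from 2): instead of
    # 20 separate Python UDF calls (20 Python workers), pack the columns into
    # one struct and process them all in a single UDF (one Python worker).
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, DoubleType

    cols = ["c%d" % i for i in range(20)]   # assumed numeric input columns

    # Naive form: one Python UDF invocation (and worker) per column.
    times_2 = F.udf(lambda x: x * 2.0, DoubleType())
    slow = df.select(*[times_2(c).alias(c + "_x2") for c in cols])

    # Workaround: one UDF over a struct of all 20 columns.
    times_2_all = F.udf(lambda row: [v * 2.0 for v in row],
                        ArrayType(DoubleType()))
    fast = df.select(times_2_all(F.struct(*cols)).alias("all_x2"))

This mirrors the manual experiment mentioned above; the proposed CombinePython
rule would aim to produce an equivalent single-worker plan automatically.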
>>
>> 3. I would like CombinePython to be able to handle UDFs that chain off of
>> each other.
>>
>>     df.select(python_times_2(python_times_2("col1")))
>>
>> I haven't prototyped this yet, since it's a lot more complex. The way I'm
>> thinking about this is to still have a rule called CombinePython, except
>> that BatchPythonEvaluation will need to be smart enough to build up the
>> DAG of dependencies, and then feed that information to the Python
>> interpreter, so it can compute things in the right order and reuse the
>> in-memory objects that it has already computed. Does this seem right?
>> Should the code mainly be in BatchPythonEvaluation? In addition, we will
>> need to change the protocol between the Java and Python sides to support
>> sending this information. What is acceptable?
>>
>> Any help would be much appreciated, especially w.r.t. the design choices,
>> so that the PR has a chance of being accepted.
>>
>> Justin
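For illustration, a minimal, self-contained Python sketch of the kind of
per-row DAG evaluation described in 3): the worker computes UDFs in dependency
order and reuses intermediate results. This is plain Python, not actual
BatchPythonEvaluation or worker code, and the data structures are hypothetical.

    # Hypothetical per-row evaluation of a DAG of UDFs in dependency order,
    # reusing intermediate results instead of recomputing them.
    def evaluate_row(row, udfs, order):
        # row:   dict of input column name -> value
        # udfs:  dict of output name -> (function, list of input names)
        # order: output names in topological order (dependencies first)
        values = dict(row)                       # cache of computed values
        for name in order:
            func, deps = udfs[name]
            # look up each dependency in the cache, so chained UDFs reuse
            # the in-memory result of the inner UDF
            values[name] = func(*[values[d] for d in deps])
        return values

    # Example: col1x2 = times_2(col1), col1x4 = times_2(col1x2) -- the chained case.
    times_2 = lambda x: x * 2
    udfs = {"col1x2": (times_2, ["col1"]),
            "col1x4": (times_2, ["col1x2"])}
    for v in [1, 2, 3]:
        out = evaluate_row({"col1": v}, udfs, ["col1x2", "col1x4"])
        print(out["col1x2"], out["col1x4"])      # 2 4, then 4 8, then 6 12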