Hi, all!
I'm a developer who works to support data scientists at CERT. We've been having some great success working with Spark for data analysis, and I have some questions about how we could contribute to work on Spark in support of our goals. Specifically, we have some interest in user-defined types, or their equivalents. When Spark 2 arrived, user-defined types (UDTs) were made private and seem to have fallen by the wayside in favor of using encoders for Datasets. I was wondering whether a plan has been published anywhere for the future of these mechanisms, or whether there's anyone I could talk to about where things are going with them.

I've roughly outlined our experience with these two mechanisms below, along with our hopes for what might be accomplished in the future. We'd love to spend some effort on development here, but haven't been able to figure out whether anyone is already working on improvements in this area, or whether there's a plan in place for where things are going to go. So, I'd love to get in touch with anyone who might know more.

Background:

Much of the work in my group is analysis of Internet protocol data, and I think IP addresses are a great example of how a custom atomic type can be helpful. IP addresses (both 32-bit IPv4 addresses and 128-bit IPv6 addresses) have a natural binary form: a sequence of bytes. Using this form makes the default implementations of certain basic operations sensible (equality and comparison, for example), and defining UDFs for more complicated operations is not terribly difficult. But the binary form is not human-friendly to view. The human-readable presentations of IP addresses, on the other hand, are large and unwieldy to work with computationally. There is a canonical textual form for both IPv4 and IPv6 addresses, but converting back and forth between that form and the binary form is expensive, and the text form is generally at least twice as large as the binary form. The text form is suitable for presenting to human beings, but that's about it.

There are also a variety of other types of Internet data that are best represented by byte arrays and the like, which means that simply saying "just use a byte array for this column!" can be unfortunate for both type safety and the comprehensibility of a collection of data.

When we were working on top of Spark 1, we had begun to look at using UDTs to represent IP addresses. There were some issues with getting UDTs to work with built-in operations like comparisons, but we had some hope for improvements in future Spark releases. With Spark 2.0, the UDT API was made private, and the encoder mechanism was suggested for use instead. For a while, we experimented with using the UDT API anyway by putting stubs into Spark's namespace, but there weren't many good places to hook operations like equality that one would expect to work on an atomic type.

We also tried using the encoder APIs, and ran into a few problems there as well. Encoders are well suited to handling "top-level" values, but the most convenient way to work with encoded data is to have your top level be a case class defining the types and names of a record type. And here there's a problem, because encoders from the implicit environment are not available when encoding the fields of a case class. So if we defined a custom encoder for our IPAddress type and then included an IPAddress as a field of a record, we would get an error.
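To make that concrete, here is a minimal sketch of the failing pattern, in spark-shell style like the appendix at the end of this message. The class names are ours, and Encoders.kryo stands in for whatever custom encoder one might define; the exact error text may vary by Spark version.

// A plain (non-case) class wrapping the binary form.
class IPAddress(val bytes: Array[Byte]) extends Serializable

case class Rec(a: IPAddress, b: IPAddress)

// A top-level Dataset[IPAddress] works once an explicit encoder is in scope:
implicit val ipEncoder: org.apache.spark.sql.Encoder[IPAddress] =
  org.apache.spark.sql.Encoders.kryo[IPAddress]

val addrs = Seq(new IPAddress(Array[Byte](1, 2, 3, 4))).toDS   // fine

// But that implicit encoder is not consulted for the fields of Rec, so
// deriving an encoder for the record type fails:
//
//   val recs = Seq(Rec(new IPAddress(Array[Byte](1, 2, 3, 4)),
//                      new IPAddress(Array[Byte](5, 6, 7, 8)))).toDS
//
// which dies with an error along the lines of "No Encoder found for IPAddress".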
One approach we tried to get around that was to make IP addresses themselves into case classes as well, so that only the default encoders would be required. This eliminated the error, but made working with the values a nightmare. If we made a Dataset[IPAddress], the byte array would be presented in a reasonable manner, but a Dataset[Rec] where Rec had IPAddress fields was another story, resulting in the default toString of Java arrays being used:

+-------------+-------------+
|            a|            b|
+-------------+-------------+
|[[B@47260109]|[[B@3538740a]|
|[[B@617f4814]|[[B@77e69bee]|
+-------------+-------------+

(See code snippet at the end of this message for details.)

Now basically all interactions would have to go through UDFs, including remembering to format the IPAddress field if you wanted any useful information out of it at all. As a result, since our initial experiments with 2.0, we've dropped back and punted to using text for all IP addresses. But we'd still like to do better.

What we optimally want is some mechanism for a user-defined atomic type (whether via encoders or via registering a new type) which allows for:

* An appropriately efficient underlying form. (A struct with a byte array field would be fine; a bare byte array field would be fine.)

* A display form that is meaningful to the user (the expected forms like "172.217.5.238" and "2607:f8b0:4004:800::200e").

* At least some support for standard SQL operators like equality and comparison, and the ability to define UDFs that work with the type.

Longer term, it would be lovely to:

* Be able to work with values of the type in an appropriate way from different source languages (i.e., make it not hard to work with the values in Python or R, although the restrictions of those languages will require additional implementation work).

* Be able to provide new Catalyst optimizations specific to the type and the functions defined on it.

We'd love to put some effort into achieving these goals, but aren't sure where to start. We'd also like to avoid getting in the way of any efforts that might already be underway to improve these mechanisms.

Thanks very much!

Katherine Prevost
Carnegie Mellon / Software Engineering Institute / CERT

-------------------------------------------------------------------->8--
// Simple example demonstrating the treatment of a case class with a
// byte array within another case class.

case class IPAddress(bytes: Array[Byte]) {
  override def toString: String = s"IPAddress(Array(${bytes.mkString(", ")}))"
}

val a = IPAddress(Array(1, 2, 3, 4))
val b = IPAddress(Array(5, 6, 7, 8))
val c = IPAddress(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16))
val d = IPAddress(Array(17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32))

val x = Array(a, b, c, d)
val xs = sc.parallelize(x).toDS

/*
scala> xs.show
+--------------------+
|               bytes|
+--------------------+
|       [01 02 03 04]|
|       [05 06 07 08]|
|[01 02 03 04 05 0...|
|[11 12 13 14 15 1...|
+--------------------+
*/

case class Rec(a: IPAddress, b: IPAddress) {
  override def toString: String = s"Rec($a, $b)"
}

val e = Rec(a, b)
val f = Rec(c, d)

val y = Array(e, f)
val ys = sc.parallelize(y).toDS

/*
scala> ys.show
+-------------+-------------+
|            a|            b|
+-------------+-------------+
|[[B@47260109]|[[B@3538740a]|
|[[B@617f4814]|[[B@77e69bee]|
+-------------+-------------+
*/
-------------------------------------------------------------------->8--
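Continuing from the `ys` Dataset above, here is a rough sketch of the sort of formatting UDF the current situation forces on us just to get readable output. The names are purely illustrative, and it handles only 4-byte addresses for brevity:

import org.apache.spark.sql.functions.udf

// Format a binary column as dotted-decimal text; real code would also
// handle 16-byte (IPv6) addresses.
val formatV4 = udf((bytes: Array[Byte]) =>
  bytes.map(b => (b & 0xff).toString).mkString("."))

ys.select(formatV4($"a.bytes").as("a"), formatV4($"b.bytes").as("b")).show()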