The Spark community has been holding a weekly sync meeting on DataSourceV2 and sharing notes back to their dev list: https://lists.apache.org/list.html?d...@spark.apache.org:lte=3M:DataSourceV2%20sync. At this time, there are still some moving pieces on Spark's side. Is it too early to target DataSourceV2?
Thanks,
Manu Zhang

On Dec 19, 2018, 6:40 PM +0800, Etienne Chauchot <echauc...@apache.org>, wrote:
> Thanks Kenn for taking the time to take a look.
>
> On Tuesday, December 18, 2018 at 11:39 -0500, Kenneth Knowles wrote:
> > I don't know DataSourceV2 well, but I am reading around to try to help. I see the problem with the SparkSession API. Is there no other way to instantiate a DataSourceV2 and read the data from it?
> => No, this is exactly what I'm looking for :)
> >
> > Other thoughts:
> >
> > - Maybe start from Splittable DoFn since it is a new translator?
> => Yes, but I still need to translate BoundedSource and UnboundedSource for compatibility with IOs that have not migrated to SDF.
> > - I wonder if the reason for this API is that the class name and options are what is shipped to workers, so the limited API makes serialization easy for them?
> => Yes, that and because DataSource is the entry point of the Spark pipeline, so it should not need to receive more than user input configuration, hence the String-only support. But we are not users, we are DAG translators, hence our need to pass more complex objects than Strings.
> > - As a total hack, you could serialize the Beam objects (maybe to portable protos) and pass that as a single "primitive type" option.
> => Yes, sure, that could work. Another hack would be to use ASM or ByteBuddy to "enhance" Spark classes, but that is weak and risky :)
> >
> > You definitely need someone from Spark more than someone from Beam for this issue. At this point, I've read the scaladocs enough that I think I'd dig into Spark's code to see what is going on and whether there is a way that is more obviously right.
> => Yes, this is what I tried, but I got no answer on the public Spark MLs. Luckily, I asked Ryan Blue of the Spark community directly and he kindly answered. I'm digging into Catalog and Spark plans to get a different instantiation mechanism.
>
> Etienne
> >
> > Kenn
> >
> > > On Tue, Dec 18, 2018 at 11:09 AM Etienne Chauchot <echauc...@apache.org> wrote:
> > > > Hi everyone,
> > > >
> > > > Does anyone have comments on this question?
> > > >
> > > > Thanks
> > > > Etienne
> > > >
> > > > On Friday, December 14, 2018 at 10:37 +0100, Etienne Chauchot wrote:
> > > > > Hi guys,
> > > > > I'm currently coding a POC of a new Spark runner based on structured streaming and the new DataSourceV2 API, and I have a question. Having found no pointers on the internet, I asked the Spark community with no luck. If any of you have knowledge of the new Spark DataSourceV2 API, can you share your thoughts?
> > > > >
> > > > > Also, I did not mention it in the email, but I did not find any way to get a reference to the automatically created DataSourceV2 instance, so I cannot lazily initialize the source either.
> > > > >
> > > > > Thanks
> > > > >
> > > > > Etienne
> > > > >
> > > > > -------- Forwarded message --------
> > > > > From: Etienne Chauchot <echauc...@apache.org>
> > > > > To: d...@spark.apache.org
> > > > > Subject: [Apache Beam] Custom DataSourceV2 instantiation: parameter passing and Encoders
> > > > > Date: Tue, 11 Dec 2018 19:02:23 +0100
> > > > >
> > > > > Hi Spark guys,
> > > > >
> > > > > I'm Etienne Chauchot and I'm a committer on the Apache Beam project.
> > > > >
> > > > > We have what we call runners. They are pieces of software that translate pipelines written using the Beam API into pipelines that use the native execution engine API.
> > > > > Currently, the Spark runner uses the old RDD / DStream APIs. I'm writing a new runner that will use structured streaming (but not continuous processing, and no schema for now).
> > > > >
> > > > > I am just starting. I'm currently trying to map our sources to yours, and I'm targeting the new DataSourceV2 API. It maps pretty well to Beam sources, but I have a problem with the instantiation of the custom source. I searched for an answer on Stack Overflow and on the user ML with no luck. I guess it is too specific a question:
> > > > >
> > > > > When visiting the Beam DAG, I have access to Beam objects such as Source and Reader that I need to map to MicroBatchReader and InputPartitionReader.
> > > > > As far as I understand, a custom DataSourceV2 is instantiated automatically by Spark through sparkSession.readStream().format(providerClassName) or similar code. The problem is that I can only pass options of primitive types + String, so I cannot pass the Beam Source to the DataSourceV2.
> > > > > => Is there a way to do so?
> > > > >
> > > > > Also, I get a Dataset<Row> as output. The Row contains an instance of Beam WindowedValue<T>, where T is the type parameter of the Source. I do a map on the Dataset to transform it into a Dataset<WindowedValue<T>>. I have a question related to the Encoder:
> > > > > => How do I properly create an Encoder for the generic type WindowedValue<T> to use in the map?
> > > > >
> > > > > Here is the code:
> > > > > https://github.com/apache/beam/tree/spark-runner_structured-streaming
> > > > >
> > > > > And more specifically:
> > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
> > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Etienne
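A minimal sketch of the "serialize the Beam objects and pass them as a single String option" hack that Kenn suggests above, assuming plain Java serialization and Base64 encoding. The option key "beam-source", the helper class, and the use of the DatasetSource class name taken from the linked branch as the format() provider are illustrative assumptions, not the runner's actual code:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.Base64;

    import org.apache.beam.sdk.io.Source;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    /** Hypothetical helper: smuggles a serializable Beam Source through DataSourceV2's String-only options. */
    class SerializedSourceOption {

      /** Java-serialize the Beam source and Base64-encode it so it fits in a plain String option. */
      static String encode(Source<?> beamSource) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(beamSource);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
      }

      /** Decode the option back into the Beam source inside the custom DataSourceV2 / reader. */
      @SuppressWarnings("unchecked")
      static <T> Source<T> decode(String option) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(option);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
          return (Source<T>) in.readObject();
        }
      }

      /** Translator side: the provider class name plus the encoded source are all Spark needs to see. */
      static Dataset<Row> readWithBeamSource(SparkSession spark, Source<?> source) throws IOException {
        return spark
            .readStream()
            .format("org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource")
            .option("beam-source", encode(source))
            .load();
      }
    }

Inside the DataSourceV2 implementation (for example in createMicroBatchReader), the same "beam-source" option can be read back from DataSourceOptions and passed through decode() to recover the original Source, at the cost of shipping the serialized bytes through Spark's String-typed configuration.

For the Encoder question, one hedged option (again a sketch, not necessarily the runner's eventual solution) is to fall back to a binary encoder such as Encoders.kryo: it accepts a raw Class token, so the generic parameter of WindowedValue<T> is erased with an unchecked cast, and the whole value is stored as a single binary column, which fits the "no schema for now" constraint:

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    class WindowedValueEncoders {

      /** Kryo-based binary encoder for the generic WindowedValue<T>; the raw-class cast erases T. */
      @SuppressWarnings("unchecked")
      static <T> Encoder<WindowedValue<T>> windowedValueEncoder() {
        return Encoders.kryo((Class<WindowedValue<T>>) (Class<?>) WindowedValue.class);
      }

      /** Map the Dataset<Row> produced by the source to a Dataset<WindowedValue<T>>, assuming the value sits in column 0. */
      @SuppressWarnings("unchecked")
      static <T> Dataset<WindowedValue<T>> toWindowedValues(Dataset<Row> rows) {
        return rows.map(
            (MapFunction<Row, WindowedValue<T>>) row -> (WindowedValue<T>) row.get(0),
            windowedValueEncoder());
      }
    }

The trade-off is that a Kryo (or Java-serialization) encoder gives up Catalyst's columnar optimizations, so it is only a stopgap until the runner can expose a real schema for WindowedValue.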