Yes, it is thanks to these Spark community meetings that I got Ryan's name.
And, indeed, when I saw the design sync meetings, I realized how recent the
DataSourceV2 API is. I think you are right: I should wait for it to be
finished and use V1 in the meantime.
Etienne
On Wednesday, December 19, 2018 at 23:27 +0800, Manu Zhang wrote:
> The Spark community has been holding a weekly sync meeting on DataSourceV2
> and sharing notes back to their dev list
> https://lists.apache.org/list.html?d...@spark.apache.org:lte=3M:DataSourceV2%20sync.
> At this time, there are still some
> moving pieces on Spark’s side. Is it too early to target DataSourceV2?
>
>
> Thanks,
>
> Manu Zhang
> On Dec 19, 2018, 6:40 PM +0800, Etienne Chauchot <echauc...@apache.org>,
> wrote:
>
> > Thanks, Kenn, for taking the time to take a look.
> >
> >
> > On Tuesday, December 18, 2018 at 11:39 -0500, Kenneth Knowles wrote:
> > > I don't know DataSourceV2 well, but I am reading around to try to help. I
> > > see the problem with the SparkSession
> > > API. Is there no other way to instantiate a DataSourceV2 and read the
> > > data from it?
> >
> > => No, this is exactly what I'm looking for :)
> > >
> > >
> > > Other thoughts:
> > >
> > >
> > > - Maybe start from Splittable DoFn since it is a new translator?
> > >
> > >
> >
> > => Yes, but I still need to translate BoundedSource and UnboundedSource for
> > compatibility with IOs that have not
> > migrated to SDF.
> >
> >
> > > - I wonder if the reason for this API is that the class name and options
> > > are what is shipped to workers, so the
> > > limited API makes serialization easy for them?
> > >
> >
> > => Yes, that, and also because the DataSource is the entry point of the
> > Spark pipeline, so it should not need to receive more
> > than user-supplied configuration, hence the String-only support. But we
> > are not users, we are DAG translators, hence our need to pass
> > objects more complex than Strings.
> >
> >
> > >
> > > - As a total hack, you could serialize the Beam objects (maybe to
> > > portable protos) and pass that as a single
> > > "primitive type" option.
> > >
> > >
> >
> > => Yes, sure, it could work. Another hack would be to use ASM or ByteBuddy
> > to "enhance" Spark classes, but that is fragile
> > and risky :)
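
A minimal sketch of the single-String-option hack discussed above, assuming the Beam object to ship is java.io.Serializable. It uses plain Java serialization plus Base64 rather than the portable protos Kenn mentions, and the class name OptionCodec and its methods are hypothetical, not part of Beam or Spark:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

/** Hypothetical helper: packs a Serializable object into one String option value. */
final class OptionCodec {
  private OptionCodec() {}

  /** Java-serializes the value and Base64-encodes the bytes into a plain String. */
  static String encode(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException("Could not serialize option value", e);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  /** Reverses encode(): Base64-decodes the String and deserializes the object. */
  static Object decode(String encoded) {
    byte[] raw = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException("Could not deserialize option value", e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Class of the option value is not on the classpath", e);
    }
  }
}
```

The encoded String could then be passed as the value of a single String option when building the reader and decoded inside the custom source. Which option key to use, and whether the payload size is acceptable, are left open here.
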
> > >
> > >
> > >
> > > You definitely need someone from Spark more than someone from Beam for
> > > this issue. At this point, I've read the
> > > scaladocs enough that I think I'd dig into Spark's code to see what is
> > > going on and if there is a way that is more
> > > obviously right.
> > >
> > >
> >
> > => Yes, this is what I tried, but I got no answer on the public Spark
> > mailing lists. Luckily, I asked Ryan Blue of the
> > Spark community directly, and he kindly answered. I'm digging into Catalog
> > and Spark plans to find a different instantiation
> > mechanism.
> >
> >
> > Etienne
> > >
> > >
> > >
> > >
> > > Kenn
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Dec 18, 2018 at 11:09 AM Etienne Chauchot <echauc...@apache.org>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > >
> > > > Does anyone have comments on this question?
> > > >
> > > >
> > > > Thanks
> > > > Etienne
> > > >
> > > >
> > > > > On Friday, December 14, 2018 at 10:37 +0100, Etienne Chauchot wrote:
> > > > > Hi guys,
> > > > > > I'm currently coding a POC of a new Spark runner based on structured
> > > > > > streaming and the new DataSourceV2 API, and
> > > > > > I have a question. Having found no pointers on the
> > > > > > internet, I asked the Spark community, with no
> > > > > > luck. If any of you knows the new Spark DataSourceV2
> > > > > > API, could you share your thoughts?
> > > > >
> > > > >
> > > > > > Also, I did not mention it in the email, but I did not find any way
> > > > > > to get a reference to the automatically created
> > > > > > DataSourceV2 instance, so I cannot lazily initialize the source either.
> > > > >
> > > > >
> > > > > Thanks
> > > > >
> > > > >
> > > > > Etienne
> > > > >
> > > > >
> > > > > > -------- Forwarded message --------
> > > > > > From: Etienne Chauchot <echauc...@apache.org>
> > > > > > To: d...@spark.apache.org
> > > > > > Subject: [Apache Beam] Custom DataSourceV2 instanciation: parameters
> > > > > passing and Encoders
> > > > > Date: Tue, 11 Dec 2018 19:02:23 +0100
> > > > >
> > > > >
> > > > > Hi Spark guys,
> > > > >
> > > > >
> > > > > I'm Etienne Chauchot and I'm a committer on the Apache Beam project.
> > > > >
> > > > >
> > > > > > We have what we call runners. They are pieces of software that
> > > > > > translate pipelines written using the Beam API into
> > > > > > pipelines that use a native execution engine's API. Currently, the
> > > > > > Spark runner uses the old RDD / DStream APIs.
> > > > > > I'm writing a new runner that will use structured streaming (but not
> > > > > > continuous processing, and also no schema
> > > > > > for now).
> > > > >
> > > > >
> > > > > > I am just starting. I'm currently trying to map our sources to yours.
> > > > > > I'm targeting the new DataSourceV2 API. It
> > > > > > maps pretty well to Beam sources, but I have a problem with the
> > > > > > instantiation of the custom source.
> > > > > > I searched for an answer on Stack Overflow and the user mailing list,
> > > > > > with no luck. I guess the question is too specific:
> > > > >
> > > > >
> > > > > > When visiting the Beam DAG, I have access to Beam objects such as
> > > > > > Source and Reader that I need to map to
> > > > > > MicroBatchReader and InputPartitionReader.
> > > > > > As far as I understand, a custom DataSourceV2 is instantiated
> > > > > > automatically by Spark through
> > > > > > sparkSession.readStream().format(providerClassName) or similar code.
> > > > > > The problem is that I can only pass
> > > > > > options of primitive types + String, so I cannot pass the Beam Source
> > > > > > to DataSourceV2.
> > > > > > => Is there a way to do so?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > > Also, I get a Dataset<Row> as output. The Row contains an instance
> > > > > > of Beam WindowedValue<T>, where T is the type
> > > > > > parameter of the Source. I do a map on the Dataset to transform it
> > > > > > into a Dataset<WindowedValue<T>>. I have a
> > > > > > question related to the Encoder:
> > > > > > => How do I properly create an Encoder for the generic type
> > > > > > WindowedValue<T> to use in the map?
> > > > >
> > > > >
> > > > > Here is the code:
> > > > > https://github.com/apache/beam/tree/spark-runner_structured-streaming
> > > > >
> > > > >
> > > > > > And more specifically:
> > > > >
https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
> > > > >
https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > >
> > > > > Etienne
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
>
>
>