Thanks, Kenn, for taking the time to look into this.
On Tuesday, December 18, 2018 at 11:39 -0500, Kenneth Knowles wrote:
> I don't know DataSourceV2 well, but I am reading around to try to help. I see
> the problem with the SparkSession API.
> Is there no other way to instantiate a DataSourceV2 and read the data from it?
=> No, there is no other way that I know of; another way is exactly what I'm looking for :)
> Other thoughts:
>
> - Maybe start from Splittable DoFn since it is a new translator?
=> Yes, but I still need to translate BoundedSource and UnboundedSource for compatibility with the IOs that have not yet migrated to SDF.
> - I wonder if the reason for this API is that the class name and options are
> what is shipped to workers, so the
> limited API makes serialization easy for them?
=> Yes, that, and because the DataSource is the entry point of the Spark pipeline, it is not expected to receive more than user-supplied configuration, hence the String-only support. But we are not users, we are DAG translators, hence our need to pass more complex objects than Strings.
> - As a total hack, you could serialize the Beam objects (maybe to portable
> protos) and pass that as a single
> "primitive type" option.
=> Yes, sure, that could work. Another hack would be to use ASM or ByteBuddy to "enhance" the Spark classes, but that approach is fragile and risky :)
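For illustration, a minimal sketch of that serialization hack, assuming the Beam source is java-Serializable (the helper class and option key below are hypothetical, not actual runner code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;

// Hypothetical helper: smuggles a Serializable Beam object through the
// String-only DataSourceV2 options as a Base64-encoded string.
class BeamObjectOption {

  // Driver side: serialize the Beam source and encode it as a String option.
  static String encode(java.io.Serializable beamSource) throws java.io.IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(beamSource);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  // DataSourceV2 side: decode the option string back into the Beam source.
  static Object decode(String option) throws java.io.IOException, ClassNotFoundException {
    byte[] bytes = Base64.getDecoder().decode(option);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    }
  }
}

The DataSourceV2 implementation would then call decode() on the option it receives, after the driver registers it with something like spark.readStream().format(providerClassName).option("beamSource", BeamObjectOption.encode(source)).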
> You definitely need someone from Spark more than someone from Beam for this
> issue. At this point, I've read the
> scaladocs enough that I think I'd dig into Spark's code to see what is going
> on and if there is a way that is more
> obviously right.
=> Yes, this is what I tried, but I got no answer on the public Spark mailing lists. Luckily, I asked Ryan Blue of the Spark community directly, and he kindly answered. I'm digging into the Catalog and Spark plans to find a different instantiation mechanism.
Etienne
> Kenn
> On Tue, Dec 18, 2018 at 11:09 AM Etienne Chauchot <echauc...@apache.org>
> wrote:
> > Hi everyone,
> > Does anyone have comments on this question?
> > Thanks,
> > Etienne
> > On Friday, December 14, 2018 at 10:37 +0100, Etienne Chauchot wrote:
> > > Hi guys,
> > > I'm currently coding a POC of a new Spark runner based on structured streaming and the new DataSourceV2 API, and I have a question. Having found no pointers on the internet, I asked the Spark community, with no luck. If any of you have knowledge of the new Spark DataSourceV2 API, can you share your thoughts?
> > > Also, I did not mention it in the email below, but I did not find any way to get a reference to the automatically created DataSourceV2 instance, so I cannot lazily initialize the source either.
> > > Thanks
> > > Etienne
> > > -------- Forwarded message --------
> > > From: Etienne Chauchot <echauc...@apache.org>
> > > To: dev@spark.apache.org
> > > Subject: [Apache Beam] Custom DataSourceV2 instanciation: parameters passing and Encoders
> > > Date: Tue, 11 Dec 2018 19:02:23 +0100
> > > Hi Spark guys,
> > > I'm Etienne Chauchot and I'm a committer on the Apache Beam project.
> > > We have what we call runners: pieces of software that translate pipelines written with the Beam API into pipelines that use the native execution engine's API. Currently, the Spark runner uses the old RDD / DStream APIs. I'm writing a new runner that will use structured streaming (but not continuous processing, and no schema support for now).
> > > I am just starting. I'm currently trying to map our sources to yours, targeting the new DataSourceV2 API. It maps pretty well to Beam sources, but I have a problem with the instantiation of the custom source. I searched for an answer on Stack Overflow and the user mailing list with no luck; I guess the question is too specific:
> > > When visiting the Beam DAG, I have access to Beam objects such as Source and Reader that I need to map to MicroBatchReader and InputPartitionReader. As far as I understand, a custom DataSourceV2 is instantiated automatically by Spark through sparkSession.readStream().format(providerClassName) or similar code (see the sketch below). The problem is that I can only pass options of primitive types + String, so I cannot pass the Beam Source to the DataSourceV2.
> > > => Is there a way to do so?
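> > > For reference, a minimal sketch of the instantiation path I mean ("beam.source" is a hypothetical option key, not actual runner code):
> > >
> > > // Spark instantiates the provider reflectively from its class name;
> > > // we never get a reference to the created DataSourceV2 instance,
> > > // and only String-valued options travel with the call.
> > > Dataset<Row> rows = sparkSession
> > >     .readStream()
> > >     .format(DatasetSource.class.getName())
> > >     .option("beam.source", "...") // a String; no way to pass a Beam Source object
> > >     .load();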
> > >
> > > Also, I get a Dataset<Row> as output. The Row contains an instance of Beam's WindowedValue<T>, where T is the type parameter of the Source. I do a map on the Dataset to transform it into a Dataset<WindowedValue<T>>. I have a question related to the Encoder:
> > > => How do I properly create an Encoder for the generic type WindowedValue<T> to use in the map? A sketch of what I am attempting follows.
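> > > To make the question concrete, here is a sketch of the only approach I have found so far, a kryo-based encoder obtained through an unchecked cast (I am not sure it is appropriate):
> > >
> > > // The map from Dataset<Row> to Dataset<WindowedValue<T>> needs an Encoder,
> > > // but Encoders offers nothing for a generic type like WindowedValue<T>,
> > > // so the raw kryo encoder is force-cast to the parameterized type.
> > > @SuppressWarnings("unchecked")
> > > Encoder<WindowedValue<T>> encoder =
> > >     (Encoder<WindowedValue<T>>) (Encoder<?>) Encoders.kryo(WindowedValue.class);
> > > Dataset<WindowedValue<T>> values = rows.map(
> > >     (MapFunction<Row, WindowedValue<T>>) row -> (WindowedValue<T>) row.get(0),
> > >     encoder);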
> > > Here is the code: https://github.com/apache/beam/tree/spark-runner_structured-streaming
> > > And more specifically:
> > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
> > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
> > > Thanks,
> > > Etienne