Hi Dian,

Regarding iter_rows, iter_batch, and take I'll weigh in while Zander is off
for a few weeks. I think both iter APIs make a lot of sense and match the
ecosystem precedent as you've outlined.

For `take` I'm thinking the list[dict] return type might be limiting. In a
notebook/ipython environment I'd probably resort to something like
`df.limit(n).to_pandas()` instead. I also see we already have proposals for
`df.show(n)` and `df.limit(n).collect()` which would serve similar
purposes. Including the convenience method `take(n)` seems reasonable based
on the precedence in pyspark and ray.

This is a bit of a tangent, but for a better experience we might want to
consider having a global notebook-level config which can change the return
type of interactive `take` executions to pandas or arrow for a given
session. Alternatively, we could consider returning a data container class
that converts on the fly to different formats and has some sane defaults
that work out of the box. I don't think this should block this FLIP, but I
anticipate this could become a source of papercuts / friction for
interactive workflows.

Here are some other minor interface feedback after combing through the FLIP
in more detail and comparing to other existing libraries:

1. `pf.Over(..args)` passed as arg to `.over(` method, can we flatten this
to `.over(..args)`? This is how it looks to be used in the polars examples
here:
https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.Expr.over.html#polars.Expr.over

2. Similarly, `.window(pf.Tumble(..args))` can be flattened to
`.tumble(..args)`, `.cumulate(..args)` etc. See a similar reference in ibis
here: https://ibis-project.org/how-to/extending/streaming

3. I'm not convinced having both the `with_column` and `with_columns`
interfaces is worth it considering `with_columns` can also accept just 1
argument. The main difference otherwise is `with_column` expects two inputs
for the key and value respectively. If the main point is to have a
workaround for when the key has special characters we can also keep just 1
method that accepts aliased expressions similar to polars:
https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.with_columns.html
e.g. `df.with_columns((pl.col("a") ** 2).alias("a^2"))`

Do you think it is worth keeping both or reducing to `with_columns` which
covers the required functionality?

4. The spec of `filter(predicate, **constraints)` seems limiting to me. It
allows passing multiple arguments but all args past the first must be
`key=value` syntax. I think we can tweak this to match polars behavior of
accepting N predicates and improve the usability: `filter(*predicates,
**constraints)` -
https://docs.pola.rs/api/python/dev/reference/dataframe/api/polars.DataFrame.filter.html

5. With `map` expecting the `return_dtype` as an argument, this feels
clunky when you might want to re-use a function in multiple places. It
would be much better to define the return type as a type hint or annotation
in the function itself to avoid coupling typing to the transformation
call-site rather than the transformation definition itself.

A separate question on `map` - your examples do not mark the functions as
UDFs. Is this a convenience syntax? I assume these will be registered to
run as UDFs under the hood, is that right?

Best regards,
Cole



On Sat, Jun 27, 2026 at 5:05 PM Dian Fu <[email protected]> wrote:

> Hi Zander,
>
> Thanks for the suggestions. The overall idea makes a lot of sense to
> me. I did some investigation based on your suggestions and slightly
> adjusted the API shape while keeping the same intent. Could you take a
> look and let me know whether the following API makes sense to you?
>
> ```
> def iter_rows(
>     *,
>     include_row_kind: bool = False,
>     row_kind_field: str = "__row_kind__",
> ) -> CloseableIterator[Dict[str, Any]]
>
> def iter_batches(
>     *,
>     batch_size: int = 1000,
>     batch_format: Literal["pandas", "pyarrow"] = "pandas",
>     include_row_kind: bool = False,
>     row_kind_field: str = "__row_kind__",
> ) -> CloseableIterator[pandas.DataFrame | pyarrow.Table]
>
> def take(
>     n: int,
>     *,
>     timeout: int | None = None,
>     include_row_kind: bool = False,
>     row_kind_field: str = "__row_kind__",
> ) -> List[Dict[str, Any]]
> ```
>
> That's, introducing `iter_rows()` and `iter_batches()` as the explicit
> incremental result-consuming APIs, and `take()` as the bounded preview
> API.
>
> This is also aligned with APIs in other systems. Ray Dataset provides
> `iter_rows()`, `iter_batches()`, and `take()`. Daft provides
> `iter_rows()` for row-wise iteration and `iter_partitions()` for
> partition iteration. Polars has `iter_rows()` on eager DataFrames and
> `collect_batches()` for batched iteration. Spark provides similar
> result-consuming APIs such as `toLocalIterator()` and `take()`,
> although the names are different.
>
> For bounded preview, I would prefer `take(n, timeout=...)` over
> `fetch(n, timeout=...)`. `take(n)` is more commonly used in
> distributed DataFrame/Dataset APIs, such as Spark and Ray, to mean
> returning up to the first `n` rows to the client.
>
> Regards,
> Dian
>
> On Thu, Jun 25, 2026 at 8:40 AM Zander Matheson <[email protected]>
> wrote:
> >
> > Hi Dian,
> >
> > Thanks for the thorough responses, this all looks great! Agreed on the
> > resolved points: the drop_null/fill_null split with the Polars precedent,
> > the descending= alignment with Polars/Daft/Ray, map/map_batches following
> > Ray Data, no implicit index, and immutable lazy plans (no in_place).
> Those
> > are all convincing, and totally make sense. Thanks for adding
> > show/__repr__/_repr_html_/_repr_mimebundle_ and describe()!
> >
> > On the execution trigger, here's what I have in mind. A closeable
> iterator
> > so you can build more finite control. I believe that Flink already has
> the
> > machinery: TableResult.collect() returns a closeable iterator, and
> closing
> > it cancels the job. I'd propose surfacing that at the DataFrame level as
> > the documented low-level escape hatch -- roughly:
> >
> >     # low-level handle: submits the job, returns lazily, does NOT drain
> to
> > completion
> >     handle = df.execute()
> >
> >     # pull rows incrementally as they arrive
> >     with handle as rows:
> >         for row in itertools.islice(rows, 20):
> >             ...
> >     # leaving the context (or handle.close()) cancels the job
> >
> >     # or a bounded, time-boxed fetch that always terminates on a live
> > stream:
> >     preview = df.fetch(n=20, timeout="10s")   # take up to n rows or
> until
> > deadline, then cancel
> >
> > The goals are essentially:
> >
> > - Bounded + time-boxed, so it terminates on an unbounded source instead
> of
> > hanging.
> > - Close-cancels, so a notebook peek doesn't leave a job running.
> > - Incremental, so tooling can render rows as they arrive (the basis for
> an
> > Ibis-style interactive/_repr_html_ mode).
> > - Changelog-aware, so the iterator yields rows with their RowKind, so a
> > helper can either show the raw +I/-U/+U/-D stream or maintain a
> > materialized snapshot for updating queries (the same distinction the SQL
> > client's table vs changelog result modes already make).
> >
> > The implementation can be ironed out later if the above pseudo-code is
> not
> > on track.
> >
> > Fully agree the primary use case is AI/ML enrichment for FLIP-577. My
> > interest in the low-level trigger is mostly that the
> build-inspect-iterate
> > loop is how people will develop those enrichment pipelines, so a solid
> > interactive primitive pays off there too.
> >
> > Thanks again -- really looking forward to collaborating on this.
> >
> > Best,
> > Zander
> >
> > On Thu, Jun 18, 2026 at 7:27 AM Dian Fu <[email protected]> wrote:
> >
> > > Hi Zander,
> > >
> > > Thanks for the feedback!
> > >
> > > Replies inline.
> > >
> > > Relationship to FLIP-541: Good point. FLIP-541 took the approach of
> > > evolving the Table API itself toward a more DataFrame-like style. In
> > > practice, though, I found that it turned out to be hard to push
> > > forward: the Table API already has a large API surface, and for
> > > compatibility reasons the DataFrame-style additions would have to
> > > coexist with the existing Table-style APIs. Mixing the two styles in
> > > one API tends to make things more confusing for users rather than
> > > less. So I think a more viable direction may be to keep the Table API
> > > positioned as the API aligned with Flink's existing relational/Table
> > > model and introduce a separate, dedicated DataFrame API alongside it.
> > > That keeps each API internally consistent — Table API for the
> > > Flink-native relational model, DataFrame API for the Python/DataFrame
> > > mental model — instead of blending both into one surface.
> > >
> > > API parity: My overall take is that we should adopt a DataFrame-style
> > > API design rather than 100% following Pandas DataFrame API. This is
> > > also the direction taken by the newer generation of Python data
> > > projects such as Polars, Daft, and Ray Data: they share the familiar
> > > DataFrame mental model and ergonomics, but each adapts the API to its
> > > own execution engine and semantics rather than mirroring pandas
> > > exactly. We follow the same philosophy here — borrow the
> > > widely-adopted ergonomics, but stay consistent with Flink's
> > > streaming/distributed semantics.
> > >
> > > On the specific points:
> > > 1. dropna/fillna vs drop_null/fill_null: I'd lean toward keeping
> > > drop_null/fill_null. Flink's type system has a real NULL (and NaN is a
> > > distinct float value) and actually Python has None, so the FLIP
> > > separates them: drop_null/fill_null for NULL and drop_nan/fill_nan for
> > > NaN. The pandas dropna name conflates the two. Polars made the same
> > > NULL/NaN split with drop_nulls/fill_null + fill_nan, so this also has
> > > precedent.
> > > 2. show() / display(): Good point. Peeking at data is essential. Have
> > > added show, __repr__, _repr_html_ and _repr_mimebundle_ in the section
> > > "DataFrame — Conversion & Execution".
> > > 3. sort(by, descending=False): I kept descending= rather than
> > > ascending=. The default is ascending in all cases, matching
> > > pandas/PySpark/Polars/Daft behavior — the only difference is the flag
> > > name, and the flag name descending aligns with Polars/Daft/Ray. While
> > > Pyspark/Pandas takes ascending as the flag name. For me, I slightly
> > > tend to align with Polars/Daft/Ray since our motivation is multimodal
> > > data processing and our users will be familiar with these projects.
> > > However, I'm open to this.
> > > 4. map() element-wise vs row-wise: You're right that pandas
> > > DataFrame.map is element-wise (it's the renamed applymap), while ours
> > > is row-wise. We intentionally follow the Ray Data model here (map =
> > > per-row, map_batches = vectorized), which fits the enrichment/ML use
> > > cases. The name `apply` isn't paired with map_batches and so I tend to
> > > use `map` here.
> > > 5. Index: Yes, this is deliberate — the API has no implicit
> > > pandas-style index. Flink is a distributed/streaming engine with no
> > > global row order, so a positional index isn't well-defined. Row/column
> > > selection is done by column reference and boolean filtering instead.
> > > 6. in_place: DataFrames here are lazy logical plans, not mutable
> > > in-memory buffers, so every operation returns a new DataFrame —
> > > there's nothing to mutate in place.
> > > 7. to_json/from_json and .values(): JSON is supported for
> > > sources/sinks via read_json/write_json. Is that what you want?
> > > 8. Properties (df.shape/df.info/df.describe): Have added `describe()`
> > > in section "13. DataFrame — Conversion & Execution". Regarding
> > > `df.shape and df.info`: I left out for now since it will force data
> > > processing for `df.shape` which seems not that useful and schema
> > > inspection is already available via df.schema.
> > >
> > > Execution model / low-level trigger: I like the escape-hatch idea.
> > > Today an execution can be triggered via
> > > to_pandas()/to_list()/collect()/show(). Could you elaborate on what
> > > specific API you have in mind?
> > >
> > > EDA / common use cases: The primary use cases I envision are ML/AI
> > > enrichment (this is to support FLIP-577: AI-Native Flink — An Umbrella
> > > Proposal for Multimodal Data Processing). EDA is supported
> > > (show/describe/to_pandas) but I'd keep the heavier EDA helpers minimal
> > > and grow them based on demand.
> > >
> > > Thanks again and looking forward to collaborating!
> > >
> > > Regards,
> > > Dian
> > >
> > > On Thu, Jun 18, 2026 at 8:50 PM Dian Fu <[email protected]> wrote:
> > > >
> > > > Hi Cole,
> > > >
> > > > Thanks for the feedback. Both suggestions make sense for me, and I've
> > > > updated the FLIP.
> > > >
> > > > 1. kwargs aliasing in .agg: see section "6. DataFrame — Aggregation"
> > > > 2. Attribute-style column references: see section "15. DataFrame —
> > > > Column Access".
> > > >
> > > > Thanks again!
> > > >
> > > > Regards,
> > > > Dian
> > > >
> > > > On Thu, Jun 18, 2026 at 11:06 AM Yong Fang <[email protected]>
> wrote:
> > > > >
> > > > > Thanks Fu Dian for initiating this discussion and +1 for the total
> > > design.
> > > > >
> > > > > I have several comments for this flip:
> > > > >
> > > > > 1. In multi-modal data processing scenarios, the operations are
> mostly
> > > > > reading and writing files, images, audio and video. From the
> current
> > > API
> > > > > perspective, these operations can only be added manually in
> > > read_custom and
> > > > > write_custom, how are the read/write APIs for these types designed?
> > > > >
> > > > > 2. Currently, batch_size is controlled by row count in map_batches.
> > > > > However, the per-row size of multimodal data varies dramatically —
> a
> > > single
> > > > > 4K image can be up to 20 MB, while a piece of text may only be 100
> B.
> > > > > Splitting batches purely by row count may lead to OOM. Should we
> > > support
> > > > > splitting batches by memory budget too?
> > > > >
> > > > > 3. GPU computing is a common requirement in multimodal processing,
> I
> > > don't
> > > > > seem to see any related information in this set of APIs such as
> > > > > map/map_batches and ect. How can we set GPU/CPU resources and
> > > > > specifications for them and udfs?
> > > > >
> > > > > 4. In addition, users may need to load local models within
> > > map/map_batches
> > > > > for data processing. The current APIs only support the callback
> format.
> > > > > Should class types also be supported? This way, I/O and other
> > > operations
> > > > > only need to be executed once for a class instance.
> > > > >
> > > > > Best,
> > > > > FangYong
> > > > >
> > > > > On Thu, Jun 18, 2026 at 7:13 AM Zander Matheson <
> > > [email protected]>
> > > > > wrote:
> > > > >
> > > > > > Very nice proposal, Dian Fu, thank you for putting this together!
> > > > > >
> > > > > > It feels like it supersedes FLIP-541
> > > > > > <
> > > > > >
> > >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=378473217__;!!Ayb5sqE7!pkd_p6kbikrSJswJL1KpUnxbOpzUIlGO8fwpFgJjKizt71Knz9zOmngXYcJaLVhrjOeFyLXamBXkOMhint9aSL4$
> > > > > > >
> > > > > > since
> > > > > > it will solve many of the problems we initially discussed
> related to
> > > making
> > > > > > Flink more Pythonic.
> > > > > >
> > > > > > I had some questions and comments on the flip shared below.
> Overall I
> > > > > > really like the design you have proposed and look forward to
> > > collaborating
> > > > > > hopefully!
> > > > > >
> > > > > > *Pandas/PySpark/Polars API Parity*
> > > > > > What are your thoughts on parity with some of the established
> APIs.
> > > I see
> > > > > > it is mentioned in the FLIP to both lower the barrier to entry
> for
> > > Python
> > > > > > users who are familiar with Dataframe APIs, while also having a
> > > non-goal of
> > > > > > providing full compatibility with Pandas, Polars etc. Below are a
> > > couple of
> > > > > > dataframe api common that we could potentially more closely
> align to.
> > > > > >
> > > > > >    1. dropna and fillna instead of drop_null/fill_null. Python
> does
> > > not
> > > > > >    have a Null concept, so it might make sense to keep the
> drop_na
> > > used in
> > > > > >    other dataframe libraries?
> > > > > >    2. show(), .display(). How to peek at the data? This is common
> > > patter in
> > > > > >    development
> > > > > >    3. sort(by, descending=False). This boolean flag is opposite
> what
> > > is
> > > > > >    used in other libraries.
> > > > > >    4. map() in Dataframe land is an element-wise computation, do
> we
> > > want to
> > > > > >    break with that? I believe apply() is more similar. That being
> > > said,
> > > > > > map as
> > > > > >    intended from map reduce is more akin to the row-wise model
> > > shown. More
> > > > > > a
> > > > > >    question than anything else.
> > > > > >    5. Index - I didn’t see any mention to indices in the
> document,
> > > is this
> > > > > >    something we are explicitly avoiding? I have often use
> indices for
> > > > > >    selecting groups of rows or columns for manipulation.
> > > > > >    6. In_place - this may not be possible with the execution
> style,
> > > but is
> > > > > >    there a possibility of having in_place booleans for things
> like
> > > unique
> > > > > > or
> > > > > >    sort where the output can either be done in place or the
> output
> > > needs
> > > > > > to be
> > > > > >    assigned to a new dataframe.
> > > > > >    7. to_json/from_json and .values().
> > > > > >    8. Properties - Do we want to add some of the niceties of
> > > spark/pandas
> > > > > >    here if possible? df.shape, df.info, df.describe
> > > > > >
> > > > > >
> > > > > > *Execution Model*I like the idea of having triggers for the
> > > execution on
> > > > > > write and materialization (to_pandas etc.). This is probably the
> > > meat of
> > > > > > the problem for creating a dataframe API that feels like the
> other
> > > commonly
> > > > > > used dataframe APIs. The balance between write and providing the
> > > > > > statement_set to make the writes coordinated seems nice. But is
> > > there also
> > > > > > potentially a world where we want to expose the ability to
> > > arbitrarily
> > > > > > trigger an execution? Say for Exploratory Data Analysis in a
> > > notebook? This
> > > > > > seems possible with to_pandas and to_list etc., but maybe having
> a
> > > lower
> > > > > > level primitive that could be called could be a good escape
> hatch?
> > > > > >
> > > > > >
> > > > > > *Exploratory Data Analysis*One of the biggest use cases for
> > > dataframes is
> > > > > > exploratory data analysis. Is this something we want to encourage
> > > with this
> > > > > > FLIP? It might make sense to add some of the EDA methods for that
> > > purpose.
> > > > > > See above, show/display and execution trigger.
> > > > > >
> > > > > > *Common Use Cases*
> > > > > > Related to the EDA note above, I am curious what you envision as
> the
> > > most
> > > > > > common use cases for this API.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Zander
> > > > > >
> > > > > > On Wed, Jun 17, 2026 at 2:42 AM Cole Bailey via dev <
> > > [email protected]>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks Dian,
> > > > > > >
> > > > > > > There is a lot of good work here that aligns with what we have
> been
> > > > > > > brainstorming for a better PyFlink experience.
> > > > > > >
> > > > > > > One ergonomic suggestion I'd love to see included is supporting
> > > pythonic
> > > > > > > aliasing via kwargs within `.agg` similar to what is already
> > > outlined in
> > > > > > > `with_columns` or `select`:
> > > > > > >
> > > > > > > The example would instead look like this:
> > > > > > >
> > > > > > > df.group_by("dept").agg(
> > > > > > >     avg_salary=col("salary").avg(),
> > > > > > >     headcount=col("id").count(),
> > > > > > > )
> > > > > > >
> > > > > > >
> > > > > > > Another nice-to-have would be flexibility in column
> referencing, I
> > > see 2
> > > > > > > variations scattered throughout the FLIP:
> > > > > > >
> > > > > > > col("age")
> > > > > > >
> > > > > > > df["age"]
> > > > > > >
> > > > > > >
> > > > > > > Both of these make sense, I think we should also consider
> > > supporting attr
> > > > > > > style column references since these can be reused across the
> > > lambda or
> > > > > > > subscript filtering examples already in the FLIP:
> > > > > > >
> > > > > > > df.age
> > > > > > >
> > > > > > >
> > > > > > > That would then give us this representative example:
> > > > > > >
> > > > > > > df.group_by("dept").agg(
> > > > > > >     avg_salary=df.salary.avg(),
> > > > > > >     headcount=df.id.count(),
> > > > > > > )
> > > > > > >
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Cole
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jun 17, 2026 at 6:12 AM Dian Fu <[email protected]
> >
> > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I would like to start a discussion about FLIP-591:
> Introducing
> > > Python
> > > > > > > > DataFrame API in PyFlink [1].
> > > > > > > >
> > > > > > > > This FLIP is to a sub-FLIP of the broader direction
> discussed in
> > > > > > > > FLIP-577 (AI-Native Flink — An Umbrella Proposal for
> Multimodal
> > > Data
> > > > > > > > Processing) [2]. This FLIP proposes a new public Python
> module,
> > > > > > > > `pyflink.dataframe`, as a DataFrame-style API on top of the
> > > existing
> > > > > > > > PyFlink Table API. The goal is not to introduce a new
> execution
> > > model,
> > > > > > > > but to provide a more natural Python-facing entry point for
> users
> > > > > > > > coming from the broader Python data ecosystem, while
> preserving
> > > Flink
> > > > > > > > semantics and execution capabilities.
> > > > > > > >
> > > > > > > > The proposal focuses on:
> > > > > > > >     - Designing a Python-friendly DataFrame API for PyFlink,
> > > including
> > > > > > > > the API shape itself, a more user-friendly DataType design,
> > > unified
> > > > > > > > configuration, reduced TableEnvironment boilerplate, and a
> > > practical
> > > > > > > > multiple-sink model for end-to-end pipelines
> > > > > > > >     - Providing ergonomic support for row-oriented Python
> > > > > > > > transformations, including map / map_batches style
> operations for
> > > > > > > > enrichment, feature engineering, and AI/ML workloads
> > > > > > > >     - Exposing concurrency configuration so that expensive
> Python
> > > > > > > > stages can be scaled independently, making it easier to build
> > > > > > > > practical jobs directly with the DataFrame API
> > > > > > > >     - Supporting Arrow as a first-class batch format for
> > > efficient
> > > > > > > > interoperability with the Python ecosystem
> > > > > > > >
> > > > > > > > The Design Decisions section discusses the main design
> > > considerations
> > > > > > > > behind the proposal and may be a useful place to pay extra
> > > attention
> > > > > > > > when reviewing it.
> > > > > > > >
> > > > > > > > Looking forward to your feedback!
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Dian
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > >
> > > > > >
> > >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-591*3A*Introducing*Python*DataFrame*API*in*PyFlink__;JSsrKysrKw!!Ayb5sqE7!t1Rd2wTS8LFWmjw7srNbCUz4lZ5NXo__BnGTzGFeJ5BeO4T4tOCZ1hCNysc10NKuLUuegGThcr5ksMSizWPE4Qo$
> > > > > > > > [2]
> > > > > > > >
> > > > > > >
> > > > > >
> > >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=421957275__;!!Ayb5sqE7!t1Rd2wTS8LFWmjw7srNbCUz4lZ5NXo__BnGTzGFeJ5BeO4T4tOCZ1hCNysc10NKuLUuegGThcr5ksMSiab5Dp28$
> > > > > > > >
> > > > > > >
> > > > > >
> > >
>

Reply via email to