This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f23afd28 Add context-aware XML parser (port from akka/alpakka#2935) (#1595)
8f23afd28 is described below

commit 8f23afd28bcca0abd5b9fe8789d769638d53093a
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 27 15:46:13 2026 +0200

    Add context-aware XML parser (port from akka/alpakka#2935) (#1595)
    
    * Add context-aware XML parser from alpakka/alpakka#2935
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-connectors/sessions/8d9b4a09-c1f0-4ae1-8e13-efd1c429b704
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix typo: consequitive -> consecutive
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-connectors/sessions/8d9b4a09-c1f0-4ae1-8e13-efd1c429b704
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * Optimize byte string conversion to array
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../connectors/xml/impl/StreamingXmlParser.scala   | 62 ++++++++++++++++------
 .../stream/connectors/xml/javadsl/XmlParsing.scala | 28 +++++++++-
 .../connectors/xml/scaladsl/XmlParsing.scala       | 28 +++++++++-
 .../scala/docs/scaladsl/XmlProcessingSpec.scala    | 41 +++++++++++++-
 4 files changed, 139 insertions(+), 20 deletions(-)

diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
index 05f12cf1f..6d8496820 100644
--- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
+++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
@@ -28,21 +28,45 @@ import scala.annotation.tailrec
 
 private[xml] object StreamingXmlParser {
   lazy val withStreamingFinishedException = new IllegalStateException("Stream 
finished before event was fully parsed.")
+
+  sealed trait ContextHandler[A, B, Ctx] {
+    def getByteString(a: A): ByteString
+    def getContext(a: A): Ctx
+    def buildOutput(pe: ParseEvent, ctx: Ctx): B
+  }
+
+  object ContextHandler {
+    final val uncontextual: ContextHandler[ByteString, ParseEvent, Unit] =
+      new ContextHandler[ByteString, ParseEvent, Unit] {
+        def getByteString(a: ByteString): ByteString = a
+        def getContext(a: ByteString): Unit = ()
+        def buildOutput(pe: ParseEvent, ctx: Unit): ParseEvent = pe
+      }
+
+    final def contextual[Ctx]: ContextHandler[(ByteString, Ctx), (ParseEvent, 
Ctx), Ctx] =
+      new ContextHandler[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] {
+        def getByteString(a: (ByteString, Ctx)): ByteString = a._1
+        def getContext(a: (ByteString, Ctx)): Ctx = a._2
+        def buildOutput(pe: ParseEvent, ctx: Ctx): (ParseEvent, Ctx) = (pe, 
ctx)
+      }
+  }
 }
 
 /**
  * INTERNAL API
  */
-@InternalApi private[xml] class StreamingXmlParser(ignoreInvalidChars: Boolean,
-    configureFactory: AsyncXMLInputFactory => Unit)
-    extends GraphStage[FlowShape[ByteString, ParseEvent]] {
-  val in: Inlet[ByteString] = Inlet("XMLParser.in")
-  val out: Outlet[ParseEvent] = Outlet("XMLParser.out")
-  override val shape: FlowShape[ByteString, ParseEvent] = FlowShape(in, out)
+@InternalApi private[xml] class StreamingXmlParser[A, B, 
Ctx](ignoreInvalidChars: Boolean,
+    configureFactory: AsyncXMLInputFactory => Unit,
+    transform: StreamingXmlParser.ContextHandler[A, B, Ctx])
+    extends GraphStage[FlowShape[A, B]] {
+  val in: Inlet[A] = Inlet("XMLParser.in")
+  val out: Outlet[B] = Outlet("XMLParser.out")
+  override val shape: FlowShape[A, B] = FlowShape(in, out)
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
     new GraphStageLogic(shape) with InHandler with OutHandler {
       private var started: Boolean = false
+      private var context: Ctx = _
 
       import javax.xml.stream.XMLStreamConstants
 
@@ -56,7 +80,10 @@ private[xml] object StreamingXmlParser {
       setHandlers(in, out, this)
 
       override def onPush(): Unit = {
-        val array = grab(in).toArray
+        val a = grab(in)
+        val bs = transform.getByteString(a)
+        context = transform.getContext(a)
+        val array = bs.toArrayUnsafe()
         parser.getInputFeeder.feedInput(array, 0, array.length)
         advanceParser()
       }
@@ -78,10 +105,10 @@ private[xml] object StreamingXmlParser {
 
             case XMLStreamConstants.START_DOCUMENT =>
               started = true
-              push(out, StartDocument)
+              push(out, transform.buildOutput(StartDocument, context))
 
             case XMLStreamConstants.END_DOCUMENT =>
-              push(out, EndDocument)
+              push(out, transform.buildOutput(EndDocument, context))
               completeStage()
 
             case XMLStreamConstants.START_ELEMENT =>
@@ -102,26 +129,29 @@ private[xml] object StreamingXmlParser {
               val optNs = optPrefix.flatMap(prefix => 
Option(parser.getNamespaceURI(prefix)))
               push(
                 out,
-                StartElement(parser.getLocalName,
+                transform.buildOutput(StartElement(parser.getLocalName,
                   attributes,
                   optPrefix.filterNot(_ == ""),
                   optNs.filterNot(_ == ""),
-                  namespaceCtx = namespaces))
+                  namespaceCtx = namespaces),
+                  context))
 
             case XMLStreamConstants.END_ELEMENT =>
-              push(out, EndElement(parser.getLocalName))
+              push(out, transform.buildOutput(EndElement(parser.getLocalName), 
context))
 
             case XMLStreamConstants.CHARACTERS =>
-              push(out, Characters(parser.getText))
+              push(out, transform.buildOutput(Characters(parser.getText), 
context))
 
             case XMLStreamConstants.PROCESSING_INSTRUCTION =>
-              push(out, ProcessingInstruction(Option(parser.getPITarget), 
Option(parser.getPIData)))
+              push(out,
+                
transform.buildOutput(ProcessingInstruction(Option(parser.getPITarget), 
Option(parser.getPIData)),
+                  context))
 
             case XMLStreamConstants.COMMENT =>
-              push(out, Comment(parser.getText))
+              push(out, transform.buildOutput(Comment(parser.getText), 
context))
 
             case XMLStreamConstants.CDATA =>
-              push(out, CData(parser.getText))
+              push(out, transform.buildOutput(CData(parser.getText), context))
 
             // Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION, 
ENTITY_DECLARATION, PROCESSING_INSTRUCTION
             // ATTRIBUTE is handled in START_ELEMENT implicitly
diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
index 3a2fca1e5..c73186f6b 100644
--- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
+++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
@@ -33,12 +33,28 @@ object XmlParsing {
   def parser(): pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
     xml.scaladsl.XmlParsing.parser.asJava
 
+  /**
+   * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX while keeping
+   * a context attached.
+   */
+  def parserWithContext[Ctx](): 
pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] 
=
+    xml.scaladsl.XmlParsing.parserWithContext().asJava
+
   /**
    * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX.
    */
   def parser(ignoreInvalidChars: Boolean): 
pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
     xml.scaladsl.XmlParsing.parser(ignoreInvalidChars).asJava
 
+  /**
+   * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX while keeping
+   * a context attached.
+   */
+  def parserWithContext[Ctx](
+      ignoreInvalidChars: Boolean
+  ): pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, 
NotUsed] =
+    xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars).asJava
+
   /**
    * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX.
    */
@@ -55,7 +71,17 @@ object XmlParsing {
     xml.scaladsl.XmlParsing.parser(ignoreInvalidChars, 
configureFactory.accept(_)).asJava
 
   /**
-   * A Flow that transforms a stream of XML ParseEvents. This stage coalesces 
consequitive CData and Characters
+   * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX while keeping
+   * a context attached.
+   */
+  def parserWithContext[Ctx](
+      ignoreInvalidChars: Boolean,
+      configureFactory: Consumer[AsyncXMLInputFactory]
+  ): pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, 
NotUsed] =
+    xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars, 
configureFactory.accept(_)).asJava
+
+  /**
+   * A Flow that transforms a stream of XML ParseEvents. This stage coalesces 
consecutive CData and Characters
    * events into a single Characters event or fails if the buffered string is 
larger than the maximum defined.
    */
   def coalesce(maximumTextLength: Int): pekko.stream.javadsl.Flow[ParseEvent, 
ParseEvent, NotUsed] =
diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
index 2cf101363..06e12d33e 100644
--- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
+++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
@@ -17,7 +17,7 @@ import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream.connectors.xml.ParseEvent
 import pekko.stream.connectors.xml.impl
-import pekko.stream.scaladsl.Flow
+import pekko.stream.scaladsl.{ Flow, FlowWithContext }
 import pekko.util.ByteString
 import com.fasterxml.aalto.AsyncXMLInputFactory
 import org.w3c.dom.Element
@@ -50,7 +50,31 @@ object XmlParsing {
    */
   def parser(ignoreInvalidChars: Boolean = false,
       configureFactory: AsyncXMLInputFactory => Unit = configureDefault): 
Flow[ByteString, ParseEvent, NotUsed] =
-    Flow.fromGraph(new impl.StreamingXmlParser(ignoreInvalidChars, 
configureFactory))
+    Flow[ByteString].via(
+      Flow.fromGraph(
+        new impl.StreamingXmlParser[ByteString, ParseEvent, 
Unit](ignoreInvalidChars,
+          configureFactory,
+          impl.StreamingXmlParser.ContextHandler.uncontextual)
+      )
+    )
+
+  /**
+   * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX while keeping
+   * a context attached.
+   */
+  def parserWithContext[Ctx](
+      ignoreInvalidChars: Boolean = false,
+      configureFactory: AsyncXMLInputFactory => Unit = configureDefault
+  ): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] =
+    FlowWithContext.fromTuples(
+      Flow.fromGraph(
+        new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx](
+          ignoreInvalidChars,
+          configureFactory,
+          impl.StreamingXmlParser.ContextHandler.contextual
+        )
+      )
+    )
 
   /**
    * A Flow that transforms a stream of XML ParseEvents. This stage coalesces 
consecutive CData and Characters
diff --git a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
index 0187ba7cf..125e8e948 100644
--- a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
+++ b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
@@ -18,7 +18,7 @@ import pekko.actor.ActorSystem
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.connectors.xml._
 import pekko.stream.connectors.xml.scaladsl.XmlParsing
-import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
+import pekko.stream.scaladsl.{ Flow, Framing, Keep, Sink, Source }
 import pekko.util.ByteString
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.BeforeAndAfterAll
@@ -340,6 +340,45 @@ class XmlProcessingSpec extends AnyWordSpec with Matchers with ScalaFutures with
       configWasCalled shouldBe true
     }
 
+    "parse XML and attach line numbers as context" in {
+      val doc = """|<doc>
+                   |  <elem>
+                   |    elem1
+                   |  </elem>
+                   |  <elem>
+                   |    elem2
+                   |  </elem>
+                   |</doc>""".stripMargin
+      val resultFuture = Source
+        .single(ByteString(doc))
+        .via(
+          Framing.delimiter(delimiter = ByteString(System.lineSeparator),
+            maximumFrameLength = 65536,
+            allowTruncation = true)
+        )
+        .zipWithIndex
+        
.runWith(XmlParsing.parserWithContext[Long]().asFlow.toMat(Sink.seq)(Keep.right))
+
+      resultFuture.futureValue should ===(
+        List(
+          (StartDocument, 0L),
+          (StartElement("doc"), 0L),
+          (Characters("  "), 1L),
+          (StartElement("elem"), 1L),
+          (Characters("    elem1"), 2L),
+          (Characters("  "), 3L),
+          (EndElement("elem"), 3L),
+          (Characters("  "), 4L),
+          (StartElement("elem"), 4L),
+          (Characters("    elem2"), 5L),
+          (Characters("  "), 6L),
+          (EndElement("elem"), 6L),
+          (EndElement("doc"), 7L),
+          (EndDocument, 7L)
+        )
+      )
+    }
+
   }
 
   override protected def afterAll(): Unit = system.terminate()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to