This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 8f23afd28 Add context-aware XML parser (port from akka/alpakka#2935)
(#1595)
8f23afd28 is described below
commit 8f23afd28bcca0abd5b9fe8789d769638d53093a
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 27 15:46:13 2026 +0200
Add context-aware XML parser (port from akka/alpakka#2935) (#1595)
* Add context-aware XML parser from akka/alpakka#2935
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/8d9b4a09-c1f0-4ae1-8e13-efd1c429b704
Co-authored-by: pjfanning <[email protected]>
* Fix typo: consequitive -> consecutive
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/8d9b4a09-c1f0-4ae1-8e13-efd1c429b704
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* Optimize byte string conversion to array
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../connectors/xml/impl/StreamingXmlParser.scala | 62 ++++++++++++++++------
.../stream/connectors/xml/javadsl/XmlParsing.scala | 28 +++++++++-
.../connectors/xml/scaladsl/XmlParsing.scala | 28 +++++++++-
.../scala/docs/scaladsl/XmlProcessingSpec.scala | 41 +++++++++++++-
4 files changed, 139 insertions(+), 20 deletions(-)
diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
index 05f12cf1f..6d8496820 100644
--- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
+++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
@@ -28,21 +28,45 @@ import scala.annotation.tailrec
private[xml] object StreamingXmlParser {
lazy val withStreamingFinishedException = new IllegalStateException("Stream
finished before event was fully parsed.")
+
+ /**
+  * Adapts the stage's input element type `A` and output element type `B` to the XML parser.
+  *
+  * For every incoming element the parser stage asks the handler for the bytes to feed to the
+  * XML parser and for the context value to remember. It then asks the handler to combine each
+  * emitted [[ParseEvent]] with the remembered context into an output element.
+  *
+  * @tparam A   stream input element type
+  * @tparam B   stream output element type
+  * @tparam Ctx context type carried alongside the bytes (`Unit` when no context is tracked)
+  */
+ sealed trait ContextHandler[A, B, Ctx] {
+ /** Extracts the bytes to be fed to the XML parser from an input element. */
+ def getByteString(a: A): ByteString
+ /** Extracts the context value carried by an input element. */
+ def getContext(a: A): Ctx
+ /** Combines a parsed event with a context value into an output element. */
+ def buildOutput(pe: ParseEvent, ctx: Ctx): B
+ }
+
+ object ContextHandler {
+ /** Plain `ByteString` in, bare `ParseEvent` out; the context is always `()` and is discarded. */
+ final val uncontextual: ContextHandler[ByteString, ParseEvent, Unit] =
+ new ContextHandler[ByteString, ParseEvent, Unit] {
+ def getByteString(a: ByteString): ByteString = a
+ def getContext(a: ByteString): Unit = ()
+ def buildOutput(pe: ParseEvent, ctx: Unit): ParseEvent = pe
+ }
+
+ /**
+  * `(ByteString, Ctx)` pairs in, `(ParseEvent, Ctx)` pairs out; used by `parserWithContext`.
+  * A new (stateless) handler instance is created on every call.
+  */
+ final def contextual[Ctx]: ContextHandler[(ByteString, Ctx), (ParseEvent,
Ctx), Ctx] =
+ new ContextHandler[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] {
+ def getByteString(a: (ByteString, Ctx)): ByteString = a._1
+ def getContext(a: (ByteString, Ctx)): Ctx = a._2
+ def buildOutput(pe: ParseEvent, ctx: Ctx): (ParseEvent, Ctx) = (pe,
ctx)
+ }
+ }
}
/**
* INTERNAL API
*/
-@InternalApi private[xml] class StreamingXmlParser(ignoreInvalidChars: Boolean,
- configureFactory: AsyncXMLInputFactory => Unit)
- extends GraphStage[FlowShape[ByteString, ParseEvent]] {
- val in: Inlet[ByteString] = Inlet("XMLParser.in")
- val out: Outlet[ParseEvent] = Outlet("XMLParser.out")
- override val shape: FlowShape[ByteString, ParseEvent] = FlowShape(in, out)
+@InternalApi private[xml] class StreamingXmlParser[A, B,
Ctx](ignoreInvalidChars: Boolean,
+ configureFactory: AsyncXMLInputFactory => Unit,
+ transform: StreamingXmlParser.ContextHandler[A, B, Ctx])
+ extends GraphStage[FlowShape[A, B]] {
+ val in: Inlet[A] = Inlet("XMLParser.in")
+ val out: Outlet[B] = Outlet("XMLParser.out")
+ override val shape: FlowShape[A, B] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private var started: Boolean = false
+ private var context: Ctx = _
import javax.xml.stream.XMLStreamConstants
@@ -56,7 +80,10 @@ private[xml] object StreamingXmlParser {
setHandlers(in, out, this)
override def onPush(): Unit = {
- val array = grab(in).toArray
+ val a = grab(in)
+ val bs = transform.getByteString(a)
+ context = transform.getContext(a)
+ val array = bs.toArrayUnsafe()
parser.getInputFeeder.feedInput(array, 0, array.length)
advanceParser()
}
@@ -78,10 +105,10 @@ private[xml] object StreamingXmlParser {
case XMLStreamConstants.START_DOCUMENT =>
started = true
- push(out, StartDocument)
+ push(out, transform.buildOutput(StartDocument, context))
case XMLStreamConstants.END_DOCUMENT =>
- push(out, EndDocument)
+ push(out, transform.buildOutput(EndDocument, context))
completeStage()
case XMLStreamConstants.START_ELEMENT =>
@@ -102,26 +129,29 @@ private[xml] object StreamingXmlParser {
val optNs = optPrefix.flatMap(prefix =>
Option(parser.getNamespaceURI(prefix)))
push(
out,
- StartElement(parser.getLocalName,
+ transform.buildOutput(StartElement(parser.getLocalName,
attributes,
optPrefix.filterNot(_ == ""),
optNs.filterNot(_ == ""),
- namespaceCtx = namespaces))
+ namespaceCtx = namespaces),
+ context))
case XMLStreamConstants.END_ELEMENT =>
- push(out, EndElement(parser.getLocalName))
+ push(out, transform.buildOutput(EndElement(parser.getLocalName),
context))
case XMLStreamConstants.CHARACTERS =>
- push(out, Characters(parser.getText))
+ push(out, transform.buildOutput(Characters(parser.getText),
context))
case XMLStreamConstants.PROCESSING_INSTRUCTION =>
- push(out, ProcessingInstruction(Option(parser.getPITarget),
Option(parser.getPIData)))
+ push(out,
+
transform.buildOutput(ProcessingInstruction(Option(parser.getPITarget),
Option(parser.getPIData)),
+ context))
case XMLStreamConstants.COMMENT =>
- push(out, Comment(parser.getText))
+ push(out, transform.buildOutput(Comment(parser.getText),
context))
case XMLStreamConstants.CDATA =>
- push(out, CData(parser.getText))
+ push(out, transform.buildOutput(CData(parser.getText), context))
// Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION,
ENTITY_DECLARATION, PROCESSING_INSTRUCTION
// ATTRIBUTE is handled in START_ELEMENT implicitly
diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
index 3a2fca1e5..c73186f6b 100644
--- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
+++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
@@ -33,12 +33,28 @@ object XmlParsing {
def parser(): pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
xml.scaladsl.XmlParsing.parser.asJava
+ /**
+  * Parser flow that takes a stream of ByteStrings, each paired with a context value, and parses
+  * them to XML events similar to SAX. Every emitted event is paired with the context of the input
+  * element most recently received when the event was emitted. An event whose bytes span several
+  * elements therefore carries the context of the element that completed it.
+  *
+  * Uses the default settings of the Scala DSL `parserWithContext` (invalid characters are not
+  * ignored, default factory configuration).
+  *
+  * @tparam Ctx type of the context carried alongside each element
+  */
+ def parserWithContext[Ctx]():
pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed]
=
+ xml.scaladsl.XmlParsing.parserWithContext().asJava
+
/**
* Parser Flow that takes a stream of ByteStrings and parses them to XML
events similar to SAX.
*/
def parser(ignoreInvalidChars: Boolean):
pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
xml.scaladsl.XmlParsing.parser(ignoreInvalidChars).asJava
+ /**
+  * Parser flow that takes a stream of ByteStrings, each paired with a context value, and parses
+  * them to XML events similar to SAX. Every emitted event is paired with the context of the input
+  * element most recently received when the event was emitted.
+  *
+  * @param ignoreInvalidChars passed through to the underlying parser stage (same meaning as for `parser`)
+  * @tparam Ctx type of the context carried alongside each element
+  */
+ def parserWithContext[Ctx](
+ ignoreInvalidChars: Boolean
+ ): pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx,
NotUsed] =
+ xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars).asJava
+
/**
* Parser Flow that takes a stream of ByteStrings and parses them to XML
events similar to SAX.
*/
@@ -55,7 +71,17 @@ object XmlParsing {
xml.scaladsl.XmlParsing.parser(ignoreInvalidChars,
configureFactory.accept(_)).asJava
/**
- * A Flow that transforms a stream of XML ParseEvents. This stage coalesces
consequitive CData and Characters
+  * Parser flow that takes a stream of ByteStrings, each paired with a context value, and parses
+  * them to XML events similar to SAX. Every emitted event is paired with the context of the input
+  * element most recently received when the event was emitted.
+  *
+  * @param ignoreInvalidChars passed through to the underlying parser stage (same meaning as for `parser`)
+  * @param configureFactory   invoked with the `AsyncXMLInputFactory` so callers can configure it
+  *                           (presumably before the parser is created, as for `parser` — confirm)
+  * @tparam Ctx type of the context carried alongside each element
+  */
+ def parserWithContext[Ctx](
+ ignoreInvalidChars: Boolean,
+ configureFactory: Consumer[AsyncXMLInputFactory]
+ ): pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx,
NotUsed] =
+ xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars,
configureFactory.accept(_)).asJava
+
+ /**
+ * A Flow that transforms a stream of XML ParseEvents. This stage coalesces
consecutive CData and Characters
* events into a single Characters event or fails if the buffered string is
larger than the maximum defined.
*/
def coalesce(maximumTextLength: Int): pekko.stream.javadsl.Flow[ParseEvent,
ParseEvent, NotUsed] =
diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
index 2cf101363..06e12d33e 100644
--- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
+++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
@@ -17,7 +17,7 @@ import org.apache.pekko
import pekko.NotUsed
import pekko.stream.connectors.xml.ParseEvent
import pekko.stream.connectors.xml.impl
-import pekko.stream.scaladsl.Flow
+import pekko.stream.scaladsl.{ Flow, FlowWithContext }
import pekko.util.ByteString
import com.fasterxml.aalto.AsyncXMLInputFactory
import org.w3c.dom.Element
@@ -50,7 +50,31 @@ object XmlParsing {
*/
def parser(ignoreInvalidChars: Boolean = false,
configureFactory: AsyncXMLInputFactory => Unit = configureDefault):
Flow[ByteString, ParseEvent, NotUsed] =
- Flow.fromGraph(new impl.StreamingXmlParser(ignoreInvalidChars,
configureFactory))
+    // Context-free instantiation of the generic parser stage. The stage already has shape
+    // FlowShape[ByteString, ParseEvent], so no identity Flow[ByteString].via(...) prefix is
+    // needed; this matches how parserWithContext wraps the same stage.
+    Flow.fromGraph(
+      new impl.StreamingXmlParser[ByteString, ParseEvent, Unit](
+        ignoreInvalidChars,
+        configureFactory,
+        impl.StreamingXmlParser.ContextHandler.uncontextual))
+
+ /**
+  * Parser flow that takes a stream of ByteStrings, each paired with a context value, and parses
+  * them to XML events similar to SAX. Every emitted event is paired with the context of the input
+  * element most recently received when the event was emitted. An event whose bytes span several
+  * elements therefore carries the context of the element that completed it.
+  *
+  * @param ignoreInvalidChars passed through to the underlying parser stage (same meaning as for `parser`)
+  * @param configureFactory   invoked with the `AsyncXMLInputFactory` so callers can configure it
+  * @tparam Ctx type of the context carried alongside each element
+  */
+ def parserWithContext[Ctx](
+ ignoreInvalidChars: Boolean = false,
+ configureFactory: AsyncXMLInputFactory => Unit = configureDefault
+ ): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] =
+ // the stage works on (element, context) tuples; fromTuples exposes it as a FlowWithContext
+ FlowWithContext.fromTuples(
+ Flow.fromGraph(
+ new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx](
+ ignoreInvalidChars,
+ configureFactory,
+ impl.StreamingXmlParser.ContextHandler.contextual
+ )
+ )
+ )
/**
* A Flow that transforms a stream of XML ParseEvents. This stage coalesces
consecutive CData and Characters
diff --git a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
index 0187ba7cf..125e8e948 100644
--- a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
+++ b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
@@ -18,7 +18,7 @@ import pekko.actor.ActorSystem
import pekko.stream.connectors.testkit.scaladsl.LogCapturing
import pekko.stream.connectors.xml._
import pekko.stream.connectors.xml.scaladsl.XmlParsing
-import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
+import pekko.stream.scaladsl.{ Flow, Framing, Keep, Sink, Source }
import pekko.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.BeforeAndAfterAll
@@ -340,6 +340,45 @@ class XmlProcessingSpec extends AnyWordSpec with Matchers
with ScalaFutures with
configWasCalled shouldBe true
}
+    "parse XML and attach line numbers as context" in {
+      // Normalise line endings explicitly. The literal's terminators follow the source file's
+      // checkout, while System.lineSeparator follows the JVM. The two can disagree (e.g. an LF
+      // checkout tested on Windows), which would leave the document unsplit and every event at
+      // line 0.
+      val doc = """|<doc>
+                   | <elem>
+                   | elem1
+                   | </elem>
+                   | <elem>
+                   | elem2
+                   | </elem>
+                   |</doc>""".stripMargin.replace("\r\n", "\n")
+      val resultFuture = Source
+        .single(ByteString(doc))
+        .via(
+          // one frame per line; the last line has no trailing delimiter, hence allowTruncation
+          Framing.delimiter(delimiter = ByteString("\n"), maximumFrameLength = 65536, allowTruncation = true)
+        )
+        // pair every line with its zero-based line number, which becomes the parser context
+        .zipWithIndex
+        .runWith(XmlParsing.parserWithContext[Long]().asFlow.toMat(Sink.seq)(Keep.right))
+
+      resultFuture.futureValue should ===(
+        List(
+          (StartDocument, 0L),
+          (StartElement("doc"), 0L),
+          (Characters(" "), 1L),
+          (StartElement("elem"), 1L),
+          (Characters(" elem1"), 2L),
+          (Characters(" "), 3L),
+          (EndElement("elem"), 3L),
+          (Characters(" "), 4L),
+          (StartElement("elem"), 4L),
+          (Characters(" elem2"), 5L),
+          (Characters(" "), 6L),
+          (EndElement("elem"), 6L),
+          (EndElement("doc"), 7L),
+          (EndDocument, 7L)
+        )
+      )
+    }
+
}
override protected def afterAll(): Unit = system.terminate()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]