This is an automated email from the ASF dual-hosted git repository.
aicam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 7d42cb63dd fix(dataset): enforce max file size for multipart upload (#4146)
7d42cb63dd is described below
commit 7d42cb63dda03c28943e6de6b871280649e121da
Author: carloea2 <[email protected]>
AuthorDate: Mon Jan 19 16:19:41 2026 -0800
fix(dataset): enforce max file size for multipart upload (#4146)
### What changes were proposed in this PR?
* **Enforce the `single_file_upload_max_size_mib` limit for multipart
uploads at init** by requiring `fileSizeBytes` and `partSizeBytes` and
rejecting the request when the declared total file size exceeds the
configured maximum (see the sketch after this list).
* **Persist multipart sizing metadata in DB** by adding
`file_size_bytes` and `part_size_bytes` to `dataset_upload_session`,
plus constraints to keep them valid.
* **Harden `uploadPart` against size bypasses** by computing the
expected part size from the stored session metadata and rejecting any
request whose `Content-Length` does not exactly match the expected size
(including the final part).
* **Add a final server-side safety check at finish**: after lakeFS
reports the completed object size, compare it to the configured maximum and
roll back the object if it exceeds the limit.
* **Update frontend init call** to pass `fileSizeBytes` and
`partSizeBytes` when initializing multipart uploads.
* **Add DB migration** (`sql/updates/19.sql`) to apply the schema change
on existing deployments.
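
The init-time check is plain ceiling division with an overflow guard, and the
same stored sizes later determine the only `Content-Length` each part may
carry. A minimal standalone sketch of that math (the object and method names
are illustrative, not the actual helpers in `DatasetResource.scala`; the
hard-coded 10,000-part cap stands in for `MAXIMUM_NUM_OF_MULTIPART_S3_PARTS`,
and `maxBytes` would come from the `single_file_upload_max_size_mib` setting
converted from MiB to bytes):

```scala
object MultipartSizeMath {

  // S3 allows at most 10,000 parts per multipart upload.
  private val MaxS3Parts = 10000L

  /** Validate the sizes declared at init and return the computed part count. */
  def validateInit(fileSizeBytes: Long, partSizeBytes: Long, maxBytes: Long): Int = {
    require(fileSizeBytes > 0L, "fileSizeBytes must be > 0")
    require(partSizeBytes > 0L, "partSizeBytes must be > 0")
    require(fileSizeBytes <= maxBytes, s"fileSizeBytes=$fileSizeBytes exceeds max=$maxBytes")

    // numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize,
    // guarding the addition against Long overflow first.
    val addend = partSizeBytes - 1L
    require(fileSizeBytes <= Long.MaxValue - addend, "overflow while computing numParts")
    val numParts = (fileSizeBytes + addend) / partSizeBytes

    require(numParts >= 1L && numParts <= MaxS3Parts, s"numParts=$numParts out of range")
    numParts.toInt
  }

  /** Expected Content-Length for a part: non-final parts are exactly
    * partSizeBytes; the final part carries the remainder.
    */
  def expectedPartSize(fileSizeBytes: Long, partSizeBytes: Long, numParts: Int, partNumber: Int): Long = {
    val lastPartSize = fileSizeBytes - partSizeBytes * (numParts - 1L)
    if (partNumber < numParts) partSizeBytes else lastPartSize
  }
}
```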
### Any related issues, documentation, discussions?
Close https://github.com/apache/texera/issues/4147
### How was this PR tested?
* Added/updated unit tests for multipart upload validation and
malicious-input cases, including:
* max upload size enforced at init (over and equal to the limit, plus a
2-part boundary; see the boundary sketch below)
* header poisoning and `Content-Length` mismatch rejection (non-numeric,
overflow, and mismatched values)
* rollback at finish when the max is tightened before finish (the
oversized object must not remain accessible)
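
In terms of the sketch above (assuming its `MultipartSizeMath` definitions are
in scope; this is illustrative only, not the actual test code), the boundaries
those tests pin down look like this:

```scala
import MultipartSizeMath._

object UploadBoundaryDemo extends App {
  val oneMiB = 1024L * 1024L

  // File size equal to the limit is accepted and yields a single part.
  assert(validateInit(oneMiB, oneMiB, maxBytes = oneMiB) == 1)

  // One byte over the limit is rejected at init:
  // validateInit(oneMiB + 1L, oneMiB + 1L, maxBytes = oneMiB) // throws

  // 2-part boundary: a 6 MiB file with 5 MiB parts yields 2 parts (5 MiB + 1 MiB).
  assert(validateInit(6L * oneMiB, 5L * oneMiB, maxBytes = 6L * oneMiB) == 2)
  assert(expectedPartSize(6L * oneMiB, 5L * oneMiB, numParts = 2, partNumber = 2) == oneMiB)
}
```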
### Was this PR authored or co-authored using generative AI tooling?
Co-authored-by: ChatGPT
---------
Signed-off-by: carloea2 <[email protected]>
Co-authored-by: Chen Li <[email protected]>
---
.../texera/service/resource/DatasetResource.scala | 198 ++++++++--
.../service/resource/DatasetResourceSpec.scala | 404 +++++++++++++++++++--
.../service/user/dataset/dataset.service.ts | 3 +-
sql/texera_ddl.sql | 28 +-
sql/updates/19.sql | 60 +++
5 files changed, 619 insertions(+), 74 deletions(-)
diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
index dd53ced373..39cba2c84f 100644
--- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
@@ -53,9 +53,10 @@ import org.apache.texera.service.util.S3StorageClient.{
MAXIMUM_NUM_OF_MULTIPART_S3_PARTS,
MINIMUM_NUM_OF_MULTIPART_S3_PART
}
-import org.jooq.{DSLContext, EnumType}
import org.jooq.impl.DSL
import org.jooq.impl.DSL.{inline => inl}
+import org.jooq.{DSLContext, EnumType}
+
import java.io.{InputStream, OutputStream}
import java.net.{HttpURLConnection, URI, URL, URLDecoder}
import java.nio.charset.StandardCharsets
@@ -81,6 +82,16 @@ object DatasetResource {
.getInstance()
.createDSLContext()
+ private def singleFileUploadMaxBytes(ctx: DSLContext, defaultMiB: Long = 20L): Long = {
+ val limit = ctx
+ .select(DSL.field("value", classOf[String]))
+ .from(DSL.table(DSL.name("texera_db", "site_settings")))
+ .where(DSL.field("key", classOf[String]).eq("single_file_upload_max_size_mib"))
+ .fetchOneInto(classOf[String])
+ Try(Option(limit).getOrElse(defaultMiB.toString).trim.toLong)
+ .getOrElse(defaultMiB) * 1024L * 1024L
+ }
+
/**
* Helper function to get the dataset from DB using did
*/
@@ -672,14 +683,16 @@ class DatasetResource {
@QueryParam("ownerEmail") ownerEmail: String,
@QueryParam("datasetName") datasetName: String,
@QueryParam("filePath") filePath: String,
- @QueryParam("numParts") numParts: Optional[Integer],
+ @QueryParam("fileSizeBytes") fileSizeBytes: Optional[java.lang.Long],
+ @QueryParam("partSizeBytes") partSizeBytes: Optional[java.lang.Long],
@Auth user: SessionUser
): Response = {
val uid = user.getUid
val dataset: Dataset = getDatasetBy(ownerEmail, datasetName)
operationType.toLowerCase match {
- case "init" => initMultipartUpload(dataset.getDid, filePath, numParts,
uid)
+ case "init" =>
+ initMultipartUpload(dataset.getDid, filePath, fileSizeBytes,
partSizeBytes, uid)
case "finish" => finishMultipartUpload(dataset.getDid, filePath, uid)
case "abort" => abortMultipartUpload(dataset.getDid, filePath, uid)
case _ =>
@@ -740,7 +753,55 @@ class DatasetResource {
if (session == null)
throw new NotFoundException("Upload session not found. Call type=init first.")
- val expectedParts = session.getNumPartsRequested
+ val expectedParts: Int = session.getNumPartsRequested
+ val fileSizeBytesValue: Long = session.getFileSizeBytes
+ val partSizeBytesValue: Long = session.getPartSizeBytes
+
+ if (fileSizeBytesValue <= 0L) {
+ throw new WebApplicationException(
+ s"Upload session has an invalid file size of $fileSizeBytesValue.
Restart the upload.",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+ if (partSizeBytesValue <= 0L) {
+ throw new WebApplicationException(
+ s"Upload session has an invalid part size of $partSizeBytesValue.
Restart the upload.",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+
+ // lastPartSize = fileSize - partSize*(expectedParts-1)
+ val nMinus1: Long = expectedParts.toLong - 1L
+ if (nMinus1 < 0L) {
+ throw new WebApplicationException(
+ s"Upload session has an invalid number of requested parts of
$expectedParts. Restart the upload.",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+ if (nMinus1 > 0L && partSizeBytesValue > Long.MaxValue / nMinus1) {
+ throw new WebApplicationException(
+ "Overflow while computing last part size",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+ val prefixBytes: Long = partSizeBytesValue * nMinus1
+ if (prefixBytes > fileSizeBytesValue) {
+ throw new WebApplicationException(
+ s"Upload session is invalid: computed bytes before last part
($prefixBytes) exceed declared file size ($fileSizeBytesValue). Restart the
upload.",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+ val lastPartSize: Long = fileSizeBytesValue - prefixBytes
+ if (lastPartSize <= 0L || lastPartSize > partSizeBytesValue) {
+ throw new WebApplicationException(
+ s"Upload session is invalid: computed last part size ($lastPartSize
bytes) must be within 1..$partSizeBytesValue bytes. Restart the upload.",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+
+ val allowedSize: Long =
+ if (partNumber < expectedParts) partSizeBytesValue else lastPartSize
+
if (partNumber > expectedParts) {
throw new BadRequestException(
s"$partNumber exceeds the requested parts on init: $expectedParts"
@@ -754,10 +815,17 @@ class DatasetResource {
)
}
+ if (contentLength != allowedSize) {
+ throw new BadRequestException(
+ s"Invalid part size for partNumber=$partNumber. " +
+ s"Expected Content-Length=$allowedSize, got $contentLength."
+ )
+ }
+
val physicalAddr =
Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
- "Upload session is missing physicalAddress. Re-init the upload.",
+ "Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -768,7 +836,7 @@ class DatasetResource {
catch {
case e: IllegalArgumentException =>
throw new WebApplicationException(
- s"Upload session has invalid physicalAddress. Re-init the
upload. (${e.getMessage})",
+ s"Upload session has invalid physicalAddress. Restart the
upload. (${e.getMessage})",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -800,7 +868,7 @@ class DatasetResource {
if (partRow == null) {
// Should not happen if init pre-created rows
throw new WebApplicationException(
- s"Part row not initialized for part $partNumber. Re-init the
upload.",
+ s"Part row not initialized for part $partNumber. Restart the
upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -1410,21 +1478,11 @@ class DatasetResource {
dataset
}
- private def validateAndNormalizeFilePathOrThrow(filePath: String): String = {
- val path = Option(filePath).getOrElse("").replace("\\", "/")
- if (
- path.isEmpty ||
- path.startsWith("/") ||
- path.split("/").exists(seg => seg == "." || seg == "..") ||
- path.exists(ch => ch == 0.toChar || ch < 0x20.toChar || ch == 0x7f.toChar)
- ) throw new BadRequestException("Invalid filePath")
- path
- }
-
private def initMultipartUpload(
did: Integer,
encodedFilePath: String,
- numParts: Optional[Integer],
+ fileSizeBytes: Optional[java.lang.Long],
+ partSizeBytes: Optional[java.lang.Long],
uid: Integer
): Response = {
@@ -1441,12 +1499,63 @@ class DatasetResource {
URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
)
- val numPartsValue = numParts.toScala.getOrElse {
- throw new BadRequestException("numParts is required for
initialization")
+ val fileSizeBytesValue: Long =
+ fileSizeBytes
+ .orElseThrow(() =>
+ new BadRequestException("fileSizeBytes is required for
initialization")
+ )
+
+ if (fileSizeBytesValue <= 0L) {
+ throw new BadRequestException("fileSizeBytes must be > 0")
+ }
+
+ val partSizeBytesValue: Long =
+ partSizeBytes
+ .orElseThrow(() =>
+ new BadRequestException("partSizeBytes is required for
initialization")
+ )
+
+ if (partSizeBytesValue <= 0L) {
+ throw new BadRequestException("partSizeBytes must be > 0")
+ }
+
+ // singleFileUploadMaxBytes applies to TOTAL bytes (sum of all parts == file size)
+ val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx)
+ if (totalMaxBytes <= 0L) {
+ throw new WebApplicationException(
+ "singleFileUploadMaxBytes must be > 0",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+ if (fileSizeBytesValue > totalMaxBytes) {
+ throw new BadRequestException(
+ s"fileSizeBytes=$fileSizeBytesValue exceeds
singleFileUploadMaxBytes=$totalMaxBytes"
+ )
+ }
+
+ // Compute numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize
+ val addend: Long = partSizeBytesValue - 1L
+ if (addend < 0L || fileSizeBytesValue > Long.MaxValue - addend) {
+ throw new WebApplicationException(
+ "Overflow while computing numParts",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+
+ val numPartsLong: Long = (fileSizeBytesValue + addend) / partSizeBytesValue
+ if (numPartsLong < 1L || numPartsLong > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS.toLong) {
+ throw new BadRequestException(
+ s"Computed numParts=$numPartsLong is out of range 1..$MAXIMUM_NUM_OF_MULTIPART_S3_PARTS"
+ )
}
- if (numPartsValue < 1 || numPartsValue > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) {
+ val numPartsValue: Int = numPartsLong.toInt
+
+ // S3 multipart constraint: all non-final parts must be >= 5MiB.
+ // If we have >1 parts, then partSizeBytesValue is the non-final part size.
+ if (numPartsValue > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
throw new BadRequestException(
- "numParts must be between 1 and " + MAXIMUM_NUM_OF_MULTIPART_S3_PARTS
+ s"partSizeBytes=$partSizeBytesValue is too small. " +
+ s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART
bytes."
)
}
@@ -1478,7 +1587,6 @@ class DatasetResource {
val uploadIdStr = presign.getUploadId
val physicalAddr = presign.getPhysicalAddress
- // If anything fails after this point, abort LakeFS multipart
try {
val rowsInserted = ctx
.insertInto(DATASET_UPLOAD_SESSION)
@@ -1487,7 +1595,9 @@ class DatasetResource {
.set(DATASET_UPLOAD_SESSION.UID, uid)
.set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
.set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
- .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, numPartsValue)
+ .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(numPartsValue))
+ .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
+ .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
.onDuplicateKeyIgnore()
.execute()
@@ -1531,7 +1641,6 @@ class DatasetResource {
Response.ok().build()
} catch {
case e: Exception =>
- // rollback will remove session + parts rows; we still must abort LakeFS
try {
LakeFSStorageClient.abortPresignedMultipartUploads(
repositoryName,
@@ -1597,7 +1706,7 @@ class DatasetResource {
val physicalAddr =
Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
- "Upload session is missing physicalAddress. Re-init the upload.",
+ "Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -1620,7 +1729,7 @@ class DatasetResource {
if (totalCnt != expectedParts) {
throw new WebApplicationException(
- s"Part table mismatch: expected $expectedParts rows but found
$totalCnt. Re-init the upload.",
+ s"Part table mismatch: expected $expectedParts rows but found
$totalCnt. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
@@ -1671,7 +1780,29 @@ class DatasetResource {
physicalAddr
)
- // Cleanup: delete the session; parts are removed by ON DELETE CASCADE
+ // FINAL SERVER-SIDE SIZE CHECK (do not rely on init)
+ val actualSizeBytes =
+ Option(objectStats.getSizeBytes).map(_.longValue()).getOrElse(-1L)
+
+ if (actualSizeBytes <= 0L) {
+ throw new WebApplicationException(
+ "lakeFS did not return sizeBytes for completed multipart upload",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+
+ val maxBytes = singleFileUploadMaxBytes(ctx)
+ val tooLarge = actualSizeBytes > maxBytes
+
+ if (tooLarge) {
+ try {
+ LakeFSStorageClient.resetObjectUploadOrDeletion(dataset.getRepositoryName, filePath)
+ } catch {
+ case _: Throwable => ()
+ }
+ }
+
+ // always cleanup session
ctx
.deleteFrom(DATASET_UPLOAD_SESSION)
.where(
@@ -1682,6 +1813,13 @@ class DatasetResource {
)
.execute()
+ if (tooLarge) {
+ throw new WebApplicationException(
+ s"Upload exceeded max size: actualSizeBytes=$actualSizeBytes
maxBytes=$maxBytes",
+ Response.Status.REQUEST_ENTITY_TOO_LARGE
+ )
+ }
+
Response
.ok(
Map(
@@ -1741,7 +1879,7 @@ class DatasetResource {
val physicalAddr =
Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
- "Upload session is missing physicalAddress. Re-init the upload.",
+ "Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index 8a6ee34f5f..2902626a70 100644
--- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -22,7 +22,7 @@ package org.apache.texera.service.resource
import ch.qos.logback.classic.{Level, Logger}
import io.lakefs.clients.sdk.ApiException
import jakarta.ws.rs._
-import jakarta.ws.rs.core.{Cookie, HttpHeaders, MediaType, MultivaluedHashMap, Response}
+import jakarta.ws.rs.core._
import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
import org.apache.texera.auth.SessionUser
import org.apache.texera.dao.MockTexeraDB
@@ -34,10 +34,10 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, DatasetVersio
import org.apache.texera.service.MockLakeFS
import org.jooq.SQLDialect
import org.jooq.impl.DSL
-import org.scalatest.tagobjects.Slow
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Tag}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
+import org.scalatest.tagobjects.Slow
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Tag}
import org.slf4j.LoggerFactory
import java.io.{ByteArrayInputStream, IOException, InputStream}
@@ -208,6 +208,8 @@ class DatasetResourceSpec
catch {
case e: ApiException if e.getCode == 409 => // ok
}
+ // Ensure max upload size setting does not leak between tests
+ clearMaxUploadMiB()
}
override protected def afterAll(): Unit = {
@@ -410,23 +412,96 @@ class DatasetResourceSpec
override def getDate: Date = null
override def getLength: Int = -1
}
+ private def mkHeadersRawContentLength(raw: String): HttpHeaders =
+ new HttpHeaders {
+ override def getRequestHeader(name: String): java.util.List[String] =
+ if (HttpHeaders.CONTENT_LENGTH.equalsIgnoreCase(name)) Collections.singletonList(raw)
+ else Collections.emptyList()
+
+ override def getHeaderString(name: String): String =
+ if (HttpHeaders.CONTENT_LENGTH.equalsIgnoreCase(name)) raw else null
+ override def getRequestHeaders: MultivaluedMap[String, String] = {
+ val map = new MultivaluedHashMap[String, String]()
+ map.putSingle(HttpHeaders.CONTENT_LENGTH, raw)
+ map
+ }
+ override def getAcceptableMediaTypes: java.util.List[MediaType] = Collections.emptyList()
+ override def getAcceptableLanguages: java.util.List[Locale] = Collections.emptyList()
+ override def getMediaType: MediaType = null
+ override def getLanguage: Locale = null
+ override def getCookies: java.util.Map[String, Cookie] = Collections.emptyMap()
+ // Not used by the resource (it reads getHeaderString), but keep it safe.
+ override def getLength: Int = -1
+ override def getDate: Date = ???
+ }
private def uniqueFilePath(prefix: String): String =
s"$prefix/${System.nanoTime()}-${Random.alphanumeric.take(8).mkString}.bin"
+ // ---------- site_settings helpers (max upload size) ----------
+ private val MaxUploadKey = "single_file_upload_max_size_mib"
+
+ private def upsertSiteSetting(key: String, value: String): Unit = {
+ val table = DSL.table(DSL.name("texera_db", "site_settings"))
+ val keyField = DSL.field(DSL.name("key"), classOf[String])
+ val valField = DSL.field(DSL.name("value"), classOf[String])
+
+ // Keep it simple + compatible across jOOQ versions: delete then insert.
+ val ctx = getDSLContext
+ ctx.deleteFrom(table).where(keyField.eq(key)).execute()
+ ctx.insertInto(table).columns(keyField, valField).values(key, value).execute()
+ }
+
+ private def deleteSiteSetting(key: String): Boolean = {
+ val table = DSL.table(DSL.name("texera_db", "site_settings"))
+ val keyField = DSL.field(DSL.name("key"), classOf[String])
+ getDSLContext.deleteFrom(table).where(keyField.eq(key)).execute() > 0
+ }
+
+ private def setMaxUploadMiB(mib: Long): Unit = upsertSiteSetting(MaxUploadKey, mib.toString)
+ private def clearMaxUploadMiB(): Unit = deleteSiteSetting(MaxUploadKey)
+
+ /**
+ * Convenience helper that adapts legacy "numParts" tests to the new init API:
+ * init now takes (fileSizeBytes, partSizeBytes) and computes numParts internally.
+ *
+ * - Non-final parts are exactly partSizeBytes.
+ * - Final part is exactly lastPartBytes.
+ */
private def initUpload(
filePath: String,
numParts: Int,
+ lastPartBytes: Int = 1,
+ partSizeBytes: Int = MinNonFinalPartBytes,
user: SessionUser = multipartOwnerSessionUser
- ): Response =
+ ): Response = {
+ require(numParts >= 1, "numParts must be >= 1")
+ require(lastPartBytes > 0, "lastPartBytes must be > 0")
+ require(partSizeBytes > 0, "partSizeBytes must be > 0")
+ if (numParts > 1)
+ require(
+ lastPartBytes <= partSizeBytes,
+ "lastPartBytes must be <= partSizeBytes for multipart"
+ )
+
+ val fileSizeBytes: Long =
+ if (numParts == 1) lastPartBytes.toLong
+ else partSizeBytes.toLong * (numParts.toLong - 1L) + lastPartBytes.toLong
+
+ // For numParts == 1, allow partSizeBytes >= fileSizeBytes (still computes 1 part).
+ val maxPartSizeBytes: Long =
+ if (numParts == 1) Math.max(partSizeBytes.toLong, fileSizeBytes) else partSizeBytes.toLong
+
datasetResource.multipartUpload(
"init",
ownerUser.getEmail,
multipartDataset.getName,
urlEnc(filePath),
- Optional.of(numParts),
+ Optional.of(java.lang.Long.valueOf(fileSizeBytes)),
+ Optional.of(java.lang.Long.valueOf(maxPartSizeBytes)),
user
)
+ }
private def finishUpload(
filePath: String,
@@ -438,6 +513,7 @@ class DatasetResourceSpec
multipartDataset.getName,
urlEnc(filePath),
Optional.empty(),
+ Optional.empty(),
user
)
@@ -451,6 +527,7 @@ class DatasetResourceSpec
multipartDataset.getName,
urlEnc(filePath),
Optional.empty(),
+ Optional.empty(),
user
)
@@ -460,11 +537,14 @@ class DatasetResourceSpec
bytes: Array[Byte],
user: SessionUser = multipartOwnerSessionUser,
contentLengthOverride: Option[Long] = None,
- missingContentLength: Boolean = false
+ missingContentLength: Boolean = false,
+ rawContentLengthOverride: Option[String] = None
): Response = {
- val hdrs =
+ val contentLength = contentLengthOverride.getOrElse(bytes.length.toLong)
+ val headers =
if (missingContentLength) mkHeadersMissingContentLength
- else mkHeaders(contentLengthOverride.getOrElse(bytes.length.toLong))
+ else
+ rawContentLengthOverride.map(mkHeadersRawContentLength).getOrElse(mkHeaders(contentLength))
datasetResource.uploadPart(
ownerUser.getEmail,
@@ -472,7 +552,7 @@ class DatasetResourceSpec
urlEnc(filePath),
partNumber,
new ByteArrayInputStream(bytes),
- hdrs,
+ headers,
user
)
}
@@ -482,17 +562,21 @@ class DatasetResourceSpec
partNumber: Int,
stream: InputStream,
contentLength: Long,
- user: SessionUser = multipartOwnerSessionUser
- ): Response =
+ user: SessionUser = multipartOwnerSessionUser,
+ rawContentLengthOverride: Option[String] = None
+ ): Response = {
+ val headers =
+ rawContentLengthOverride.map(mkHeadersRawContentLength).getOrElse(mkHeaders(contentLength))
datasetResource.uploadPart(
ownerUser.getEmail,
multipartDataset.getName,
urlEnc(filePath),
partNumber,
stream,
- mkHeaders(contentLength),
+ headers,
user
)
+ }
private def fetchSession(filePath: String) =
getDSLContext
@@ -551,42 +635,180 @@ class DatasetResourceSpec
assertPlaceholdersCreated(sessionRecord.getUploadId, expectedParts = 3)
}
- it should "reject missing numParts" in {
- val filePath = uniqueFilePath("init-missing-numparts")
- val ex = intercept[BadRequestException] {
+ it should "reject missing fileSizeBytes / partSizeBytes" in {
+ val filePath1 = uniqueFilePath("init-missing-filesize")
+ val ex1 = intercept[BadRequestException] {
datasetResource.multipartUpload(
"init",
ownerUser.getEmail,
multipartDataset.getName,
- urlEnc(filePath),
+ urlEnc(filePath1),
Optional.empty(),
+ Optional.of(java.lang.Long.valueOf(MinNonFinalPartBytes.toLong)),
multipartOwnerSessionUser
)
}
- assertStatus(ex, 400)
+ assertStatus(ex1, 400)
+
+ val filePath2 = uniqueFilePath("init-missing-partsize")
+ val ex2 = intercept[BadRequestException] {
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePath2),
+ Optional.of(java.lang.Long.valueOf(1L)),
+ Optional.empty(),
+ multipartOwnerSessionUser
+ )
+ }
+ assertStatus(ex2, 400)
}
- it should "reject invalid numParts (0, negative, too large)" in {
- val filePath = uniqueFilePath("init-bad-numparts")
- assertStatus(intercept[BadRequestException] { initUpload(filePath, 0) }, 400)
- assertStatus(intercept[BadRequestException] { initUpload(filePath, -1) }, 400)
- assertStatus(intercept[BadRequestException] { initUpload(filePath, 1000000000) }, 400)
+ it should "reject invalid fileSizeBytes / partSizeBytes (<= 0)" in {
+ val filePath = uniqueFilePath("init-bad-sizes")
+
+ assertStatus(
+ intercept[BadRequestException] {
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePath),
+ Optional.of(java.lang.Long.valueOf(0L)),
+ Optional.of(java.lang.Long.valueOf(1L)),
+ multipartOwnerSessionUser
+ )
+ },
+ 400
+ )
+
+ assertStatus(
+ intercept[BadRequestException] {
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePath),
+ Optional.of(java.lang.Long.valueOf(1L)),
+ Optional.of(java.lang.Long.valueOf(0L)),
+ multipartOwnerSessionUser
+ )
+ },
+ 400
+ )
}
- it should "reject invalid filePath (empty, absolute, '.', '..', control
chars)" in {
- assertStatus(intercept[BadRequestException] { initUpload("./nope.bin", 2)
}, 400)
- assertStatus(intercept[BadRequestException] { initUpload("/absolute.bin",
2) }, 400)
- assertStatus(intercept[BadRequestException] { initUpload("a/./b.bin", 2)
}, 400)
+ it should "enforce max upload size at init (>, == boundary)" in {
+ // Use a tiny limit so the test doesn't allocate big buffers.
+ setMaxUploadMiB(1) // 1 MiB
- assertStatus(intercept[BadRequestException] { initUpload("../escape.bin",
2) }, 400)
- assertStatus(intercept[BadRequestException] {
initUpload("a/../escape.bin", 2) }, 400)
+ val oneMiB: Long = 1024L * 1024L
+ val filePathOver = uniqueFilePath("init-max-over")
assertStatus(
intercept[BadRequestException] {
- initUpload(s"a/${0.toChar}b.bin", 2)
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePathOver),
+ Optional.of(java.lang.Long.valueOf(oneMiB + 1L)),
+ Optional.of(java.lang.Long.valueOf(oneMiB + 1L)), // single-part
+ multipartOwnerSessionUser
+ )
},
400
)
+
+ val filePathEq = uniqueFilePath("init-max-eq")
+ val resp =
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePathEq),
+ Optional.of(java.lang.Long.valueOf(oneMiB)),
+ Optional.of(java.lang.Long.valueOf(oneMiB)), // single-part
+ multipartOwnerSessionUser
+ )
+
+ resp.getStatus shouldEqual 200
+ fetchSession(filePathEq) should not be null
+ }
+
+ it should "enforce max upload size for multipart (2-part boundary)" in {
+ setMaxUploadMiB(6) // 6 MiB
+
+ val max6MiB: Long = 6L * 1024L * 1024L
+ val partSize: Long = MinNonFinalPartBytes.toLong // 5 MiB
+
+ val filePathEq = uniqueFilePath("init-max-multipart-eq")
+ val respEq =
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePathEq),
+ Optional.of(java.lang.Long.valueOf(max6MiB)),
+ Optional.of(java.lang.Long.valueOf(partSize)),
+ multipartOwnerSessionUser
+ )
+
+ respEq.getStatus shouldEqual 200
+ fetchSession(filePathEq).getNumPartsRequested shouldEqual 2
+
+ val filePathOver = uniqueFilePath("init-max-multipart-over")
+ assertStatus(
+ intercept[BadRequestException] {
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePathOver),
+ Optional.of(java.lang.Long.valueOf(max6MiB + 1L)),
+ Optional.of(java.lang.Long.valueOf(partSize)),
+ multipartOwnerSessionUser
+ )
+ },
+ 400
+ )
+ }
+
+ it should "reject init when fileSizeBytes/partSizeBytes would overflow
numParts computation (malicious huge inputs)" in {
+ // Make max big enough to get past the max-size gate without overflowing
maxBytes itself.
+ val maxMiB: Long = Long.MaxValue / (1024L * 1024L)
+ setMaxUploadMiB(maxMiB)
+ val totalMaxBytes: Long = maxMiB * 1024L * 1024L
+ val filePath = uniqueFilePath("init-overflow-numParts")
+
+ val ex = intercept[WebApplicationException] {
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePath),
+ Optional.of(java.lang.Long.valueOf(totalMaxBytes)),
+ Optional.of(java.lang.Long.valueOf(MinNonFinalPartBytes.toLong)),
+ multipartOwnerSessionUser
+ )
+ }
+ assertStatus(ex, 500)
+ }
+
+ it should "reject invalid filePath (empty, absolute, '..', control chars)"
in {
+ // failures (must throw)
+ assertStatus(intercept[BadRequestException] { initUpload("/absolute.bin",
2) }, 400)
+ assertStatus(intercept[BadRequestException] { initUpload("../escape.bin",
2) }, 400)
+ // control chars rejected
+ intercept[IllegalArgumentException] {
+ initUpload(s"a/${0.toChar}b.bin", 2)
+ }
+
+ // now succeed (no intercept, because no throw)
+ assert(initUpload("./nope.bin", 2).getStatus == 200)
+ assert(initUpload("a/./b.bin", 2).getStatus == 200)
+ assert(initUpload("a/../escape.bin", 2).getStatus == 200)
}
it should "reject invalid type parameter" in {
@@ -598,6 +820,7 @@ class DatasetResourceSpec
multipartDataset.getName,
urlEnc(filePath),
Optional.empty(),
+ Optional.empty(),
multipartOwnerSessionUser
)
}
@@ -706,6 +929,76 @@ class DatasetResourceSpec
400
)
}
+ it should "reject non-numeric Content-Length (header poisoning)" in {
+ val filePath = uniqueFilePath("part-cl-nonnumeric")
+ initUpload(filePath, numParts = 1)
+ val ex = intercept[BadRequestException] {
+ uploadPart(
+ filePath,
+ partNumber = 1,
+ bytes = tinyBytes(1.toByte),
+ rawContentLengthOverride = Some("not-a-number")
+ )
+ }
+ assertStatus(ex, 400)
+ }
+ it should "reject Content-Length that overflows Long (header poisoning)" in {
+ val filePath = uniqueFilePath("part-cl-overflow")
+ initUpload(filePath, numParts = 1)
+ val ex = intercept[BadRequestException] {
+ uploadPart(
+ filePath,
+ partNumber = 1,
+ bytes = tinyBytes(1.toByte),
+ rawContentLengthOverride = Some("999999999999999999999999999999999999999")
+ )
+ }
+ assertStatus(ex, 400)
+ }
+ it should "reject when Content-Length does not equal the expected part size
(attempted size-bypass)" in {
+ val filePath = uniqueFilePath("part-cl-mismatch-expected")
+ initUpload(filePath, numParts = 2)
+ val uploadId = fetchUploadIdOrFail(filePath)
+ val bytes = minPartBytes(1.toByte) // exactly MinNonFinalPartBytes
+ val ex = intercept[BadRequestException] {
+ uploadPart(
+ filePath,
+ partNumber = 1,
+ bytes = bytes,
+ contentLengthOverride = Some(bytes.length.toLong - 1L) // lie by 1 byte
+ )
+ }
+ assertStatus(ex, 400)
+ // Ensure we didn't accidentally persist an ETag for a rejected upload.
+ fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag shouldEqual ""
+ }
+
+ it should "not store more bytes than declared Content-Length (send 2x bytes,
claim x)" in {
+ val filePath = uniqueFilePath("part-body-gt-cl")
+ val declared: Int = 1024
+ initUpload(filePath, numParts = 1, lastPartBytes = declared, partSizeBytes
= declared)
+
+ val first = Array.fill[Byte](declared)(1.toByte)
+ val extra = Array.fill[Byte](declared)(2.toByte)
+ val sent = first ++ extra // 2x bytes sent
+
+ uploadPart(
+ filePath,
+ partNumber = 1,
+ bytes = sent,
+ contentLengthOverride = Some(declared.toLong) // claim only x
+ ).getStatus shouldEqual 200
+
+ finishUpload(filePath).getStatus shouldEqual 200
+ // If anything "accepted" the extra bytes, the committed object would
exceed declared size.
+ val repoName = multipartDataset.getRepositoryName
+ val downloaded = LakeFSStorageClient.getFileFromRepo(repoName, "main", filePath)
+ Files.size(Paths.get(downloaded.toURI)) shouldEqual declared.toLong
+
+ val expected = sha256OfChunks(Seq(first))
+ val got = sha256OfFile(Paths.get(downloaded.toURI))
+ got.toSeq shouldEqual expected
+ }
it should "reject null/empty filePath param early without depending on error
text" in {
val httpHeaders = mkHeaders(1L)
@@ -876,6 +1169,45 @@ class DatasetResourceSpec
assertStatus(ex, 404)
}
+ it should "not commit an oversized upload if the max upload size is
tightened before finish (server-side rollback)" in {
+ val filePath = uniqueFilePath("finish-max-tightened")
+ val twoMiB: Long = 2L * 1024L * 1024L
+
+ // Allow init + part upload under a higher limit.
+ setMaxUploadMiB(3) // 3 MiB
+ datasetResource
+ .multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePath),
+ Optional.of(java.lang.Long.valueOf(twoMiB)),
+ Optional.of(java.lang.Long.valueOf(twoMiB)),
+ multipartOwnerSessionUser
+ )
+ .getStatus shouldEqual 200
+
+ uploadPart(filePath, 1, Array.fill[Byte](twoMiB.toInt)(7.toByte)).getStatus shouldEqual 200
+
+ // Tighten the limit just before finish.
+ setMaxUploadMiB(1) // 1 MiB
+
+ val ex = intercept[WebApplicationException] {
+ finishUpload(filePath) // this now THROWS 413 (doesn't return Response)
+ }
+ ex.getResponse.getStatus shouldEqual 413
+
+ // Oversized objects must not remain accessible after finish (rollback happened).
+ val repoName = multipartDataset.getRepositoryName
+ val notFound = intercept[ApiException] {
+ LakeFSStorageClient.getFileFromRepo(repoName, "main", filePath)
+ }
+ notFound.getCode shouldEqual 404
+
+ // Session still available.
+ fetchSession(filePath) should not be null
+ }
+
it should "reject finish when no parts were uploaded (all placeholders
empty) without checking messages" in {
val filePath = uniqueFilePath("finish-no-parts")
initUpload(filePath, numParts = 2)
@@ -1128,7 +1460,7 @@ class DatasetResourceSpec
it should "allow abort + re-init after part 1 succeeded but part 2 drops
mid-flight; then complete successfully" in {
val filePath = uniqueFilePath("reinit-after-part2-drop")
- initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+ initUpload(filePath, numParts = 2, lastPartBytes = 1024 * 1024).getStatus shouldEqual 200
val uploadId1 = fetchUploadIdOrFail(filePath)
uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
@@ -1147,7 +1479,7 @@ class DatasetResourceSpec
fetchSession(filePath) shouldBe null
fetchPartRows(uploadId1) shouldBe empty
- initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+ initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus shouldEqual 200
uploadPart(filePath, 1, minPartBytes(3.toByte)).getStatus shouldEqual 200
uploadPart(filePath, 2, tinyBytes(4.toByte, n = 123)).getStatus shouldEqual 200
finishUpload(filePath).getStatus shouldEqual 200
@@ -1162,7 +1494,7 @@ class DatasetResourceSpec
}
def reinitAndFinishHappy(filePath: String): Unit = {
- initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+ initUpload(filePath, numParts = 2, lastPartBytes = 321).getStatus shouldEqual 200
uploadPart(filePath, 1, minPartBytes(7.toByte)).getStatus shouldEqual 200
uploadPart(filePath, 2, tinyBytes(8.toByte, n = 321)).getStatus shouldEqual 200
finishUpload(filePath).getStatus shouldEqual 200
@@ -1192,7 +1524,7 @@ class DatasetResourceSpec
withClue("scenario (2): part2 mid-flight drop") {
val filePath = uniqueFilePath("reupload-part2-drop")
- initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+ initUpload(filePath, numParts = 2, lastPartBytes = 1024 * 1024).getStatus shouldEqual 200
val uploadId = fetchUploadIdOrFail(filePath)
uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
@@ -1242,7 +1574,7 @@ class DatasetResourceSpec
// ---------------------------------------------------------------------------
it should "upload without corruption (sha256 matches final object)" in {
val filePath = uniqueFilePath("sha256-positive")
- initUpload(filePath, numParts = 3).getStatus shouldEqual 200
+ initUpload(filePath, numParts = 3, lastPartBytes = 123).getStatus shouldEqual 200
val part1 = minPartBytes(1.toByte)
val part2 = minPartBytes(2.toByte)
@@ -1266,7 +1598,7 @@ class DatasetResourceSpec
it should "detect corruption (sha256 mismatch when a part is altered)" in {
val filePath = uniqueFilePath("sha256-negative")
- initUpload(filePath, numParts = 3).getStatus shouldEqual 200
+ initUpload(filePath, numParts = 3, lastPartBytes = 123).getStatus shouldEqual 200
val part1 = minPartBytes(1.toByte)
val part2 = minPartBytes(2.toByte)
@@ -1306,7 +1638,7 @@ class DatasetResourceSpec
val filePath = uniqueFilePath(s"stress-$i")
val numParts = 2 + Random.nextInt(maxParts - 1)
- initUpload(filePath, numParts).getStatus shouldEqual 200
+ initUpload(filePath, numParts, lastPartBytes = 1024).getStatus shouldEqual 200
val sharedMin = minPartBytes((i % 127).toByte)
val partFuts = (1 to numParts).map { partN =>
diff --git a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
index 64c6cb0b36..16c580be27 100644
--- a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
+++ b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
@@ -231,7 +231,8 @@ export class DatasetService {
.set("ownerEmail", ownerEmail)
.set("datasetName", datasetName)
.set("filePath", encodeURIComponent(filePath))
- .set("numParts", partCount.toString());
+ .set("fileSizeBytes", file.size.toString())
+ .set("partSizeBytes", partSize.toString());
const init$ = this.http.post<{}>(
`${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql
index 07206afc30..f4728279e3 100644
--- a/sql/texera_ddl.sql
+++ b/sql/texera_ddl.sql
@@ -281,17 +281,31 @@ CREATE TABLE IF NOT EXISTS dataset_version
CREATE TABLE IF NOT EXISTS dataset_upload_session
(
- did INT NOT NULL,
- uid INT NOT NULL,
- file_path TEXT NOT NULL,
- upload_id VARCHAR(256) NOT NULL UNIQUE,
- physical_address TEXT,
- num_parts_requested INT NOT NULL,
+ did INT NOT NULL,
+ uid INT NOT NULL,
+ file_path TEXT NOT NULL,
+ upload_id VARCHAR(256) NOT NULL UNIQUE,
+ physical_address TEXT,
+ num_parts_requested INT NOT NULL,
+ file_size_bytes BIGINT NOT NULL,
+ part_size_bytes BIGINT NOT NULL,
PRIMARY KEY (uid, did, file_path),
FOREIGN KEY (did) REFERENCES dataset(did) ON DELETE CASCADE,
- FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE
+ FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE,
+
+ CONSTRAINT chk_dataset_upload_session_num_parts_requested_positive
+ CHECK (num_parts_requested >= 1),
+
+ CONSTRAINT chk_dataset_upload_session_file_size_bytes_positive
+ CHECK (file_size_bytes > 0),
+
+ CONSTRAINT chk_dataset_upload_session_part_size_bytes_positive
+ CHECK (part_size_bytes > 0),
+
+ CONSTRAINT chk_dataset_upload_session_part_size_bytes_s3_upper_bound
+ CHECK (part_size_bytes <= 5368709120)
);
CREATE TABLE IF NOT EXISTS dataset_upload_session_part
diff --git a/sql/updates/19.sql b/sql/updates/19.sql
new file mode 100644
index 0000000000..92d2f998c7
--- /dev/null
+++ b/sql/updates/19.sql
@@ -0,0 +1,60 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ============================================
+-- 1. Connect to the texera_db database
+-- ============================================
+\c texera_db
+SET search_path TO texera_db, public;
+
+-- ============================================
+-- 2. Update the table schema
+-- ============================================
+BEGIN;
+
+-- Add the 2 columns (defaults backfill existing rows safely)
+ALTER TABLE dataset_upload_session
+ ADD COLUMN IF NOT EXISTS file_size_bytes BIGINT NOT NULL DEFAULT 1,
+ ADD COLUMN IF NOT EXISTS part_size_bytes BIGINT NOT NULL DEFAULT 5242880;
+
+-- Drop any old/alternate constraint names from previous attempts (so we end up with exactly the new names)
+ALTER TABLE dataset_upload_session
+ DROP CONSTRAINT IF EXISTS dataset_upload_session_num_parts_requested_positive,
+ DROP CONSTRAINT IF EXISTS chk_dataset_upload_session_num_parts_requested_positive,
+ DROP CONSTRAINT IF EXISTS chk_dataset_upload_session_file_size_bytes_positive,
+ DROP CONSTRAINT IF EXISTS chk_dataset_upload_session_part_size_bytes_positive,
+ DROP CONSTRAINT IF EXISTS dataset_upload_session_part_size_bytes_positive,
+ DROP CONSTRAINT IF EXISTS dataset_upload_session_part_size_bytes_s3_upper_bound,
+ DROP CONSTRAINT IF EXISTS chk_dataset_upload_session_part_size_bytes_s3_upper_bound;
+
+-- Add constraints exactly like the new CREATE TABLE
+ALTER TABLE dataset_upload_session
+ ADD CONSTRAINT chk_dataset_upload_session_num_parts_requested_positive
+ CHECK (num_parts_requested >= 1),
+ ADD CONSTRAINT chk_dataset_upload_session_file_size_bytes_positive
+ CHECK (file_size_bytes > 0),
+ ADD CONSTRAINT chk_dataset_upload_session_part_size_bytes_positive
+ CHECK (part_size_bytes > 0),
+ ADD CONSTRAINT chk_dataset_upload_session_part_size_bytes_s3_upper_bound
+ CHECK (part_size_bytes <= 5368709120);
+
+-- Match CREATE TABLE (no defaults)
+ALTER TABLE dataset_upload_session
+ ALTER COLUMN file_size_bytes DROP DEFAULT,
+ ALTER COLUMN part_size_bytes DROP DEFAULT;
+
+COMMIT;