Modified: releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp (264080 => 264081)
--- releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp 2020-07-08 10:07:02 UTC (rev 264080)
+++ releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp 2020-07-08 10:07:10 UTC (rev 264081)
@@ -1,7 +1,7 @@
/*
* Copyright (C) 2009, 2010 Sebastian Dröge <sebastian.dro...@collabora.co.uk>
* Copyright (C) 2013 Collabora Ltd.
- * Copyright (C) 2019 Igalia S.L.
+ * Copyright (C) 2019, 2020 Igalia S.L.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -25,7 +25,6 @@
#include "GStreamerCommon.h"
#include "HTTPHeaderNames.h"
-#include "MainThreadNotifier.h"
#include "MediaPlayer.h"
#include "PlatformMediaResourceLoader.h"
#include "PolicyChecker.h"
@@ -34,11 +33,14 @@
#include "ResourceResponse.h"
#include <cstdint>
#include <wtf/Condition.h>
+#include <wtf/DataMutex.h>
+#include <wtf/RunLoop.h>
#include <wtf/Scope.h>
#include <wtf/glib/WTFGType.h>
#include <wtf/text/CString.h>
using namespace WebCore;
+using WTF::DataMutex;
// Never pause download of media resources smaller than 2MiB.
#define SMALL_MEDIA_RESOURCE_MAX_SIZE 2 * 1024 * 1024
@@ -55,7 +57,7 @@
WTF_MAKE_FAST_ALLOCATED;
WTF_MAKE_NONCOPYABLE(CachedResourceStreamingClient);
public:
- CachedResourceStreamingClient(WebKitWebSrc*, ResourceRequest&&);
+ CachedResourceStreamingClient(WebKitWebSrc*, ResourceRequest&&, unsigned requestNumber);
virtual ~CachedResourceStreamingClient();
const HashSet<RefPtr<WebCore::SecurityOrigin>>& securityOrigins() const { return m_origins; }
@@ -80,6 +82,7 @@
static constexpr float s_reduceBlocksizeFactor { 0.5 };
int m_reduceBlocksizeCount { 0 };
int m_increaseBlocksizeCount { 0 };
+ unsigned m_requestNumber;
GRefPtr<GstElement> m_src;
ResourceRequest m_request;
@@ -86,67 +89,72 @@
HashSet<RefPtr<WebCore::SecurityOrigin>> m_origins;
};
-enum MainThreadSourceNotification {
- Start = 1 << 0,
- Stop = 1 << 1,
- Dispose = 1 << 2,
-};
+struct WebKitWebSrcPrivate {
+ // Constants initialized during construction:
+ unsigned minimumBlocksize;
-struct _WebKitWebSrcPrivate {
- ~_WebKitWebSrcPrivate()
- {
- if (notifier && notifier->isValid()) {
- notifier->notifyAndWait(MainThreadSourceNotification::Dispose, [&] {
- if (resource) {
- auto* client = static_cast<CachedResourceStreamingClient*>(resource->client());
- if (client)
- client->setSourceElement(nullptr);
-
- resource->setClient(nullptr);
- }
- loader = nullptr;
- });
- notifier->invalidate();
- notifier = nullptr;
- }
- }
-
+ // Configuration of the element (properties set by the user of WebKitWebSrc):
+ // They can only change when state < PAUSED.
CString originalURI;
- CString redirectedURI;
bool keepAlive;
GUniquePtr<GstStructure> extraHeaders;
bool compress;
GUniquePtr<gchar> httpMethod;
- WebCore::MediaPlayer* player;
- RefPtr<PlatformMediaResourceLoader> loader;
- RefPtr<PlatformMediaResource> resource;
- RefPtr<MainThreadNotifier<MainThreadSourceNotification>> notifier;
- bool didPassAccessControlCheck;
- bool wereHeadersReceived;
- Condition headersCondition;
- Lock responseLock;
- bool wasResponseReceived;
- Condition responseCondition;
- bool doesHaveEOS;
- bool isFlushing { false };
- uint64_t readPosition;
- uint64_t requestedPosition;
- uint64_t stopPosition;
- bool isDurationSet;
- bool haveSize;
- uint64_t size;
- bool isSeekable;
- bool isSeeking;
- bool wasSeeking { false };
- unsigned minimumBlocksize;
- Lock adapterLock;
- Condition adapterCondition;
- bool isDownloadSuspended { false };
- GRefPtr<GstAdapter> adapter;
- GRefPtr<GstEvent> httpHeadersEvent;
- GUniquePtr<GstStructure> httpHeaders;
- WallTime downloadStartTime { WallTime::nan() };
- uint64_t totalDownloadedBytes { 0 };
+
+ struct StreamingMembers {
+ ~StreamingMembers()
+ {
+ // By the time we're destroying WebKitWebSrcPrivate unLock() should have been called and therefore resource
+ // should have already been cleared.
+ ASSERT(!resource);
+ // ResourceLoader is not thread-safe. It's not even ThreadSafeRefCounted. Therefore, to be safe, we want the
+ // unref to happen in the main thread.
+ if (loader)
+ RunLoop::main().dispatch([loader = WTFMove(loader)] { });
+ }
+
+ // Properties initially empty, but set once the first HTTP response arrives:
+ bool wasResponseReceived;
+ CString redirectedURI;
+ bool didPassAccessControlCheck;
+ bool haveSize;
+ uint64_t size;
+ bool isSeekable;
+ GRefPtr<GstCaps> pendingCaps;
+ GRefPtr<GstMessage> pendingHttpHeadersMessage; // Set from MT, sent from create().
+ GRefPtr<GstEvent> pendingHttpHeadersEvent; // Set from MT, sent from create().
+
+ // Properties updated with every downloaded data block:
+ WallTime downloadStartTime { WallTime::nan() };
+ uint64_t totalDownloadedBytes { 0 };
+ bool doesHaveEOS; // Set both when we reach stopPosition and on errors (including on responseReceived).
+ bool isDownloadSuspended { false }; // Set to true from the network handler when the high water level is reached.
+
+ // Obtained by means of GstContext queries before making the first HTTP request.
+ // We use it for getting access to WebKit networking objects: the PlatformMediaResourceLoader factory [createResourceLoader()]
+ // and the player HTTP referrer string.
+ WebCore::MediaPlayer* player;
+
+ // Properties used for GStreamer data-flow in create().
+ bool isFlushing { false };
+ Condition responseCondition; // Must be signaled after any updates on HTTP requests, and when flushing.
+ GRefPtr<GstAdapter> adapter;
+ bool isDurationSet;
+ uint64_t readPosition;
+
+ // Properties only set during seek.
+ // basesrc ensures they can't change during a create() call by taking the STREAMING_LOCK.
+ // (An initial seek is also guaranteed by basesrc.)
+ unsigned requestNumber { 1 };
+ uint64_t requestedPosition { 0 };
+ uint64_t stopPosition { UINT64_MAX };
+
+ bool isRequestPending { true };
+
+ RefPtr<PlatformMediaResourceLoader> loader;
+ RefPtr<PlatformMediaResource> resource;
+ };
+ DataMutex<StreamingMembers> dataMutex;
};
enum {
@@ -169,9 +177,8 @@
static void webKitWebSrcConstructed(GObject*);
static void webKitWebSrcSetProperty(GObject*, guint propertyID, const GValue*, GParamSpec*);
static void webKitWebSrcGetProperty(GObject*, guint propertyID, GValue*, GParamSpec*);
-static GstStateChangeReturn webKitWebSrcChangeState(GstElement*, GstStateChange);
static GstFlowReturn webKitWebSrcCreate(GstPushSrc*, GstBuffer**);
-static gboolean webKitWebSrcMakeRequest(GstBaseSrc*, bool);
+static void webKitWebSrcMakeRequest(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
static gboolean webKitWebSrcStart(GstBaseSrc*);
static gboolean webKitWebSrcStop(GstBaseSrc*);
static gboolean webKitWebSrcGetSize(GstBaseSrc*, guint64* size);
@@ -181,8 +188,8 @@
static gboolean webKitWebSrcUnLock(GstBaseSrc*);
static gboolean webKitWebSrcUnLockStop(GstBaseSrc*);
static void webKitWebSrcSetContext(GstElement*, GstContext*);
-static void restartLoaderIfNeeded(WebKitWebSrc*);
-static void stopLoaderIfNeeded(WebKitWebSrc*);
+static void restartLoaderIfNeeded(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
+static void stopLoaderIfNeeded(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
#define webkit_web_src_parent_class parent_class
WEBKIT_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_PUSH_SRC,
@@ -230,7 +237,6 @@
g_param_spec_string("method", "method", "The HTTP method to use (default: GET)",
nullptr, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
- eklass->change_state = GST_DEBUG_FUNCPTR(webKitWebSrcChangeState);
eklass->set_context = GST_DEBUG_FUNCPTR(webKitWebSrcSetContext);
GstBaseSrcClass* baseSrcClass = GST_BASE_SRC_CLASS(klass);
@@ -247,18 +253,40 @@
pushSrcClass->create = GST_DEBUG_FUNCPTR(webKitWebSrcCreate);
}
-static void webkitWebSrcReset(WebKitWebSrc* src)
+enum class ResetType {
+ Soft,
+ Hard
+};
+
+static void webkitWebSrcReset(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members, ResetType resetType)
{
- WebKitWebSrcPrivate* priv = src->priv;
+ GST_DEBUG_OBJECT(src, "Resetting internal state");
+ gst_adapter_clear(members->adapter.get());
+ members->isRequestPending = true;
- GST_DEBUG_OBJECT(src, "Resetting internal state");
- priv->haveSize = false;
- priv->wereHeadersReceived = false;
- priv->isSeekable = false;
- priv->readPosition = 0;
- priv->requestedPosition = 0;
- priv->stopPosition = -1;
- priv->size = 0;
+ // Reset request state. Any previous request has been cancelled at this point.
+ members->wasResponseReceived = false;
+ members->doesHaveEOS = false;
+ members->downloadStartTime = WallTime::nan();
+ members->totalDownloadedBytes = 0; // Resetted for each request, used to estimate download speed.
+ members->pendingHttpHeadersMessage = nullptr;
+ members->pendingHttpHeadersEvent = nullptr;
+
+ // After a flush, we have to emit a segment again.
+ members->isDurationSet = false;
+
+ // Hard reset is done during initialization and state transitions.
+ // Soft reset is done during flushes. In these, we preserve the seek target.
+ if (resetType == ResetType::Hard) {
+ members->didPassAccessControlCheck = false;
+ members->redirectedURI = CString();
+ members->isSeekable = false;
+ members->haveSize = false;
+ members->size = 0;
+ members->requestedPosition = 0;
+ members->stopPosition = UINT64_MAX;
+ members->readPosition = members->requestedPosition;
+ }
}
static void webKitWebSrcConstructed(GObject* object)
@@ -268,13 +296,13 @@
WebKitWebSrc* src = ""
WebKitWebSrcPrivate* priv = src->priv;
- priv->notifier = MainThreadNotifier<MainThreadSourceNotification>::create();
- priv->adapter = adoptGRef(gst_adapter_new());
priv->minimumBlocksize = gst_base_src_get_blocksize(GST_BASE_SRC_CAST(src));
- webkitWebSrcReset(src);
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+ members->adapter = adoptGRef(gst_adapter_new());
+ webkitWebSrcReset(src, members, ResetType::Hard);
+
gst_base_src_set_automatic_eos(GST_BASE_SRC_CAST(src), FALSE);
- gst_base_src_set_async(GST_BASE_SRC_CAST(src), TRUE);
}
static void webKitWebSrcSetProperty(GObject* object, guint propID, const GValue* value, GParamSpec* pspec)
@@ -314,9 +342,11 @@
case PROP_LOCATION:
g_value_set_string(value, priv->originalURI.data());
break;
- case PROP_RESOLVED_LOCATION:
- g_value_set_string(value, priv->redirectedURI.isNull() ? priv->originalURI.data() : priv->redirectedURI.data());
+ case PROP_RESOLVED_LOCATION: {
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+ g_value_set_string(value, members->redirectedURI.isNull() ? priv->originalURI.data() : members->redirectedURI.data());
break;
+ }
case PROP_KEEP_ALIVE:
g_value_set_boolean(value, priv->keepAlive);
break;
@@ -343,202 +373,198 @@
GST_DEBUG_OBJECT(src, "context type: %s", gst_context_get_context_type(context));
if (gst_context_has_context_type(context, WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME)) {
const GValue* value = gst_structure_get_value(gst_context_get_structure(context), "player");
- priv->player = reinterpret_cast<MediaPlayer*>(g_value_get_pointer(value));
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+ members->player = reinterpret_cast<MediaPlayer*>(g_value_get_pointer(value));
}
GST_ELEMENT_CLASS(parent_class)->set_context(element, context);
}
-static void restartLoaderIfNeeded(WebKitWebSrc* src)
+static void restartLoaderIfNeeded(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
{
- WebKitWebSrcPrivate* priv = src->priv;
-
- if (!priv->isDownloadSuspended) {
+ if (!members->isDownloadSuspended) {
GST_TRACE_OBJECT(src, "download already active");
return;
}
GST_TRACE_OBJECT(src, "is download suspended %s, does have EOS %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT
- " (min %u)", boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->haveSize)
- , boolForPrinting(priv->isSeekable), priv->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
- if (priv->doesHaveEOS || !priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+ " (min %u)", boolForPrinting(members->isDownloadSuspended), boolForPrinting(members->doesHaveEOS), boolForPrinting(members->haveSize)
+ , boolForPrinting(members->isSeekable), members->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
+ if (members->doesHaveEOS || !members->haveSize || !members->isSeekable || members->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
return;
}
- GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", priv->readPosition, gst_element_state_get_name(GST_STATE(src)));
- if (!priv->readPosition || priv->readPosition == priv->size || GST_STATE(src) < GST_STATE_PAUSED) {
+ GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", members->readPosition, gst_element_state_get_name(GST_STATE(src)));
+ if (!members->readPosition || members->readPosition == members->size || GST_STATE(src) < GST_STATE_PAUSED) {
GST_TRACE_OBJECT(src, "can't restart download");
return;
}
- size_t queueSize = gst_adapter_available(priv->adapter.get());
- GST_TRACE_OBJECT(src, "queue size %zd (min %1.0f)", queueSize
- , priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
+ size_t queueSize = gst_adapter_available(members->adapter.get());
+ GST_TRACE_OBJECT(src, "queue size %zu (min %1.0f)", queueSize
+ , members->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
- if (queueSize >= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
+ if (queueSize >= members->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
GST_TRACE_OBJECT(src, "queue size above low watermark, not restarting download");
return;
}
GST_DEBUG_OBJECT(src, "restarting download");
- priv->isDownloadSuspended = false;
- webKitWebSrcMakeRequest(GST_BASE_SRC_CAST(src), false);
+ members->isDownloadSuspended = false;
+ members->requestNumber++;
+ members->requestedPosition = members->readPosition;
+ webKitWebSrcMakeRequest(src, members);
}
-static void stopLoaderIfNeeded(WebKitWebSrc* src)
+static void stopLoaderIfNeeded(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
{
- WebKitWebSrcPrivate* priv = src->priv;
+ ASSERT(isMainThread());
- if (priv->isDownloadSuspended) {
+ if (members->isDownloadSuspended) {
GST_TRACE_OBJECT(src, "download already suspended");
return;
}
GST_TRACE_OBJECT(src, "is download suspended %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT " (min %u)"
- , boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->haveSize), boolForPrinting(priv->isSeekable), priv->size
+ , boolForPrinting(members->isDownloadSuspended), boolForPrinting(members->haveSize), boolForPrinting(members->isSeekable), members->size
, SMALL_MEDIA_RESOURCE_MAX_SIZE);
- if (!priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+ if (!members->isSeekable || members->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
return;
}
- size_t queueSize = gst_adapter_available(priv->adapter.get());
- GST_TRACE_OBJECT(src, "queue size %zd (max %1.0f)", queueSize, priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
- if (queueSize <= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
+ size_t queueSize = gst_adapter_available(members->adapter.get());
+ GST_TRACE_OBJECT(src, "queue size %zu (max %1.0f)", queueSize, members->size * HIGH_QUEUE_FACTOR_THRESHOLD);
+ if (queueSize <= members->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
GST_TRACE_OBJECT(src, "queue size under high watermark, not stopping download");
return;
}
- GST_DEBUG_OBJECT(src, "stopping download");
- priv->isDownloadSuspended = true;
- priv->resource->stop();
+ if (members->readPosition == members->size) {
+ GST_TRACE_OBJECT(src, "just downloaded the last chunk in the file, loadFinished() is about to be called");
+ return;
+ }
+
+ GST_DEBUG_OBJECT(src, "R%u: stopping download", members->requestNumber);
+ members->isDownloadSuspended = true;
+ members->resource->stop();
}
static GstFlowReturn webKitWebSrcCreate(GstPushSrc* pushSrc, GstBuffer** buffer)
{
+ ASSERT(!isMainThread());
GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(pushSrc);
WebKitWebSrc* src = ""
WebKitWebSrcPrivate* priv = src->priv;
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
- GST_TRACE_OBJECT(src, "readPosition = %" G_GUINT64_FORMAT " requestedPosition = %" G_GUINT64_FORMAT, priv->readPosition, priv->requestedPosition);
+ // We need members->player to make requests. There are two mechanisms for this.
+ //
+ // 1) webKitWebSrcSetMediaPlayer() is called by MediaPlayerPrivateGStreamer by means of hooking playbin's
+ // "source-setup" event. This doesn't work for additional WebKitWebSrc elements created by adaptivedemux.
+ //
+ // 2) A GstContext query made here. Because of a bug, this only works in GStreamer >= 1.12.
+ //
+ // As a compatibility workaround, the http: URI protocol is only registered for gst>=1.12; otherwise using
+ // webkit+http:, which is used by MediaPlayerPrivateGStreamer but not by adaptivedemux's additional source
+ // elements, therefore using souphttpsrc instead and not routing traffic through the NetworkProcess.
+ if (webkitGstCheckVersion(1, 12, 0) && !members->player) {
+ members.runUnlocked([src, baseSrc]() {
+ GRefPtr<GstQuery> query = adoptGRef(gst_query_new_context(WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
+ if (gst_pad_peer_query(GST_BASE_SRC_PAD(baseSrc), query.get())) {
+ GstContext* context;
- if (priv->requestedPosition != priv->readPosition) {
- {
- LockHolder adapterLocker(priv->adapterLock);
- GST_DEBUG_OBJECT(src, "Seeking, flushing adapter");
- gst_adapter_clear(priv->adapter.get());
- }
- uint64_t requestedPosition = priv->requestedPosition;
- webKitWebSrcStop(baseSrc);
- priv->requestedPosition = requestedPosition;
- // Do not notify async-completion, in seeking flows, we will
- // be called from GstBaseSrc's perform_seek vfunc, which holds
- // a streaming lock in our frame. Hence, we would deadlock
- // trying to notify async completion, since that also requires
- // the streaming lock.
- webKitWebSrcMakeRequest(baseSrc, false);
- }
-
- {
- LockHolder locker(priv->responseLock);
- priv->responseCondition.wait(priv->responseLock, [priv] () {
- return priv->wasResponseReceived || priv->isFlushing;
+ gst_query_parse_context(query.get(), &context);
+ gst_element_set_context(GST_ELEMENT_CAST(src), context);
+ } else
+ gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_need_context(GST_OBJECT_CAST(src), WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
});
+ if (members->isFlushing)
+ return GST_FLOW_FLUSHING;
}
+ RELEASE_ASSERT(members->player);
- // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be
- // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These
- // other buffers can be retrieved with the "non _fast" API.
- GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(priv->isFlushing)
- , boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->isDownloadSuspended));
+ GST_TRACE_OBJECT(src, "readPosition = %" G_GUINT64_FORMAT " requestedPosition = %" G_GUINT64_FORMAT, members->readPosition, members->requestedPosition);
- if (priv->doesHaveEOS) {
- GST_DEBUG_OBJECT(src, "EOS");
- return GST_FLOW_EOS;
+ if (members->isRequestPending) {
+ members->isRequestPending = false;
+ webKitWebSrcMakeRequest(src, members);
}
- unsigned size = gst_base_src_get_blocksize(baseSrc);
- size_t queueSize;
- {
- LockHolder adapterLocker(priv->adapterLock);
- unsigned retries = 0;
- queueSize = gst_adapter_available(priv->adapter.get());
- GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
- while (!queueSize && !priv->isFlushing) {
- GST_TRACE_OBJECT(src, "let's try to restart the download if possible and wait a bit if no data");
- priv->adapterCondition.waitFor(priv->adapterLock, 1_s, [&] {
- restartLoaderIfNeeded(src);
- return priv->isFlushing || (!priv->isDownloadSuspended && gst_adapter_available(priv->adapter.get()));
- });
- queueSize = gst_adapter_available(priv->adapter.get());
- GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
- if (queueSize || priv->isFlushing) {
- // We have data or we're flushing. We can break the loop here.
- break;
- }
+ // Wait for the response headers.
+ members->responseCondition.wait(members.mutex(), [&] () {
+ return members->wasResponseReceived || members->isFlushing;
+ });
- // We should keep waiting but we could be in EOS. Let's check the two possibilities:
- // 1. We are at the end of the file with a known size.
- // 2. The download is not suspended and no more data are arriving. We cannot wait forever, 10x1s seems safe and sensible.
- if (priv->haveSize && priv->readPosition >= priv->size) {
- GST_DEBUG_OBJECT(src, "Waiting for data beyond the end, signalling EOS");
- return GST_FLOW_EOS;
- }
- GST_TRACE_OBJECT(src, "is download suspended? %s, num retries %u", boolForPrinting(priv->isDownloadSuspended), retries + 1);
- if (!priv->isDownloadSuspended && ++retries >= 10) {
- GST_DEBUG_OBJECT(src, "Adapter still empty after 10s of waiting, assuming EOS");
- return GST_FLOW_EOS;
- }
- }
- }
+ if (members->isFlushing)
+ return GST_FLOW_FLUSHING;
- if (priv->isFlushing) {
- GST_DEBUG_OBJECT(src, "Flushing");
- return GST_FLOW_FLUSHING;
+ if (members->pendingCaps) {
+ GST_DEBUG_OBJECT(src, "Setting caps: %" GST_PTR_FORMAT, members->pendingCaps.get());
+ members.runUnlocked([baseSrc, caps = members->pendingCaps.leakRef()]() {
+ gst_base_src_set_caps(baseSrc, caps);
+ });
+ if (members->isFlushing)
+ return GST_FLOW_FLUSHING;
}
- if (priv->haveSize && !priv->isDurationSet) {
- GST_DEBUG_OBJECT(src, "Setting duration to %" G_GUINT64_FORMAT, priv->size);
- baseSrc->segment.duration = priv->size;
- priv->isDurationSet = true;
+ if (members->haveSize && !members->isDurationSet) {
+ GST_DEBUG_OBJECT(src, "Setting duration to %" G_GUINT64_FORMAT, members->size);
+ baseSrc->segment.duration = members->size;
+ members->isDurationSet = true;
gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
}
- if (priv->httpHeadersEvent)
- gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), priv->httpHeadersEvent.leakRef());
+ if (members->pendingHttpHeadersMessage)
+ gst_element_post_message(GST_ELEMENT(src), members->pendingHttpHeadersMessage.leakRef());
+ if (members->pendingHttpHeadersEvent)
+ gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), members->pendingHttpHeadersEvent.leakRef());
- {
- LockHolder adapterLocker(priv->adapterLock);
- queueSize = gst_adapter_available(priv->adapter.get());
+ restartLoaderIfNeeded(src, members);
+
+ // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be
+ // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These
+ // other buffers can be retrieved with the "non _fast" API.
+ GST_TRACE_OBJECT(src, "doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(members->doesHaveEOS), boolForPrinting(members->isDownloadSuspended));
+
+ unsigned size = gst_base_src_get_blocksize(baseSrc);
+ size_t queueSize = gst_adapter_available(members->adapter.get());
+ GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
+ if (!queueSize) {
+ GST_TRACE_OBJECT(src, "let's wait for data or EOS");
+ members->responseCondition.wait(members.mutex(), [&] {
+ return members->isFlushing || gst_adapter_available(members->adapter.get()) || members->doesHaveEOS;
+ });
+ if (members->isFlushing)
+ return GST_FLOW_FLUSHING;
+
+ queueSize = gst_adapter_available(members->adapter.get());
+ GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
+ }
+
+ if (queueSize) {
if (queueSize < size) {
GST_TRACE_OBJECT(src, "Did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, queueSize);
size = queueSize;
} else
GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
- if (size) {
- *buffer = gst_adapter_take_buffer(priv->adapter.get(), size);
- RELEASE_ASSERT(*buffer);
- GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
- GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
- GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
- GST_TRACE_OBJECT(src, "doesHaveEOS: %s, wasSeeking: %s, seeking: %s, buffer size: %u, size: %" G_GUINT64_FORMAT, boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->wasSeeking), boolForPrinting(priv->isSeeking), size, priv->size);
- if (priv->haveSize && GST_BUFFER_OFFSET_END(*buffer) >= priv->size) {
- if (priv->wasSeeking)
- priv->wasSeeking = false;
- else if (priv->isSeekable)
- priv->doesHaveEOS = true;
- } else if (priv->wasSeeking)
- priv->wasSeeking = false;
+ *buffer = gst_adapter_take_buffer(members->adapter.get(), size);
+ RELEASE_ASSERT(*buffer);
- restartLoaderIfNeeded(src);
- } else {
- GST_ERROR_OBJECT(src, "Empty adapter!");
- ASSERT_NOT_REACHED();
- }
+ GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
+ GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
+ GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
+ GST_TRACE_OBJECT(src, "buffer size: %u, total content size: %" G_GUINT64_FORMAT, size, members->size);
+
+ restartLoaderIfNeeded(src, members);
+ return GST_FLOW_OK;
}
- return GST_FLOW_OK;
+ // If the queue is empty reached this point, the only other option is that we are in EOS.
+ ASSERT(members->doesHaveEOS);
+ GST_DEBUG_OBJECT(src, "Reached the end of the response, signalling EOS");
+ return GST_FLOW_EOS;
}
static bool webKitWebSrcSetExtraHeader(GQuark fieldId, const GValue* value, gpointer userData)
@@ -594,52 +620,23 @@
static gboolean webKitWebSrcStart(GstBaseSrc* baseSrc)
{
- // This method should only be called by BaseSrc, do not call it
- // from ourselves unless you ensure the streaming lock is not
- // held. If it is, you will deadlock the WebProcess.
- return webKitWebSrcMakeRequest(baseSrc, true);
-}
-
-static gboolean webKitWebSrcMakeRequest(GstBaseSrc* baseSrc, bool notifyAsyncCompletion)
-{
WebKitWebSrc* src = ""
- WebKitWebSrcPrivate* priv = src->priv;
- if (webkitGstCheckVersion(1, 12, 0) && !priv->player) {
- GRefPtr<GstQuery> query = adoptGRef(gst_query_new_context(WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
- if (gst_pad_peer_query(GST_BASE_SRC_PAD(baseSrc), query.get())) {
- GstContext* context;
-
- gst_query_parse_context(query.get(), &context);
- gst_element_set_context(GST_ELEMENT_CAST(src), context);
- } else
- gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_need_context(GST_OBJECT_CAST(src), WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
- }
-
- RELEASE_ASSERT(priv->player);
-
- priv->wereHeadersReceived = false;
- priv->wasResponseReceived = false;
- priv->isDurationSet = false;
- priv->doesHaveEOS = false;
- priv->isFlushing = false;
- priv->downloadStartTime = WallTime::nan();
-
- priv->didPassAccessControlCheck = false;
-
- if (priv->originalURI.isNull()) {
+ if (src->priv->originalURI.isNull()) {
GST_ERROR_OBJECT(src, "No URI provided");
- webKitWebSrcStop(baseSrc);
return FALSE;
}
+ return TRUE;
+}
- if (priv->requestedPosition == priv->stopPosition) {
- GST_DEBUG_OBJECT(src, "Empty segment, signaling EOS");
- priv->doesHaveEOS = true;
- return FALSE;
- }
+static void webKitWebSrcMakeRequest(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
+{
+ WebKitWebSrcPrivate* priv = src->priv;
- GST_DEBUG_OBJECT(src, "Fetching %s", priv->originalURI.data());
+ ASSERT(!priv->originalURI.isNull());
+ ASSERT(members->requestedPosition != members->stopPosition);
+
+ GST_DEBUG_OBJECT(src, "Posting task to request R%u %s requestedPosition=%" G_GUINT64_FORMAT " stopPosition=%" G_GUINT64_FORMAT, members->requestNumber, priv->originalURI.data(), members->requestedPosition, members->stopPosition);
URL url = "" priv->originalURI.data());
ResourceRequest request(url);
@@ -646,7 +643,7 @@
request.setAllowCookies(true);
request.setFirstPartyForCookies(url);
- request.setHTTPReferrer(priv->player->referrer());
+ request.setHTTPReferrer(members->player->referrer());
if (priv->httpMethod.get())
request.setHTTPMethod(priv->httpMethod.get());
@@ -670,12 +667,12 @@
|| !g_ascii_strcasecmp("trailers.apple.com", url.host().utf8().data()))
request.setHTTPUserAgent("Quicktime/7.6.6");
- if (priv->requestedPosition) {
- GUniquePtr<char> formatedRange(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", priv->requestedPosition));
+ if (members->requestedPosition) {
+ GUniquePtr<char> formatedRange(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", members->requestedPosition));
GST_DEBUG_OBJECT(src, "Range request: %s", formatedRange.get());
request.setHTTPHeaderField(HTTPHeaderName::Range, formatedRange.get());
}
- priv->readPosition = priv->requestedPosition;
+ ASSERT(members->readPosition == members->requestedPosition);
GST_DEBUG_OBJECT(src, "Persistent connection support %s", priv->keepAlive ? "enabled" : "disabled");
if (!priv->keepAlive)
@@ -687,70 +684,41 @@
// We always request Icecast/Shoutcast metadata, just in case ...
request.setHTTPHeaderField(HTTPHeaderName::IcyMetadata, "1");
- GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
- priv->notifier->notify(MainThreadSourceNotification::Start, [protector, request = WTFMove(request), src, notifyAsyncCompletion] {
+ ASSERT(!isMainThread());
+ RunLoop::main().dispatch([protector = WTF::ensureGRef(src), request = WTFMove(request), requestNumber = members->requestNumber, src] {
WebKitWebSrcPrivate* priv = protector->priv;
- if (!priv->loader)
- priv->loader = priv->player->createResourceLoader();
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+ // Ignore this task (not making any HTTP request) if by now WebKitWebSrc streaming thread is already waiting
+ // for a different request. There is no point anymore in sending this one.
+ if (members->requestNumber != requestNumber) {
+ GST_DEBUG_OBJECT(protector.get(), "Skipping R%u, current request number is %u", requestNumber, members->requestNumber);
+ return;
+ }
+ if (!members->loader)
+ members->loader = members->player->createResourceLoader();
+
PlatformMediaResourceLoader::LoadOptions loadOptions = 0;
if (request.url().protocolIsBlob())
loadOptions |= PlatformMediaResourceLoader::LoadOption::BufferData;
- priv->resource = priv->loader->requestResource(ResourceRequest(request), loadOptions);
- if (priv->resource) {
- priv->resource->setClient(makeUnique<CachedResourceStreamingClient>(protector.get(), ResourceRequest(request)));
- GST_DEBUG_OBJECT(protector.get(), "Started request");
- if (notifyAsyncCompletion)
- gst_base_src_start_complete(GST_BASE_SRC(src), GST_FLOW_OK);
+ members->resource = members->loader->requestResource(ResourceRequest(request), loadOptions);
+ if (members->resource) {
+ members->resource->setClient(makeUnique<CachedResourceStreamingClient>(protector.get(), ResourceRequest(request), requestNumber));
+ GST_DEBUG_OBJECT(protector.get(), "Started request R%u", requestNumber);
} else {
- GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client");
- if (notifyAsyncCompletion)
- gst_base_src_start_complete(GST_BASE_SRC(src), GST_FLOW_ERROR);
- priv->loader = nullptr;
+ GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client to handle R%u", requestNumber);
+ members->loader = nullptr;
}
});
-
- return TRUE;
}
-static void webKitWebSrcCloseSession(WebKitWebSrc* src)
-{
- WebKitWebSrcPrivate* priv = src->priv;
- GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
-
- priv->notifier->notify(MainThreadSourceNotification::Stop, [protector, keepAlive = priv->keepAlive] {
- WebKitWebSrcPrivate* priv = protector->priv;
-
- GST_DEBUG_OBJECT(protector.get(), "Stopping resource loader");
-
- if (priv->resource) {
- priv->resource->stop();
- priv->resource->setClient(nullptr);
- priv->resource = nullptr;
- }
-
- if (!keepAlive)
- priv->loader = nullptr;
- });
-
- GST_DEBUG_OBJECT(src, "Resource loader stopped");
-}
-
static gboolean webKitWebSrcStop(GstBaseSrc* baseSrc)
{
WebKitWebSrc* src = ""
- WebKitWebSrcPrivate* priv = src->priv;
-
- if (priv->resource || (priv->loader && !priv->keepAlive))
- webKitWebSrcCloseSession(src);
-
- {
- LockHolder adapterLocker(priv->adapterLock);
- gst_adapter_clear(priv->adapter.get());
- }
-
- webkitWebSrcReset(src);
- GST_DEBUG_OBJECT(src, "Stopped request");
+ // basesrc will always call unLock() and unLockStop() before calling this. See gst_base_src_stop().
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+ webkitWebSrcReset(src, members, ResetType::Hard);
+ GST_DEBUG_OBJECT(src, "Stopped WebKitWebSrc");
return TRUE;
}
@@ -757,11 +725,11 @@
static gboolean webKitWebSrcGetSize(GstBaseSrc* baseSrc, guint64* size)
{
WebKitWebSrc* src = ""
- WebKitWebSrcPrivate* priv = src->priv;
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
- GST_DEBUG_OBJECT(src, "haveSize: %s, size: %" G_GUINT64_FORMAT, boolForPrinting(priv->haveSize), priv->size);
- if (priv->haveSize) {
- *size = priv->size;
+ GST_DEBUG_OBJECT(src, "haveSize: %s, size: %" G_GUINT64_FORMAT, boolForPrinting(members->haveSize), members->size);
+ if (members->haveSize) {
+ *size = members->size;
return TRUE;
}
@@ -771,40 +739,42 @@
static gboolean webKitWebSrcIsSeekable(GstBaseSrc* baseSrc)
{
WebKitWebSrc* src = ""
-
- GST_DEBUG_OBJECT(src, "isSeekable: %s", boolForPrinting(src->priv->isSeekable));
- return src->priv->isSeekable;
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+ GST_DEBUG_OBJECT(src, "isSeekable: %s", boolForPrinting(members->isSeekable));
+ return members->isSeekable;
}
static gboolean webKitWebSrcDoSeek(GstBaseSrc* baseSrc, GstSegment* segment)
{
+ // This function is mutually exclusive with create(). It's only called when we're transitioning to >=PAUSED and
+ // between flushes. In any case, basesrc holds the STREAM_LOCK, so we know create() is not running.
+ // Also, both webKitWebSrcUnLock() and webKitWebSrcUnLockStop() are guaranteed to be called *before* this function.
+ // [See gst_base_src_perform_seek()].
+ ASSERT(GST_ELEMENT(baseSrc)->current_state < GST_STATE_PAUSED || GST_PAD_IS_FLUSHING(baseSrc->srcpad));
+
+ // Except for the initial seek, this function is only called if isSeekable() returns true.
+ ASSERT(GST_ELEMENT(baseSrc)->current_state < GST_STATE_PAUSED || webKitWebSrcIsSeekable(baseSrc));
+
WebKitWebSrc* src = ""
- WebKitWebSrcPrivate* priv = src->priv;
- LockHolder locker(priv->responseLock);
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
- GST_DEBUG_OBJECT(src, "Seek segment: (%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT ")", segment->start, segment->stop);
- if (priv->readPosition == segment->start && priv->requestedPosition == priv->readPosition && priv->stopPosition == segment->stop) {
- GST_DEBUG_OBJECT(src, "Seek to current read/end position and no seek pending");
- return TRUE;
- }
+ GST_DEBUG_OBJECT(src, "Seek segment: (%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT ") Position previous to seek: %" G_GUINT64_FORMAT, segment->start, segment->stop, members->readPosition);
- if (priv->wereHeadersReceived && !priv->isSeekable) {
- GST_WARNING_OBJECT(src, "Not seekable");
- return FALSE;
- }
+ // Before attempting to seek, basesrc will call isSeekable(). If isSeekable() is true, a flush will be made and
+ // this function will be called. basesrc still gives us the chance here to return FALSE and cancel the seek.
+ // We cannot afford to return FALSE in this function though unless we're going to fail on purpose, since at this
+ // point we have already been flushed and cancelled the HTTP request that was feeding us data.
if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
- GST_WARNING_OBJECT(src, "Invalid seek segment");
+ GST_ERROR_OBJECT(src, "Invalid seek segment");
return FALSE;
}
- if (priv->haveSize && segment->start >= priv->size)
+ if (members->haveSize && segment->start >= members->size)
GST_WARNING_OBJECT(src, "Potentially seeking behind end of file, might EOS immediately");
- priv->isSeeking = true;
- priv->requestedPosition = segment->start;
- priv->stopPosition = segment->stop;
- priv->adapterCondition.notifyOne();
+ members->requestedPosition = members->readPosition = segment->start;
+ members->stopPosition = segment->stop;
return TRUE;
}
@@ -816,8 +786,9 @@
if (GST_QUERY_TYPE(query) == GST_QUERY_URI) {
gst_query_set_uri(query, priv->originalURI.data());
- if (!priv->redirectedURI.isNull())
- gst_query_set_uri_redirection(query, priv->redirectedURI.data());
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+ if (!members->redirectedURI.isNull())
+ gst_query_set_uri_redirection(query, members->redirectedURI.data());
result = TRUE;
}
@@ -838,12 +809,31 @@
static gboolean webKitWebSrcUnLock(GstBaseSrc* baseSrc)
{
WebKitWebSrc* src = ""
- LockHolder locker(src->priv->responseLock);
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
GST_DEBUG_OBJECT(src, "Unlock");
- src->priv->isFlushing = true;
- src->priv->responseCondition.notifyOne();
- src->priv->adapterCondition.notifyOne();
+ members->isFlushing = true;
+
+ // If we have a network resource request open, we ask the main thread to close it.
+ if (members->resource) {
+ GST_DEBUG_OBJECT(src, "Resource request R%u will be stopped", members->requestNumber);
+ RunLoop::main().dispatch([protector = WTF::ensureGRef(src), resource = WTFMove(members->resource), requestNumber = members->requestNumber] {
+ GST_DEBUG_OBJECT(protector.get(), "Stopping resource request R%u", requestNumber);
+ resource->stop();
+ resource->setClient(nullptr);
+ });
+ }
+ ASSERT(!members->resource);
+
+ if (!src->priv->keepAlive)
+ members->loader = nullptr;
+
+ // Ensure all network callbacks from the old request don't feed data to WebKitWebSrc anymore.
+ members->requestNumber++;
+
+ // Wake up streaming thread.
+ members->responseCondition.notifyOne();
+
return TRUE;
}
@@ -850,39 +840,14 @@
static gboolean webKitWebSrcUnLockStop(GstBaseSrc* baseSrc)
{
WebKitWebSrc* src = ""
- LockHolder locker(src->priv->responseLock);
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
GST_DEBUG_OBJECT(src, "Unlock stop");
- src->priv->isFlushing = false;
+ members->isFlushing = false;
+ webkitWebSrcReset(src, members, ResetType::Soft);
return TRUE;
}
-static GstStateChangeReturn webKitWebSrcChangeState(GstElement* element, GstStateChange transition)
-{
- WebKitWebSrc* src = ""
-
-#if GST_CHECK_VERSION(1, 14, 0)
- GST_DEBUG_OBJECT(src, "%s", gst_state_change_get_name(transition));
-#endif
- switch (transition) {
- case GST_STATE_CHANGE_READY_TO_NULL:
- webKitWebSrcCloseSession(src);
- break;
- case GST_STATE_CHANGE_PAUSED_TO_READY: {
- LockHolder locker(src->priv->responseLock);
- GST_DEBUG_OBJECT(src, "PAUSED->READY cancelling network requests");
- src->priv->isFlushing = true;
- src->priv->responseCondition.notifyOne();
- src->priv->adapterCondition.notifyOne();
- break;
- }
- default:
- break;
- }
-
- return GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
-}
-
static bool urlHasSupportedProtocol(const URL& url)
{
return url.isValid() && (url.protocolIsInHTTPFamily() || url.protocolIsBlob());
@@ -938,7 +903,6 @@
return FALSE;
}
- priv->redirectedURI = CString();
priv->originalURI = CString();
if (!uri)
return TRUE;
@@ -972,16 +936,19 @@
void webKitWebSrcSetMediaPlayer(WebKitWebSrc* src, WebCore::MediaPlayer* player)
{
ASSERT(player);
- src->priv->player = player;
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+ members->player = player;
}
bool webKitSrcPassedCORSAccessCheck(WebKitWebSrc* src)
{
- return src->priv->didPassAccessControlCheck;
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+ return members->didPassAccessControlCheck;
}
-CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
- : m_src(GST_ELEMENT(src))
+CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request, unsigned requestNumber)
+ : m_requestNumber(requestNumber)
+ , m_src(GST_ELEMENT(src))
, m_request(WTFMove(request))
{
}
@@ -990,6 +957,7 @@
void CachedResourceStreamingClient::checkUpdateBlocksize(unsigned bytesRead)
{
+ ASSERT(isMainThread());
WebKitWebSrc* src = ""
GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(src);
WebKitWebSrcPrivate* priv = src->priv;
@@ -1026,32 +994,44 @@
void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, const ResourceResponse& response, CompletionHandler<void(PolicyChecker::ShouldContinue)>&& completionHandler)
{
+ ASSERT(isMainThread());
WebKitWebSrc* src = ""
WebKitWebSrcPrivate* priv = src->priv;
- priv->didPassAccessControlCheck = priv->resource->didPassAccessControlCheck();
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+ if (members->requestNumber != m_requestNumber) {
+ completionHandler(PolicyChecker::ShouldContinue::No);
+ return;
+ }
- GST_DEBUG_OBJECT(src, "Received response: %d", response.httpStatusCode());
+ GST_DEBUG_OBJECT(src, "R%u: Received response: %d", m_requestNumber, response.httpStatusCode());
+ members->didPassAccessControlCheck = members->resource->didPassAccessControlCheck();
m_origins.add(SecurityOrigin::create(response.url()));
auto responseURI = response.url().string().utf8();
if (priv->originalURI != responseURI)
- priv->redirectedURI = WTFMove(responseURI);
+ members->redirectedURI = WTFMove(responseURI);
- uint64_t length = response.expectedContentLength();
- if (length > 0 && priv->requestedPosition && response.httpStatusCode() == 206)
- length += priv->requestedPosition;
+ // length will be zero (unknown) if no Content-Length is provided or the response is compressed with Content-Encoding.
+ uint64_t length = !response.httpHeaderFields().contains(HTTPHeaderName::ContentEncoding) ? response.expectedContentLength() : 0;
+ if (length > 0 && members->requestedPosition && response.httpStatusCode() == 206)
+ length += members->requestedPosition;
- priv->httpHeaders.reset(gst_structure_new_empty("http-headers"));
- gst_structure_set(priv->httpHeaders.get(), "uri", G_TYPE_STRING, priv->originalURI.data(),
+ GUniquePtr<GstStructure> httpHeaders(gst_structure_new_empty("http-headers"));
+
+ gst_structure_set(httpHeaders.get(), "uri", G_TYPE_STRING, priv->originalURI.data(),
"http-status-code", G_TYPE_UINT, response.httpStatusCode(), nullptr);
- if (!priv->redirectedURI.isNull())
- gst_structure_set(priv->httpHeaders.get(), "redirection-uri", G_TYPE_STRING, priv->redirectedURI.data(), nullptr);
+ if (!members->redirectedURI.isNull())
+ gst_structure_set(httpHeaders.get(), "redirection-uri", G_TYPE_STRING, members->redirectedURI.data(), nullptr);
+
+ // Pack request headers in the http-headers structure.
GUniquePtr<GstStructure> headers(gst_structure_new_empty("request-headers"));
for (const auto& header : m_request.httpHeaderFields())
gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
- GST_DEBUG_OBJECT(src, "Request headers going downstream: %" GST_PTR_FORMAT, headers.get());
- gst_structure_set(priv->httpHeaders.get(), "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
+ GST_DEBUG_OBJECT(src, "R%u: Request headers going downstream: %" GST_PTR_FORMAT, m_requestNumber, headers.get());
+ gst_structure_set(httpHeaders.get(), "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
+
+ // Pack response headers in the http-headers structure.
headers.reset(gst_structure_new_empty("response-headers"));
for (const auto& header : response.httpHeaderFields()) {
bool ok = false;
@@ -1061,57 +1041,43 @@
else
gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
}
- auto contentLengthFieldName(httpHeaderNameString(HTTPHeaderName::ContentLength).toString());
- if (!gst_structure_has_field(headers.get(), contentLengthFieldName.utf8().data()))
- gst_structure_set(headers.get(), contentLengthFieldName.utf8().data(), G_TYPE_UINT64, static_cast<uint64_t>(length), nullptr);
- gst_structure_set(priv->httpHeaders.get(), "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
- GST_DEBUG_OBJECT(src, "Response headers going downstream: %" GST_PTR_FORMAT, headers.get());
+ GST_DEBUG_OBJECT(src, "R%u: Response headers going downstream: %" GST_PTR_FORMAT, m_requestNumber, headers.get());
+ gst_structure_set(httpHeaders.get(), "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
- priv->httpHeadersEvent = adoptGRef(gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, gst_structure_copy(priv->httpHeaders.get())));
+ members->pendingHttpHeadersMessage = adoptGRef(gst_message_new_element(GST_OBJECT_CAST(src), gst_structure_copy(httpHeaders.get())));
+ members->pendingHttpHeadersEvent = adoptGRef(gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, httpHeaders.release()));
- auto scopeExit = makeScopeExit([&] {
- GstStructure* structure = gst_structure_copy(src->priv->httpHeaders.get());
- gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_element(GST_OBJECT_CAST(src), structure));
- });
-
if (response.httpStatusCode() >= 400) {
- GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received %d HTTP error code", response.httpStatusCode()), (nullptr));
- priv->doesHaveEOS = true;
- webKitWebSrcStop(GST_BASE_SRC_CAST(src));
+ GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: Received %d HTTP error code", m_requestNumber, response.httpStatusCode()), (nullptr));
+ members->doesHaveEOS = true;
+ members->responseCondition.notifyOne();
completionHandler(PolicyChecker::ShouldContinue::No);
return;
}
- if (priv->requestedPosition) {
+ if (members->requestedPosition) {
// Seeking ... we expect a 206 == PARTIAL_CONTENT
- if (response.httpStatusCode() == 200) {
- // Range request didn't have a ranged response; resetting offset.
- priv->readPosition = 0;
- } else if (response.httpStatusCode() != 206) {
+ if (response.httpStatusCode() != 206) {
// Range request completely failed.
- GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received unexpected %d HTTP status code", response.httpStatusCode()), (nullptr));
- priv->doesHaveEOS = true;
- webKitWebSrcStop(GST_BASE_SRC_CAST(src));
+ GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: Received unexpected %d HTTP status code for range request", m_requestNumber, response.httpStatusCode()), (nullptr));
+ members->doesHaveEOS = true;
+ members->responseCondition.notifyOne();
completionHandler(PolicyChecker::ShouldContinue::No);
return;
- } else {
- GST_DEBUG_OBJECT(src, "Range request succeeded");
- priv->isSeeking = false;
- priv->wasSeeking = true;
}
+ GST_DEBUG_OBJECT(src, "R%u: Range request succeeded", m_requestNumber);
}
- priv->isSeekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
+ members->isSeekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
- GST_DEBUG_OBJECT(src, "Size: %" G_GUINT64_FORMAT ", isSeekable: %s", length, boolForPrinting(priv->isSeekable));
+ GST_DEBUG_OBJECT(src, "R%u: Size: %" G_GUINT64_FORMAT ", isSeekable: %s", m_requestNumber, length, boolForPrinting(members->isSeekable));
if (length > 0) {
- if (!priv->haveSize || priv->size != length) {
- priv->haveSize = true;
- priv->size = length;
- priv->isDurationSet = false;
+ if (!members->haveSize || members->size != length) {
+ members->haveSize = true;
+ members->size = length;
}
} else
- priv->haveSize = false;
+ members->haveSize = false;
// Signal to downstream if this is an Icecast stream.
GRefPtr<GstCaps> caps;
@@ -1123,114 +1089,106 @@
caps = adoptGRef(gst_caps_new_simple("application/x-icy", "metadata-interval", G_TYPE_INT, metadataInterval, nullptr));
String contentType = response.httpHeaderField(HTTPHeaderName::ContentType);
- GST_DEBUG_OBJECT(src, "Response ContentType: %s", contentType.utf8().data());
+ GST_DEBUG_OBJECT(src, "R%u: Response ContentType: %s", m_requestNumber, contentType.utf8().data());
gst_caps_set_simple(caps.get(), "content-type", G_TYPE_STRING, contentType.utf8().data(), nullptr);
}
}
-
if (caps) {
- GST_DEBUG_OBJECT(src, "Set caps to %" GST_PTR_FORMAT, caps.get());
- gst_base_src_set_caps(GST_BASE_SRC_CAST(src), caps.get());
+ GST_DEBUG_OBJECT(src, "R%u: Set caps to %" GST_PTR_FORMAT, m_requestNumber, caps.get());
+ members->pendingCaps = WTFMove(caps);
}
- {
- LockHolder locker(priv->responseLock);
- priv->wereHeadersReceived = true;
- priv->headersCondition.notifyOne();
- }
+ members->wasResponseReceived = true;
+ members->responseCondition.notifyOne();
+
completionHandler(PolicyChecker::ShouldContinue::Yes);
}
void CachedResourceStreamingClient::dataReceived(PlatformMediaResource&, const char* data, int length)
{
+ ASSERT(isMainThread());
WebKitWebSrc* src = ""
- GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(src);
WebKitWebSrcPrivate* priv = src->priv;
- GST_LOG_OBJECT(src, "Have %d bytes of data", length);
- LockHolder locker(priv->responseLock);
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+ if (members->requestNumber != m_requestNumber)
+ return;
+
+ GST_LOG_OBJECT(src, "R%u: Have %d bytes of data", m_requestNumber, length);
+
// Rough bandwidth calculation. We ignore here the first data package because we would have to reset the counters when we issue the request and
// that first package delivery would include the time of sending out the request and getting the data back. Since we can't distinguish the
// sending time from the receiving time, it is better to ignore it.
- if (!std::isnan(priv->downloadStartTime)) {
- priv->totalDownloadedBytes += length;
- double timeSinceStart = (WallTime::now() - priv->downloadStartTime).seconds();
- GST_TRACE_OBJECT(src, "downloaded %" G_GUINT64_FORMAT " bytes in %f seconds =~ %1.0f bytes/second", priv->totalDownloadedBytes, timeSinceStart
- , timeSinceStart ? priv->totalDownloadedBytes / timeSinceStart : 0);
+ if (!std::isnan(members->downloadStartTime)) {
+ members->totalDownloadedBytes += length;
+ double timeSinceStart = (WallTime::now() - members->downloadStartTime).seconds();
+ GST_TRACE_OBJECT(src, "R%u: downloaded %" G_GUINT64_FORMAT " bytes in %f seconds =~ %1.0f bytes/second", m_requestNumber, members->totalDownloadedBytes, timeSinceStart
+ , timeSinceStart ? members->totalDownloadedBytes / timeSinceStart : 0);
} else {
- priv->downloadStartTime = WallTime::now();
- priv->totalDownloadedBytes = 0;
+ members->downloadStartTime = WallTime::now();
}
- uint64_t newPosition = priv->readPosition + length;
- if (LIKELY (priv->requestedPosition == priv->readPosition))
- priv->requestedPosition = newPosition;
- priv->readPosition = newPosition;
+ members->readPosition += length;
+ ASSERT(!members->haveSize || members->readPosition <= members->size);
- uint64_t newSize = 0;
- if (priv->haveSize && (newPosition > priv->size)) {
- GST_DEBUG_OBJECT(src, "Got position previous estimated content size (%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", newPosition, priv->size);
- newSize = newPosition;
- }
-
- if (newSize) {
- priv->size = newSize;
- baseSrc->segment.duration = priv->size;
- gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
- }
-
gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_element(GST_OBJECT_CAST(src),
- gst_structure_new("webkit-network-statistics", "read-position", G_TYPE_UINT64, priv->readPosition, "size", G_TYPE_UINT64, priv->size, nullptr)));
+ gst_structure_new("webkit-network-statistics", "read-position", G_TYPE_UINT64, members->readPosition, "size", G_TYPE_UINT64, members->size, nullptr)));
checkUpdateBlocksize(length);
- if (!priv->wasResponseReceived)
- priv->wasResponseReceived = true;
- priv->responseCondition.notifyOne();
-
- {
- LockHolder adapterLocker(priv->adapterLock);
- GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
- gst_adapter_push(priv->adapter.get(), buffer);
- stopLoaderIfNeeded(src);
- priv->adapterCondition.notifyOne();
- }
+ GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
+ gst_adapter_push(members->adapter.get(), buffer);
+ stopLoaderIfNeeded(src, members);
+ members->responseCondition.notifyOne();
}
void CachedResourceStreamingClient::accessControlCheckFailed(PlatformMediaResource&, const ResourceError& error)
{
+ ASSERT(isMainThread());
WebKitWebSrc* src = ""
- GST_ELEMENT_ERROR(src, RESOURCE, READ, ("%s", error.localizedDescription().utf8().data()), (nullptr));
- src->priv->doesHaveEOS = true;
- webKitWebSrcStop(GST_BASE_SRC_CAST(src));
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+ if (members->requestNumber != m_requestNumber)
+ return;
+
+ GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: %s", m_requestNumber, error.localizedDescription().utf8().data()), (nullptr));
+ members->doesHaveEOS = true;
+ members->responseCondition.notifyOne();
}
void CachedResourceStreamingClient::loadFailed(PlatformMediaResource&, const ResourceError& error)
{
+ ASSERT(isMainThread());
WebKitWebSrc* src = ""
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+ if (members->requestNumber != m_requestNumber)
+ return;
if (!error.isCancellation()) {
- GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
- GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
+ GST_ERROR_OBJECT(src, "R%u: Have failure: %s", m_requestNumber, error.localizedDescription().utf8().data());
+ GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("R%u: %s", m_requestNumber, error.localizedDescription().utf8().data()), (nullptr));
}
- src->priv->doesHaveEOS = true;
+ members->doesHaveEOS = true;
+ members->responseCondition.notifyOne();
}
void CachedResourceStreamingClient::loadFinished(PlatformMediaResource&)
{
+ ASSERT(isMainThread());
WebKitWebSrc* src = ""
- WebKitWebSrcPrivate* priv = src->priv;
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+ if (members->requestNumber != m_requestNumber)
+ return;
- if (priv->isSeeking && !priv->isFlushing)
- priv->isSeeking = false;
+ members->doesHaveEOS = true;
+ members->responseCondition.notifyOne();
}
bool webKitSrcWouldTaintOrigin(WebKitWebSrc* src, const SecurityOrigin& origin)
{
- WebKitWebSrcPrivate* priv = src->priv;
+ DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
- auto* cachedResourceStreamingClient = reinterpret_cast<CachedResourceStreamingClient*>(priv->resource->client());
+ auto* cachedResourceStreamingClient = reinterpret_cast<CachedResourceStreamingClient*>(members->resource->client());
for (auto& responseOrigin : cachedResourceStreamingClient->securityOrigins()) {
if (!origin.canAccess(*responseOrigin))
return true;