Title: [264081] releases/WebKitGTK/webkit-2.28
Revision
264081
Author
carlo...@webkit.org
Date
2020-07-08 03:07:10 -0700 (Wed, 08 Jul 2020)

Log Message

Merge r260755 - [GStreamer] Rework WebKitWebSrc threading
Source/WebCore:

https://bugs.webkit.org/show_bug.cgi?id=210284

Reviewed by Xabier Rodriguez-Calvar.

WebKitWebSrc as it is in master has a number of race conditions
leading to occasional starvation (due to cancelling the wrong request)
or data corruption (due to pushing data from a cancelled request).

The threading situation wasn't easy to follow, as it wasn't clear
which members should be protected by which mutex, and in what
circumstances. Some parts of the design also introduced additional
complexity, such as the first request being sent from the main thread
whereas the rest were sent from the streaming thread or basesrc async
start.

In response, this patch reworks all the locking in WebKitWebSrc to use
WTF::DataMutex. This ensures all accesses to its (now explicit)
protected members are locked. The two mutexes and condition variables
have been simplified into one, as there was no obvious need or benefit
for two of each in this case.
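
As an illustration of the locking pattern described above, here is a minimal
standalone sketch. It assumes nothing about the real WTF::DataMutex beyond what
the diff shows (a LockedWrapper constructed from the data mutex and accessed
with operator->). DataMutexSketch, StreamingMembersSketch and
advanceReadPosition are hypothetical names built on std::mutex, not WTF code.

```cpp
#include <cstdint>
#include <mutex>

// Hypothetical, simplified stand-in for WTF::DataMutex (not the WTF implementation).
// The protected data is private, so the only way to reach it is through a
// LockedWrapper, which holds the mutex for its whole lifetime.
template<typename T>
class DataMutexSketch {
public:
    class LockedWrapper {
    public:
        explicit LockedWrapper(DataMutexSketch& owner)
            : m_owner(owner)
            , m_lock(owner.m_mutex)
        {
        }

        T* operator->() { return &m_owner.m_data; }
        T& operator*() { return m_owner.m_data; }

    private:
        DataMutexSketch& m_owner;
        std::unique_lock<std::mutex> m_lock;
    };

private:
    std::mutex m_mutex;
    T m_data { };
};

// Hypothetical subset of the members the patch groups under one lock.
struct StreamingMembersSketch {
    bool isFlushing { false };
    uint64_t readPosition { 0 };
};

// Advances the read position while holding the lock and returns the new value.
uint64_t advanceReadPosition(DataMutexSketch<StreamingMembersSketch>& dataMutex, uint64_t bytes)
{
    DataMutexSketch<StreamingMembersSketch>::LockedWrapper members(dataMutex);
    members->readPosition += bytes;
    return members->readPosition;
}
```

Because the members cannot be named without a wrapper in scope, an unlocked
access becomes a compile error rather than a latent race.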

Requests have been numbered, which allows results from cancelled
requests to be ignored safely and atomically, avoiding data corruption
races, and makes following them in debug logs much easier.
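
The idea of request numbering can be sketched as follows. This is a
simplified illustration, not the code from the patch: RequestStateSketch,
startNewRequest and dataReceived are hypothetical names, and the buffer is a
std::string rather than a GstAdapter. The assumption is that each network
callback carries the number of the request it belongs to and compares it with
the current number while holding the lock.

```cpp
#include <mutex>
#include <string>

// Hypothetical shared state: the current request number and the buffered data.
struct RequestStateSketch {
    std::mutex mutex;
    unsigned requestNumber { 1 };
    std::string buffered;
};

// Starts a new request, which implicitly invalidates the previous one.
// Returns the number that callbacks of the new request must present.
unsigned startNewRequest(RequestStateSketch& state)
{
    std::lock_guard<std::mutex> lock(state.mutex);
    state.buffered.clear();
    return ++state.requestNumber;
}

// Called when a data block arrives for the request identified by requestNumber.
// The number check and the append happen under the same lock, so a block from
// a cancelled request can never be pushed. Returns false if the block was dropped.
bool dataReceived(RequestStateSketch& state, unsigned requestNumber, const std::string& data)
{
    std::lock_guard<std::mutex> lock(state.mutex);
    if (requestNumber != state.requestNumber)
        return false;
    state.buffered += data;
    return true;
}
```

The same number can be printed in log lines, in the style of the "R%u:"
prefix that the diff adds to the "stopping download" message.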

The conditions for making and cancelling requests have been reduced
to a simpler and safer model: there is at most one active request at
any time, flushes cancel the request, and the first create() call
always makes the new request (both at startup and after a flush).
Debug asserts and notes about the flow of operations during basesrc
seeks have been provided.
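
The request model above can be sketched as a small state machine. This is
an illustration only: SourceSketch, flush and create are hypothetical names,
and locking is omitted for brevity (in the patch the equivalent state lives
behind the data mutex). The isRequestPending flag mirrors the member of the
same name in the diff.

```cpp
// Hypothetical model of the "at most one active request" rule.
struct SourceSketch {
    bool isRequestPending { true };  // True at startup and after every flush.
    bool hasActiveRequest { false };
    unsigned requestsMade { 0 };
};

// A flush cancels the active request (if any) and marks a new one as pending.
void flush(SourceSketch& source)
{
    source.hasActiveRequest = false;
    source.isRequestPending = true;
}

// The first create() call after startup or a flush makes the request;
// later calls reuse the request that is already active.
void create(SourceSketch& source)
{
    if (source.isRequestPending) {
        source.isRequestPending = false;
        source.hasActiveRequest = true;
        ++source.requestsMade;
    }
}
```

With this shape there is a single place where requests start, so startup and
post-flush behaviour follow the same path.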

As this effort needed a review of the entire WebKitWebSrc, cleanups,
corrections and documentation comments have been provided where
appropriate.

This patch introduces no visible behavior changes, just stability
improvements.

* platform/graphics/gstreamer/GRefPtrGStreamer.h:
* platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:
(WebKitWebSrcPrivate::~WebKitWebSrcPrivate):
(webkit_web_src_class_init):
(webkitWebSrcReset):
(webKitWebSrcConstructed):
(webKitWebSrcSetProperty):
(webKitWebSrcGetProperty):
(webKitWebSrcSetContext):
(webKitWebSrcSendEvent):
(restartLoaderIfNeeded):
(stopLoaderIfNeeded):
(webKitWebSrcCreate):
(webKitWebSrcStart):
(webKitWebSrcMakeRequest):
(webKitWebSrcStop):
(webKitWebSrcGetSize):
(webKitWebSrcIsSeekable):
(webKitWebSrcDoSeek):
(webKitWebSrcQuery):
(webKitWebSrcUnLock):
(webKitWebSrcUnLockStop):
(webKitWebSrcSetUri):
(webKitWebSrcSetMediaPlayer):
(webKitSrcPassedCORSAccessCheck):
(CachedResourceStreamingClient::CachedResourceStreamingClient):
(CachedResourceStreamingClient::checkUpdateBlocksize):
(CachedResourceStreamingClient::responseReceived):
(CachedResourceStreamingClient::dataReceived):
(CachedResourceStreamingClient::accessControlCheckFailed):
(CachedResourceStreamingClient::loadFailed):
(CachedResourceStreamingClient::loadFinished):
(webKitSrcWouldTaintOrigin):
* platform/graphics/gstreamer/WebKitWebSourceGStreamer.h:

LayoutTests:

https://bugs.webkit.org/show_bug.cgi?id=209811

Reviewed by Xabier Rodriguez-Calvar.

A test improved its status in TestExpectations from the changes made
in this patch.

* platform/gtk/TestExpectations:

Diff

Modified: releases/WebKitGTK/webkit-2.28/LayoutTests/ChangeLog (264080 => 264081)


--- releases/WebKitGTK/webkit-2.28/LayoutTests/ChangeLog	2020-07-08 10:07:02 UTC (rev 264080)
+++ releases/WebKitGTK/webkit-2.28/LayoutTests/ChangeLog	2020-07-08 10:07:10 UTC (rev 264081)
@@ -1,3 +1,15 @@
+2020-04-27  Alicia Boya García  <ab...@igalia.com>
+
+        [GStreamer] Rework WebKitWebSrc threading
+        https://bugs.webkit.org/show_bug.cgi?id=209811
+
+        Reviewed by Xabier Rodriguez-Calvar.
+
+        A test improved its status in TestExpectations from the changes made
+        in this patch.
+
+        * platform/gtk/TestExpectations:
+
 2020-03-20  David Kilzer  <ddkil...@apple.com>
 
         Content-Type & Nosniff Ignored on XML External Entity Resources

Modified: releases/WebKitGTK/webkit-2.28/LayoutTests/platform/gtk/TestExpectations (264080 => 264081)


--- releases/WebKitGTK/webkit-2.28/LayoutTests/platform/gtk/TestExpectations	2020-07-08 10:07:02 UTC (rev 264080)
+++ releases/WebKitGTK/webkit-2.28/LayoutTests/platform/gtk/TestExpectations	2020-07-08 10:07:10 UTC (rev 264081)
@@ -1294,8 +1294,6 @@
 webkit.org/b/206656 imported/w3c/web-platform-tests/mediacapture-streams/MediaStreamTrack-MediaElement-disabled-video-is-black.https.html [ Failure ]
 webkit.org/b/206656 imported/w3c/web-platform-tests/mediacapture-streams/MediaStreamTrack-getSettings.https.html [ Failure ]
 
-webkit.org/b/206657 imported/w3c/web-platform-tests/html/semantics/embedded-content/the-video-element/resize-during-playback.html [ Failure ]
-
 # LFC (LayoutFormatingContext is disabled by default).
 fast/layoutformattingcontext/ [ Skip ]
 

Modified: releases/WebKitGTK/webkit-2.28/Source/WebCore/ChangeLog (264080 => 264081)


--- releases/WebKitGTK/webkit-2.28/Source/WebCore/ChangeLog	2020-07-08 10:07:02 UTC (rev 264080)
+++ releases/WebKitGTK/webkit-2.28/Source/WebCore/ChangeLog	2020-07-08 10:07:10 UTC (rev 264081)
@@ -1,3 +1,80 @@
+2020-04-27  Alicia Boya García  <ab...@igalia.com>
+
+        [GStreamer] Rework WebKitWebSrc threading
+        https://bugs.webkit.org/show_bug.cgi?id=210284
+
+        Reviewed by Xabier Rodriguez-Calvar.
+
+        WebKitWebSrc as it is in master has a number of race conditions
+        leading to occasional starvation (due to cancelling the wrong request)
+        or data corruption (due to pushing data from a cancelled request).
+
+        The threading situation wasn't easy to follow, as it wasn't clear
+        access to what members should be protected by what mutex, in what
+        circumstances. Also, some parts of the design were also introducing
+        addicional complexity, such as the first request being sent from the
+        main thread whereas the rest were being sent from the streaming thread
+        or basesrc async start.
+
+        In response, this patch reworks all the locking in WebKitWebSrc to use
+        WTF::DataMutex. This ensures all accesses to its (now explicit)
+        protected members are locked. The two mutexes and condition variables
+        have been simplified into one, as there was no obvious need or benefit
+        for two of each in this case.
+
+        Requests have been numbered, which allows to safely and atomically
+        ignore results from cancelled requests, avoiding data corruption
+        races, and makes following them in debug logs much easier.
+
+        The conditions for making and cancelling requests have been simplified
+        to a simpler and safer model: There is at most only one active request
+        at anytime, flushes cancel the request, and the first create() call
+        always makes the new request (both at startup and after a flush).
+        Debug asserts and notes about the flow of operations during basesrc
+        seeks have been provided.
+
+        As this effort needed a review of the entire WebKitWebSrc, cleanups,
+        corrections and documentation comments have been provided where
+        appropriate.
+
+        This patch introduces no visible behavior changes, just stability
+        improvements.
+
+        * platform/graphics/gstreamer/GRefPtrGStreamer.h:
+        * platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:
+        (WebKitWebSrcPrivate::~WebKitWebSrcPrivate):
+        (webkit_web_src_class_init):
+        (webkitWebSrcReset):
+        (webKitWebSrcConstructed):
+        (webKitWebSrcSetProperty):
+        (webKitWebSrcGetProperty):
+        (webKitWebSrcSetContext):
+        (webKitWebSrcSendEvent):
+        (restartLoaderIfNeeded):
+        (stopLoaderIfNeeded):
+        (webKitWebSrcCreate):
+        (webKitWebSrcStart):
+        (webKitWebSrcMakeRequest):
+        (webKitWebSrcStop):
+        (webKitWebSrcGetSize):
+        (webKitWebSrcIsSeekable):
+        (webKitWebSrcDoSeek):
+        (webKitWebSrcQuery):
+        (webKitWebSrcUnLock):
+        (webKitWebSrcUnLockStop):
+        (webKitWebSrcSetUri):
+        (webKitWebSrcSetMediaPlayer):
+        (webKitSrcPassedCORSAccessCheck):
+        (CachedResourceStreamingClient::CachedResourceStreamingClient):
+        (CachedResourceStreamingClient::checkUpdateBlocksize):
+        (CachedResourceStreamingClient::responseReceived):
+        (CachedResourceStreamingClient::dataReceived):
+        (CachedResourceStreamingClient::accessControlCheckFailed):
+        (CachedResourceStreamingClient::loadFailed):
+        (CachedResourceStreamingClient::loadFinished):
+        (webKitSrcWouldTaintOrigin):
+        * platform/graphics/gstreamer/WebKitWebSourceGStreamer.h:
+
 2020-04-27  Alberto Garcia  <be...@igalia.com>
 
         [GTK] [2.28.0] The Yelp build crashes if DISPLAY is not set

Modified: releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/GRefPtrGStreamer.h (264080 => 264081)


--- releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/GRefPtrGStreamer.h	2020-07-08 10:07:02 UTC (rev 264080)
+++ releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/GRefPtrGStreamer.h	2020-07-08 10:07:10 UTC (rev 264081)
@@ -25,7 +25,7 @@
 #include <wtf/glib/GRefPtr.h>
 
 typedef struct _WebKitVideoSink WebKitVideoSink;
-typedef struct _WebKitWebSrc WebKitWebSrc;
+struct WebKitWebSrc;
 
 #if USE(GSTREAMER_GL)
 typedef struct _GstGLDisplay GstGLDisplay;

Modified: releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp (264080 => 264081)


--- releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp	2020-07-08 10:07:02 UTC (rev 264080)
+++ releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp	2020-07-08 10:07:10 UTC (rev 264081)
@@ -1,7 +1,7 @@
 /*
  *  Copyright (C) 2009, 2010 Sebastian Dröge <sebastian.dro...@collabora.co.uk>
  *  Copyright (C) 2013 Collabora Ltd.
- *  Copyright (C) 2019 Igalia S.L.
+ *  Copyright (C) 2019, 2020 Igalia S.L.
  *
  *  This library is free software; you can redistribute it and/or
  *  modify it under the terms of the GNU Lesser General Public
@@ -25,7 +25,6 @@
 
 #include "GStreamerCommon.h"
 #include "HTTPHeaderNames.h"
-#include "MainThreadNotifier.h"
 #include "MediaPlayer.h"
 #include "PlatformMediaResourceLoader.h"
 #include "PolicyChecker.h"
@@ -34,11 +33,14 @@
 #include "ResourceResponse.h"
 #include <cstdint>
 #include <wtf/Condition.h>
+#include <wtf/DataMutex.h>
+#include <wtf/RunLoop.h>
 #include <wtf/Scope.h>
 #include <wtf/glib/WTFGType.h>
 #include <wtf/text/CString.h>
 
 using namespace WebCore;
+using WTF::DataMutex;
 
 // Never pause download of media resources smaller than 2MiB.
 #define SMALL_MEDIA_RESOURCE_MAX_SIZE 2 * 1024 * 1024
@@ -55,7 +57,7 @@
     WTF_MAKE_FAST_ALLOCATED;
     WTF_MAKE_NONCOPYABLE(CachedResourceStreamingClient);
 public:
-    CachedResourceStreamingClient(WebKitWebSrc*, ResourceRequest&&);
+    CachedResourceStreamingClient(WebKitWebSrc*, ResourceRequest&&, unsigned requestNumber);
     virtual ~CachedResourceStreamingClient();
 
     const HashSet<RefPtr<WebCore::SecurityOrigin>>& securityOrigins() const { return m_origins; }
@@ -80,6 +82,7 @@
     static constexpr float s_reduceBlocksizeFactor { 0.5 };
     int m_reduceBlocksizeCount { 0 };
     int m_increaseBlocksizeCount { 0 };
+    unsigned m_requestNumber;
 
     GRefPtr<GstElement> m_src;
     ResourceRequest m_request;
@@ -86,67 +89,72 @@
     HashSet<RefPtr<WebCore::SecurityOrigin>> m_origins;
 };
 
-enum MainThreadSourceNotification {
-    Start = 1 << 0,
-    Stop = 1 << 1,
-    Dispose = 1 << 2,
-};
+struct WebKitWebSrcPrivate {
+    // Constants initialized during construction:
+    unsigned minimumBlocksize;
 
-struct _WebKitWebSrcPrivate {
-    ~_WebKitWebSrcPrivate()
-    {
-        if (notifier && notifier->isValid()) {
-            notifier->notifyAndWait(MainThreadSourceNotification::Dispose, [&] {
-                if (resource) {
-                    auto* client = static_cast<CachedResourceStreamingClient*>(resource->client());
-                    if (client)
-                        client->setSourceElement(nullptr);
-
-                    resource->setClient(nullptr);
-                }
-                loader = nullptr;
-            });
-            notifier->invalidate();
-            notifier = nullptr;
-        }
-    }
-
+    // Configuration of the element (properties set by the user of WebKitWebSrc):
+    // They can only change when state < PAUSED.
     CString originalURI;
-    CString redirectedURI;
     bool keepAlive;
     GUniquePtr<GstStructure> extraHeaders;
     bool compress;
     GUniquePtr<gchar> httpMethod;
-    WebCore::MediaPlayer* player;
-    RefPtr<PlatformMediaResourceLoader> loader;
-    RefPtr<PlatformMediaResource> resource;
-    RefPtr<MainThreadNotifier<MainThreadSourceNotification>> notifier;
-    bool didPassAccessControlCheck;
-    bool wereHeadersReceived;
-    Condition headersCondition;
-    Lock responseLock;
-    bool wasResponseReceived;
-    Condition responseCondition;
-    bool doesHaveEOS;
-    bool isFlushing { false };
-    uint64_t readPosition;
-    uint64_t requestedPosition;
-    uint64_t stopPosition;
-    bool isDurationSet;
-    bool haveSize;
-    uint64_t size;
-    bool isSeekable;
-    bool isSeeking;
-    bool wasSeeking { false };
-    unsigned minimumBlocksize;
-    Lock adapterLock;
-    Condition adapterCondition;
-    bool isDownloadSuspended { false };
-    GRefPtr<GstAdapter> adapter;
-    GRefPtr<GstEvent> httpHeadersEvent;
-    GUniquePtr<GstStructure> httpHeaders;
-    WallTime downloadStartTime { WallTime::nan() };
-    uint64_t totalDownloadedBytes { 0 };
+
+    struct StreamingMembers {
+        ~StreamingMembers()
+        {
+            // By the time we're destroying WebKitWebSrcPrivate unLock() should have been called and therefore resource
+            // should have already been cleared.
+            ASSERT(!resource);
+            // ResourceLoader is not thread-safe. It's not even ThreadSafeRefCounted. Therefore, to be safe, we want the
+            // unref to happen in the main thread.
+            if (loader)
+                RunLoop::main().dispatch([loader = WTFMove(loader)] { });
+        }
+
+        // Properties initially empty, but set once the first HTTP response arrives:
+        bool wasResponseReceived;
+        CString redirectedURI;
+        bool didPassAccessControlCheck;
+        bool haveSize;
+        uint64_t size;
+        bool isSeekable;
+        GRefPtr<GstCaps> pendingCaps;
+        GRefPtr<GstMessage> pendingHttpHeadersMessage; // Set from MT, sent from create().
+        GRefPtr<GstEvent> pendingHttpHeadersEvent; // Set from MT, sent from create().
+
+        // Properties updated with every downloaded data block:
+        WallTime downloadStartTime { WallTime::nan() };
+        uint64_t totalDownloadedBytes { 0 };
+        bool doesHaveEOS; // Set both when we reach stopPosition and on errors (including on responseReceived).
+        bool isDownloadSuspended { false }; // Set to true from the network handler when the high water level is reached.
+
+        // Obtained by means of GstContext queries before making the first HTTP request.
+        // We use it for getting access to WebKit networking objects: the PlatformMediaResourceLoader factory [createResourceLoader()]
+        // and the player HTTP referrer string.
+        WebCore::MediaPlayer* player;
+
+        // Properties used for GStreamer data-flow in create().
+        bool isFlushing { false };
+        Condition responseCondition; // Must be signaled after any updates on HTTP requests, and when flushing.
+        GRefPtr<GstAdapter> adapter;
+        bool isDurationSet;
+        uint64_t readPosition;
+
+        // Properties only set during seek.
+        // basesrc ensures they can't change during a create() call by taking the STREAMING_LOCK.
+        // (An initial seek is also guaranteed by basesrc.)
+        unsigned requestNumber { 1 };
+        uint64_t requestedPosition { 0 };
+        uint64_t stopPosition { UINT64_MAX };
+
+        bool isRequestPending { true };
+
+        RefPtr<PlatformMediaResourceLoader> loader;
+        RefPtr<PlatformMediaResource> resource;
+    };
+    DataMutex<StreamingMembers> dataMutex;
 };
 
 enum {
@@ -169,9 +177,8 @@
 static void webKitWebSrcConstructed(GObject*);
 static void webKitWebSrcSetProperty(GObject*, guint propertyID, const GValue*, GParamSpec*);
 static void webKitWebSrcGetProperty(GObject*, guint propertyID, GValue*, GParamSpec*);
-static GstStateChangeReturn webKitWebSrcChangeState(GstElement*, GstStateChange);
 static GstFlowReturn webKitWebSrcCreate(GstPushSrc*, GstBuffer**);
-static gboolean webKitWebSrcMakeRequest(GstBaseSrc*, bool);
+static void webKitWebSrcMakeRequest(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
 static gboolean webKitWebSrcStart(GstBaseSrc*);
 static gboolean webKitWebSrcStop(GstBaseSrc*);
 static gboolean webKitWebSrcGetSize(GstBaseSrc*, guint64* size);
@@ -181,8 +188,8 @@
 static gboolean webKitWebSrcUnLock(GstBaseSrc*);
 static gboolean webKitWebSrcUnLockStop(GstBaseSrc*);
 static void webKitWebSrcSetContext(GstElement*, GstContext*);
-static void restartLoaderIfNeeded(WebKitWebSrc*);
-static void stopLoaderIfNeeded(WebKitWebSrc*);
+static void restartLoaderIfNeeded(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
+static void stopLoaderIfNeeded(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
 
 #define webkit_web_src_parent_class parent_class
 WEBKIT_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_PUSH_SRC,
@@ -230,7 +237,6 @@
         g_param_spec_string("method", "method", "The HTTP method to use (default: GET)",
             nullptr, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
 
-    eklass->change_state = GST_DEBUG_FUNCPTR(webKitWebSrcChangeState);
     eklass->set_context = GST_DEBUG_FUNCPTR(webKitWebSrcSetContext);
 
     GstBaseSrcClass* baseSrcClass = GST_BASE_SRC_CLASS(klass);
@@ -247,18 +253,40 @@
     pushSrcClass->create = GST_DEBUG_FUNCPTR(webKitWebSrcCreate);
 }
 
-static void webkitWebSrcReset(WebKitWebSrc* src)
+enum class ResetType {
+    Soft,
+    Hard
+};
+
+static void webkitWebSrcReset(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members, ResetType resetType)
 {
-    WebKitWebSrcPrivate* priv = src->priv;
+    GST_DEBUG_OBJECT(src, "Resetting internal state");
+    gst_adapter_clear(members->adapter.get());
+    members->isRequestPending = true;
 
-    GST_DEBUG_OBJECT(src, "Resetting internal state");
-    priv->haveSize = false;
-    priv->wereHeadersReceived = false;
-    priv->isSeekable = false;
-    priv->readPosition = 0;
-    priv->requestedPosition = 0;
-    priv->stopPosition = -1;
-    priv->size = 0;
+    // Reset request state. Any previous request has been cancelled at this point.
+    members->wasResponseReceived = false;
+    members->doesHaveEOS = false;
+    members->downloadStartTime = WallTime::nan();
+    members->totalDownloadedBytes = 0; // Resetted for each request, used to estimate download speed.
+    members->pendingHttpHeadersMessage = nullptr;
+    members->pendingHttpHeadersEvent = nullptr;
+
+    // After a flush, we have to emit a segment again.
+    members->isDurationSet = false;
+
+    // Hard reset is done during initialization and state transitions.
+    // Soft reset is done during flushes. In these, we preserve the seek target.
+    if (resetType == ResetType::Hard) {
+        members->didPassAccessControlCheck = false;
+        members->redirectedURI = CString();
+        members->isSeekable = false;
+        members->haveSize = false;
+        members->size = 0;
+        members->requestedPosition = 0;
+        members->stopPosition = UINT64_MAX;
+        members->readPosition = members->requestedPosition;
+    }
 }
 
 static void webKitWebSrcConstructed(GObject* object)
@@ -268,13 +296,13 @@
     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
     WebKitWebSrcPrivate* priv = src->priv;
 
-    priv->notifier = MainThreadNotifier<MainThreadSourceNotification>::create();
-    priv->adapter = adoptGRef(gst_adapter_new());
     priv->minimumBlocksize = gst_base_src_get_blocksize(GST_BASE_SRC_CAST(src));
 
-    webkitWebSrcReset(src);
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+    members->adapter = adoptGRef(gst_adapter_new());
+    webkitWebSrcReset(src, members, ResetType::Hard);
+
     gst_base_src_set_automatic_eos(GST_BASE_SRC_CAST(src), FALSE);
-    gst_base_src_set_async(GST_BASE_SRC_CAST(src), TRUE);
 }
 
 static void webKitWebSrcSetProperty(GObject* object, guint propID, const GValue* value, GParamSpec* pspec)
@@ -314,9 +342,11 @@
     case PROP_LOCATION:
         g_value_set_string(value, priv->originalURI.data());
         break;
-    case PROP_RESOLVED_LOCATION:
-        g_value_set_string(value, priv->redirectedURI.isNull() ? priv->originalURI.data() : priv->redirectedURI.data());
+    case PROP_RESOLVED_LOCATION: {
+        DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+        g_value_set_string(value, members->redirectedURI.isNull() ? priv->originalURI.data() : members->redirectedURI.data());
         break;
+    }
     case PROP_KEEP_ALIVE:
         g_value_set_boolean(value, priv->keepAlive);
         break;
@@ -343,202 +373,198 @@
     GST_DEBUG_OBJECT(src, "context type: %s", gst_context_get_context_type(context));
     if (gst_context_has_context_type(context, WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME)) {
         const GValue* value = gst_structure_get_value(gst_context_get_structure(context), "player");
-        priv->player = reinterpret_cast<MediaPlayer*>(g_value_get_pointer(value));
+        DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+        members->player = reinterpret_cast<MediaPlayer*>(g_value_get_pointer(value));
     }
     GST_ELEMENT_CLASS(parent_class)->set_context(element, context);
 }
 
-static void restartLoaderIfNeeded(WebKitWebSrc* src)
+static void restartLoaderIfNeeded(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
 {
-    WebKitWebSrcPrivate* priv = src->priv;
-
-    if (!priv->isDownloadSuspended) {
+    if (!members->isDownloadSuspended) {
         GST_TRACE_OBJECT(src, "download already active");
         return;
     }
 
     GST_TRACE_OBJECT(src, "is download suspended %s, does have EOS %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT
-        " (min %u)", boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->haveSize)
-        , boolForPrinting(priv->isSeekable), priv->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
-    if (priv->doesHaveEOS || !priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+        " (min %u)", boolForPrinting(members->isDownloadSuspended), boolForPrinting(members->doesHaveEOS), boolForPrinting(members->haveSize)
+        , boolForPrinting(members->isSeekable), members->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
+    if (members->doesHaveEOS || !members->haveSize || !members->isSeekable || members->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
         GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
         return;
     }
-    GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", priv->readPosition, gst_element_state_get_name(GST_STATE(src)));
-    if (!priv->readPosition || priv->readPosition == priv->size || GST_STATE(src) < GST_STATE_PAUSED) {
+    GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", members->readPosition, gst_element_state_get_name(GST_STATE(src)));
+    if (!members->readPosition || members->readPosition == members->size || GST_STATE(src) < GST_STATE_PAUSED) {
         GST_TRACE_OBJECT(src, "can't restart download");
         return;
     }
 
-    size_t queueSize = gst_adapter_available(priv->adapter.get());
-    GST_TRACE_OBJECT(src, "queue size %zd (min %1.0f)", queueSize
-        , priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
+    size_t queueSize = gst_adapter_available(members->adapter.get());
+    GST_TRACE_OBJECT(src, "queue size %zu (min %1.0f)", queueSize
+        , members->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
 
-    if (queueSize >= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
+    if (queueSize >= members->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
         GST_TRACE_OBJECT(src, "queue size above low watermark, not restarting download");
         return;
     }
 
     GST_DEBUG_OBJECT(src, "restarting download");
-    priv->isDownloadSuspended = false;
-    webKitWebSrcMakeRequest(GST_BASE_SRC_CAST(src), false);
+    members->isDownloadSuspended = false;
+    members->requestNumber++;
+    members->requestedPosition = members->readPosition;
+    webKitWebSrcMakeRequest(src, members);
 }
 
 
-static void stopLoaderIfNeeded(WebKitWebSrc* src)
+static void stopLoaderIfNeeded(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
 {
-    WebKitWebSrcPrivate* priv = src->priv;
+    ASSERT(isMainThread());
 
-    if (priv->isDownloadSuspended) {
+    if (members->isDownloadSuspended) {
         GST_TRACE_OBJECT(src, "download already suspended");
         return;
     }
 
     GST_TRACE_OBJECT(src, "is download suspended %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT " (min %u)"
-        , boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->haveSize), boolForPrinting(priv->isSeekable), priv->size
+        , boolForPrinting(members->isDownloadSuspended), boolForPrinting(members->haveSize), boolForPrinting(members->isSeekable), members->size
         , SMALL_MEDIA_RESOURCE_MAX_SIZE);
-    if (!priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+    if (!members->isSeekable || members->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
         GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
         return;
     }
 
-    size_t queueSize = gst_adapter_available(priv->adapter.get());
-    GST_TRACE_OBJECT(src, "queue size %zd (max %1.0f)", queueSize, priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
-    if (queueSize <= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
+    size_t queueSize = gst_adapter_available(members->adapter.get());
+    GST_TRACE_OBJECT(src, "queue size %zu (max %1.0f)", queueSize, members->size * HIGH_QUEUE_FACTOR_THRESHOLD);
+    if (queueSize <= members->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
         GST_TRACE_OBJECT(src, "queue size under high watermark, not stopping download");
         return;
     }
 
-    GST_DEBUG_OBJECT(src, "stopping download");
-    priv->isDownloadSuspended = true;
-    priv->resource->stop();
+    if (members->readPosition == members->size) {
+        GST_TRACE_OBJECT(src, "just downloaded the last chunk in the file, loadFinished() is about to be called");
+        return;
+    }
+
+    GST_DEBUG_OBJECT(src, "R%u: stopping download", members->requestNumber);
+    members->isDownloadSuspended = true;
+    members->resource->stop();
 }
 
 static GstFlowReturn webKitWebSrcCreate(GstPushSrc* pushSrc, GstBuffer** buffer)
 {
+    ASSERT(!isMainThread());
     GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(pushSrc);
     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
     WebKitWebSrcPrivate* priv = src->priv;
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
 
-    GST_TRACE_OBJECT(src, "readPosition = %" G_GUINT64_FORMAT " requestedPosition = %" G_GUINT64_FORMAT, priv->readPosition, priv->requestedPosition);
+    // We need members->player to make requests. There are two mechanisms for this.
+    //
+    // 1) webKitWebSrcSetMediaPlayer() is called by MediaPlayerPrivateGStreamer by means of hooking playbin's
+    //    "source-setup" event. This doesn't work for additional WebKitWebSrc elements created by adaptivedemux.
+    //
+    // 2) A GstContext query made here. Because of a bug, this only works in GStreamer >= 1.12.
+    //
+    // As a compatibility workaround, the http: URI protocol is only registered for gst>=1.12; otherwise using
+    // webkit+http:, which is used by MediaPlayerPrivateGStreamer but not by adaptivedemux's additional source
+    // elements, therefore using souphttpsrc instead and not routing traffic through the NetworkProcess.
+    if (webkitGstCheckVersion(1, 12, 0) && !members->player) {
+        members.runUnlocked([src, baseSrc]() {
+            GRefPtr<GstQuery> query = adoptGRef(gst_query_new_context(WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
+            if (gst_pad_peer_query(GST_BASE_SRC_PAD(baseSrc), query.get())) {
+                GstContext* context;
 
-    if (priv->requestedPosition != priv->readPosition) {
-        {
-            LockHolder adapterLocker(priv->adapterLock);
-            GST_DEBUG_OBJECT(src, "Seeking, flushing adapter");
-            gst_adapter_clear(priv->adapter.get());
-        }
-        uint64_t requestedPosition = priv->requestedPosition;
-        webKitWebSrcStop(baseSrc);
-        priv->requestedPosition = requestedPosition;
-        // Do not notify async-completion, in seeking flows, we will
-        // be called from GstBaseSrc's perform_seek vfunc, which holds
-        // a streaming lock in our frame. Hence, we would deadlock
-        // trying to notify async completion, since that also requires
-        // the streaming lock.
-        webKitWebSrcMakeRequest(baseSrc, false);
-    }
-
-    {
-        LockHolder locker(priv->responseLock);
-        priv->responseCondition.wait(priv->responseLock, [priv] () {
-            return priv->wasResponseReceived || priv->isFlushing;
+                gst_query_parse_context(query.get(), &context);
+                gst_element_set_context(GST_ELEMENT_CAST(src), context);
+            } else
+                gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_need_context(GST_OBJECT_CAST(src), WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
         });
+        if (members->isFlushing)
+            return GST_FLOW_FLUSHING;
     }
+    RELEASE_ASSERT(members->player);
 
-    // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be
-    // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These
-    // other buffers can be retrieved with the "non _fast" API.
-    GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(priv->isFlushing)
-        , boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->isDownloadSuspended));
+    GST_TRACE_OBJECT(src, "readPosition = %" G_GUINT64_FORMAT " requestedPosition = %" G_GUINT64_FORMAT, members->readPosition, members->requestedPosition);
 
-    if (priv->doesHaveEOS) {
-        GST_DEBUG_OBJECT(src, "EOS");
-        return GST_FLOW_EOS;
+    if (members->isRequestPending) {
+        members->isRequestPending = false;
+        webKitWebSrcMakeRequest(src, members);
     }
 
-    unsigned size = gst_base_src_get_blocksize(baseSrc);
-    size_t queueSize;
-    {
-        LockHolder adapterLocker(priv->adapterLock);
-        unsigned retries = 0;
-        queueSize = gst_adapter_available(priv->adapter.get());
-        GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
-        while (!queueSize && !priv->isFlushing) {
-            GST_TRACE_OBJECT(src, "let's try to restart the download if possible and wait a bit if no data");
-            priv->adapterCondition.waitFor(priv->adapterLock, 1_s, [&] {
-                restartLoaderIfNeeded(src);
-                return priv->isFlushing || (!priv->isDownloadSuspended && gst_adapter_available(priv->adapter.get()));
-            });
-            queueSize = gst_adapter_available(priv->adapter.get());
-            GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
-            if (queueSize || priv->isFlushing) {
-                // We have data or we're flushing. We can break the loop here.
-                break;
-            }
+    // Wait for the response headers.
+    members->responseCondition.wait(members.mutex(), [&] () {
+        return members->wasResponseReceived || members->isFlushing;
+    });
 
-            // We should keep waiting but we could be in EOS. Let's check the two possibilities:
-            // 1. We are at the end of the file with a known size.
-            // 2. The download is not suspended and no more data are arriving. We cannot wait forever, 10x1s seems safe and sensible.
-            if (priv->haveSize && priv->readPosition >= priv->size) {
-                GST_DEBUG_OBJECT(src, "Waiting for data beyond the end, signalling EOS");
-                return GST_FLOW_EOS;
-            }
-            GST_TRACE_OBJECT(src, "is download suspended? %s, num retries %u", boolForPrinting(priv->isDownloadSuspended), retries + 1);
-            if (!priv->isDownloadSuspended && ++retries >= 10) {
-                GST_DEBUG_OBJECT(src, "Adapter still empty after 10s of waiting, assuming EOS");
-                return GST_FLOW_EOS;
-            }
-        }
-    }
+    if (members->isFlushing)
+        return GST_FLOW_FLUSHING;
 
-    if (priv->isFlushing) {
-        GST_DEBUG_OBJECT(src, "Flushing");
-        return GST_FLOW_FLUSHING;
+    if (members->pendingCaps) {
+        GST_DEBUG_OBJECT(src, "Setting caps: %" GST_PTR_FORMAT, members->pendingCaps.get());
+        members.runUnlocked([baseSrc, caps = members->pendingCaps.leakRef()]() {
+            gst_base_src_set_caps(baseSrc, caps);
+        });
+        if (members->isFlushing)
+            return GST_FLOW_FLUSHING;
     }
 
-    if (priv->haveSize && !priv->isDurationSet) {
-        GST_DEBUG_OBJECT(src, "Setting duration to %" G_GUINT64_FORMAT, priv->size);
-        baseSrc->segment.duration = priv->size;
-        priv->isDurationSet = true;
+    if (members->haveSize && !members->isDurationSet) {
+        GST_DEBUG_OBJECT(src, "Setting duration to %" G_GUINT64_FORMAT, members->size);
+        baseSrc->segment.duration = members->size;
+        members->isDurationSet = true;
         gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
     }
 
-    if (priv->httpHeadersEvent)
-        gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), priv->httpHeadersEvent.leakRef());
+    if (members->pendingHttpHeadersMessage)
+        gst_element_post_message(GST_ELEMENT(src), members->pendingHttpHeadersMessage.leakRef());
+    if (members->pendingHttpHeadersEvent)
+        gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), members->pendingHttpHeadersEvent.leakRef());
 
-    {
-        LockHolder adapterLocker(priv->adapterLock);
-        queueSize = gst_adapter_available(priv->adapter.get());
+    restartLoaderIfNeeded(src, members);
+
+    // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be
+    // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These
+    // other buffers can be retrieved with the "non _fast" API.
+    GST_TRACE_OBJECT(src, "doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(members->doesHaveEOS), boolForPrinting(members->isDownloadSuspended));
+
+    unsigned size = gst_base_src_get_blocksize(baseSrc);
+    size_t queueSize = gst_adapter_available(members->adapter.get());
+    GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
+    if (!queueSize) {
+        GST_TRACE_OBJECT(src, "let's wait for data or EOS");
+        members->responseCondition.wait(members.mutex(), [&] {
+            return members->isFlushing || gst_adapter_available(members->adapter.get()) || members->doesHaveEOS;
+        });
+        if (members->isFlushing)
+            return GST_FLOW_FLUSHING;
+
+        queueSize = gst_adapter_available(members->adapter.get());
+        GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
+    }
+
+    if (queueSize) {
         if (queueSize < size) {
             GST_TRACE_OBJECT(src, "Did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, queueSize);
             size = queueSize;
         } else
             GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
-        if (size) {
-            *buffer = gst_adapter_take_buffer(priv->adapter.get(), size);
-            RELEASE_ASSERT(*buffer);
 
-            GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
-            GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
-            GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
-            GST_TRACE_OBJECT(src, "doesHaveEOS: %s, wasSeeking: %s, seeking: %s, buffer size: %u, size: %" G_GUINT64_FORMAT, boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->wasSeeking), boolForPrinting(priv->isSeeking), size, priv->size);
-            if (priv->haveSize && GST_BUFFER_OFFSET_END(*buffer) >= priv->size) {
-                if (priv->wasSeeking)
-                    priv->wasSeeking = false;
-                else if (priv->isSeekable)
-                    priv->doesHaveEOS = true;
-            } else if (priv->wasSeeking)
-                priv->wasSeeking = false;
+        *buffer = gst_adapter_take_buffer(members->adapter.get(), size);
+        RELEASE_ASSERT(*buffer);
 
-            restartLoaderIfNeeded(src);
-        } else {
-            GST_ERROR_OBJECT(src, "Empty adapter!");
-            ASSERT_NOT_REACHED();
-        }
+        GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
+        GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
+        GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
+        GST_TRACE_OBJECT(src, "buffer size: %u, total content size: %" G_GUINT64_FORMAT, size, members->size);
+
+        restartLoaderIfNeeded(src, members);
+        return GST_FLOW_OK;
     }
 
-    return GST_FLOW_OK;
+    // If the queue is still empty at this point, the only other option is that we are in EOS.
+    ASSERT(members->doesHaveEOS);
+    GST_DEBUG_OBJECT(src, "Reached the end of the response, signalling EOS");
+    return GST_FLOW_EOS;
 }
 
 static bool webKitWebSrcSetExtraHeader(GQuark fieldId, const GValue* value, gpointer userData)
@@ -594,52 +620,23 @@
 
 static gboolean webKitWebSrcStart(GstBaseSrc* baseSrc)
 {
-    // This method should only be called by BaseSrc, do not call it
-    // from ourselves unless you ensure the streaming lock is not
-    // held. If it is, you will deadlock the WebProcess.
-    return webKitWebSrcMakeRequest(baseSrc, true);
-}
-
-static gboolean webKitWebSrcMakeRequest(GstBaseSrc* baseSrc, bool notifyAsyncCompletion)
-{
     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
-    WebKitWebSrcPrivate* priv = src->priv;
 
-    if (webkitGstCheckVersion(1, 12, 0) && !priv->player) {
-        GRefPtr<GstQuery> query = adoptGRef(gst_query_new_context(WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
-        if (gst_pad_peer_query(GST_BASE_SRC_PAD(baseSrc), query.get())) {
-            GstContext* context;
-
-            gst_query_parse_context(query.get(), &context);
-            gst_element_set_context(GST_ELEMENT_CAST(src), context);
-        } else
-            gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_need_context(GST_OBJECT_CAST(src), WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
-    }
-
-    RELEASE_ASSERT(priv->player);
-
-    priv->wereHeadersReceived = false;
-    priv->wasResponseReceived = false;
-    priv->isDurationSet = false;
-    priv->doesHaveEOS = false;
-    priv->isFlushing = false;
-    priv->downloadStartTime = WallTime::nan();
-
-    priv->didPassAccessControlCheck = false;
-
-    if (priv->originalURI.isNull()) {
+    if (src->priv->originalURI.isNull()) {
         GST_ERROR_OBJECT(src, "No URI provided");
-        webKitWebSrcStop(baseSrc);
         return FALSE;
     }
+    return TRUE;
+}
 
-    if (priv->requestedPosition == priv->stopPosition) {
-        GST_DEBUG_OBJECT(src, "Empty segment, signaling EOS");
-        priv->doesHaveEOS = true;
-        return FALSE;
-    }
+static void webKitWebSrcMakeRequest(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
+{
+    WebKitWebSrcPrivate* priv = src->priv;
 
-    GST_DEBUG_OBJECT(src, "Fetching %s", priv->originalURI.data());
+    ASSERT(!priv->originalURI.isNull());
+    ASSERT(members->requestedPosition != members->stopPosition);
+
+    GST_DEBUG_OBJECT(src, "Posting task to request R%u %s requestedPosition=%" G_GUINT64_FORMAT " stopPosition=%" G_GUINT64_FORMAT, members->requestNumber, priv->originalURI.data(), members->requestedPosition, members->stopPosition);
     URL url = URL(URL(), priv->originalURI.data());
 
     ResourceRequest request(url);
@@ -646,7 +643,7 @@
     request.setAllowCookies(true);
     request.setFirstPartyForCookies(url);
 
-    request.setHTTPReferrer(priv->player->referrer());
+    request.setHTTPReferrer(members->player->referrer());
 
     if (priv->httpMethod.get())
         request.setHTTPMethod(priv->httpMethod.get());
@@ -670,12 +667,12 @@
         || !g_ascii_strcasecmp("trailers.apple.com", url.host().utf8().data()))
         request.setHTTPUserAgent("Quicktime/7.6.6");
 
-    if (priv->requestedPosition) {
-        GUniquePtr<char> formatedRange(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", priv->requestedPosition));
+    if (members->requestedPosition) {
+        GUniquePtr<char> formatedRange(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", members->requestedPosition));
         GST_DEBUG_OBJECT(src, "Range request: %s", formatedRange.get());
         request.setHTTPHeaderField(HTTPHeaderName::Range, formatedRange.get());
     }
-    priv->readPosition = priv->requestedPosition;
+    ASSERT(members->readPosition == members->requestedPosition);
 
     GST_DEBUG_OBJECT(src, "Persistent connection support %s", priv->keepAlive ? "enabled" : "disabled");
     if (!priv->keepAlive)
@@ -687,70 +684,41 @@
     // We always request Icecast/Shoutcast metadata, just in case ...
     request.setHTTPHeaderField(HTTPHeaderName::IcyMetadata, "1");
 
-    GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
-    priv->notifier->notify(MainThreadSourceNotification::Start, [protector, request = WTFMove(request), src, notifyAsyncCompletion] {
+    ASSERT(!isMainThread());
+    RunLoop::main().dispatch([protector = WTF::ensureGRef(src), request = WTFMove(request), requestNumber = members->requestNumber, src] {
         WebKitWebSrcPrivate* priv = protector->priv;
-        if (!priv->loader)
-            priv->loader = priv->player->createResourceLoader();
+        DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+        // Ignore this task (not making any HTTP request) if by now WebKitWebSrc streaming thread is already waiting
+        // for a different request. There is no point anymore in sending this one.
+        if (members->requestNumber != requestNumber) {
+            GST_DEBUG_OBJECT(protector.get(), "Skipping R%u, current request number is %u", requestNumber, members->requestNumber);
+            return;
+        }
 
+        if (!members->loader)
+            members->loader = members->player->createResourceLoader();
+
         PlatformMediaResourceLoader::LoadOptions loadOptions = 0;
         if (request.url().protocolIsBlob())
             loadOptions |= PlatformMediaResourceLoader::LoadOption::BufferData;
-        priv->resource = priv->loader->requestResource(ResourceRequest(request), loadOptions);
-        if (priv->resource) {
-            priv->resource->setClient(makeUnique<CachedResourceStreamingClient>(protector.get(), ResourceRequest(request)));
-            GST_DEBUG_OBJECT(protector.get(), "Started request");
-            if (notifyAsyncCompletion)
-                gst_base_src_start_complete(GST_BASE_SRC(src), GST_FLOW_OK);
+        members->resource = members->loader->requestResource(ResourceRequest(request), loadOptions);
+        if (members->resource) {
+            members->resource->setClient(makeUnique<CachedResourceStreamingClient>(protector.get(), ResourceRequest(request), requestNumber));
+            GST_DEBUG_OBJECT(protector.get(), "Started request R%u", requestNumber);
         } else {
-            GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client");
-            if (notifyAsyncCompletion)
-                gst_base_src_start_complete(GST_BASE_SRC(src), GST_FLOW_ERROR);
-            priv->loader = nullptr;
+            GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client to handle R%u", requestNumber);
+            members->loader = nullptr;
         }
     });
-
-    return TRUE;
 }
 
-static void webKitWebSrcCloseSession(WebKitWebSrc* src)
-{
-    WebKitWebSrcPrivate* priv = src->priv;
-    GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
-
-    priv->notifier->notify(MainThreadSourceNotification::Stop, [protector, keepAlive = priv->keepAlive] {
-        WebKitWebSrcPrivate* priv = protector->priv;
-
-        GST_DEBUG_OBJECT(protector.get(), "Stopping resource loader");
-
-        if (priv->resource) {
-            priv->resource->stop();
-            priv->resource->setClient(nullptr);
-            priv->resource = nullptr;
-        }
-
-        if (!keepAlive)
-            priv->loader = nullptr;
-    });
-
-    GST_DEBUG_OBJECT(src, "Resource loader stopped");
-}
-
 static gboolean webKitWebSrcStop(GstBaseSrc* baseSrc)
 {
     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
-    WebKitWebSrcPrivate* priv = src->priv;
-
-    if (priv->resource || (priv->loader && !priv->keepAlive))
-        webKitWebSrcCloseSession(src);
-
-    {
-        LockHolder adapterLocker(priv->adapterLock);
-        gst_adapter_clear(priv->adapter.get());
-    }
-
-    webkitWebSrcReset(src);
-    GST_DEBUG_OBJECT(src, "Stopped request");
+    // basesrc will always call unLock() and unLockStop() before calling this. See gst_base_src_stop().
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+    webkitWebSrcReset(src, members, ResetType::Hard);
+    GST_DEBUG_OBJECT(src, "Stopped WebKitWebSrc");
     return TRUE;
 }
 
@@ -757,11 +725,11 @@
 static gboolean webKitWebSrcGetSize(GstBaseSrc* baseSrc, guint64* size)
 {
     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
-    WebKitWebSrcPrivate* priv = src->priv;
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
 
-    GST_DEBUG_OBJECT(src, "haveSize: %s, size: %" G_GUINT64_FORMAT, boolForPrinting(priv->haveSize), priv->size);
-    if (priv->haveSize) {
-        *size = priv->size;
+    GST_DEBUG_OBJECT(src, "haveSize: %s, size: %" G_GUINT64_FORMAT, boolForPrinting(members->haveSize), members->size);
+    if (members->haveSize) {
+        *size = members->size;
         return TRUE;
     }
 
@@ -771,40 +739,42 @@
 static gboolean webKitWebSrcIsSeekable(GstBaseSrc* baseSrc)
 {
     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
-
-    GST_DEBUG_OBJECT(src, "isSeekable: %s", boolForPrinting(src->priv->isSeekable));
-    return src->priv->isSeekable;
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+    GST_DEBUG_OBJECT(src, "isSeekable: %s", boolForPrinting(members->isSeekable));
+    return members->isSeekable;
 }
 
 static gboolean webKitWebSrcDoSeek(GstBaseSrc* baseSrc, GstSegment* segment)
 {
+    // This function is mutually exclusive with create(). It's only called when we're transitioning to >=PAUSED and
+    // between flushes. In any case, basesrc holds the STREAM_LOCK, so we know create() is not running.
+    // Also, both webKitWebSrcUnLock() and webKitWebSrcUnLockStop() are guaranteed to be called *before* this function.
+    // [See gst_base_src_perform_seek()].
+    ASSERT(GST_ELEMENT(baseSrc)->current_state < GST_STATE_PAUSED || GST_PAD_IS_FLUSHING(baseSrc->srcpad));
+
+    // Except for the initial seek, this function is only called if isSeekable() returns true.
+    ASSERT(GST_ELEMENT(baseSrc)->current_state < GST_STATE_PAUSED || webKitWebSrcIsSeekable(baseSrc));
+
     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
-    WebKitWebSrcPrivate* priv = src->priv;
-    LockHolder locker(priv->responseLock);
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
 
-    GST_DEBUG_OBJECT(src, "Seek segment: (%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT ")", segment->start, segment->stop);
-    if (priv->readPosition == segment->start && priv->requestedPosition == priv->readPosition && priv->stopPosition == segment->stop) {
-        GST_DEBUG_OBJECT(src, "Seek to current read/end position and no seek pending");
-        return TRUE;
-    }
+    GST_DEBUG_OBJECT(src, "Seek segment: (%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT ") Position previous to seek: %" G_GUINT64_FORMAT, segment->start, segment->stop, members->readPosition);
 
-    if (priv->wereHeadersReceived && !priv->isSeekable) {
-        GST_WARNING_OBJECT(src, "Not seekable");
-        return FALSE;
-    }
+    // Before attempting to seek, basesrc will call isSeekable(). If isSeekable() is true, a flush will be made and
+    // this function will be called. basesrc still gives us the chance here to return FALSE and cancel the seek.
+    // We cannot afford to return FALSE in this function though unless we're going to fail on purpose, since at this
+    // point we have already been flushed and cancelled the HTTP request that was feeding us data.
 
     if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
-        GST_WARNING_OBJECT(src, "Invalid seek segment");
+        GST_ERROR_OBJECT(src, "Invalid seek segment");
         return FALSE;
     }
 
-    if (priv->haveSize && segment->start >= priv->size)
+    if (members->haveSize && segment->start >= members->size)
         GST_WARNING_OBJECT(src, "Potentially seeking behind end of file, might EOS immediately");
 
-    priv->isSeeking = true;
-    priv->requestedPosition = segment->start;
-    priv->stopPosition = segment->stop;
-    priv->adapterCondition.notifyOne();
+    members->requestedPosition = members->readPosition = segment->start;
+    members->stopPosition = segment->stop;
     return TRUE;
 }
 
@@ -816,8 +786,9 @@
 
     if (GST_QUERY_TYPE(query) == GST_QUERY_URI) {
         gst_query_set_uri(query, priv->originalURI.data());
-        if (!priv->redirectedURI.isNull())
-            gst_query_set_uri_redirection(query, priv->redirectedURI.data());
+        DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+        if (!members->redirectedURI.isNull())
+            gst_query_set_uri_redirection(query, members->redirectedURI.data());
         result = TRUE;
     }
 
@@ -838,12 +809,31 @@
 static gboolean webKitWebSrcUnLock(GstBaseSrc* baseSrc)
 {
     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
-    LockHolder locker(src->priv->responseLock);
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
 
     GST_DEBUG_OBJECT(src, "Unlock");
-    src->priv->isFlushing = true;
-    src->priv->responseCondition.notifyOne();
-    src->priv->adapterCondition.notifyOne();
+    members->isFlushing = true;
+
+    // If we have a network resource request open, we ask the main thread to close it.
+    if (members->resource) {
+        GST_DEBUG_OBJECT(src, "Resource request R%u will be stopped", members->requestNumber);
+        RunLoop::main().dispatch([protector = WTF::ensureGRef(src), resource = WTFMove(members->resource), requestNumber = members->requestNumber] {
+            GST_DEBUG_OBJECT(protector.get(), "Stopping resource request R%u", requestNumber);
+            resource->stop();
+            resource->setClient(nullptr);
+        });
+    }
+    ASSERT(!members->resource);
+
+    if (!src->priv->keepAlive)
+        members->loader = nullptr;
+
+    // Ensure all network callbacks from the old request don't feed data to WebKitWebSrc anymore.
+    members->requestNumber++;
+
+    // Wake up streaming thread.
+    members->responseCondition.notifyOne();
+
     return TRUE;
 }
 
@@ -850,39 +840,14 @@
 static gboolean webKitWebSrcUnLockStop(GstBaseSrc* baseSrc)
 {
     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
-    LockHolder locker(src->priv->responseLock);
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     GST_DEBUG_OBJECT(src, "Unlock stop");
-    src->priv->isFlushing = false;
+    members->isFlushing = false;
+    webkitWebSrcReset(src, members, ResetType::Soft);
 
     return TRUE;
 }
 
-static GstStateChangeReturn webKitWebSrcChangeState(GstElement* element, GstStateChange transition)
-{
-    WebKitWebSrc* src = WEBKIT_WEB_SRC(element);
-
-#if GST_CHECK_VERSION(1, 14, 0)
-    GST_DEBUG_OBJECT(src, "%s", gst_state_change_get_name(transition));
-#endif
-    switch (transition) {
-    case GST_STATE_CHANGE_READY_TO_NULL:
-        webKitWebSrcCloseSession(src);
-        break;
-    case GST_STATE_CHANGE_PAUSED_TO_READY: {
-        LockHolder locker(src->priv->responseLock);
-        GST_DEBUG_OBJECT(src, "PAUSED->READY cancelling network requests");
-        src->priv->isFlushing = true;
-        src->priv->responseCondition.notifyOne();
-        src->priv->adapterCondition.notifyOne();
-        break;
-    }
-    default:
-        break;
-    }
-
-    return GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
-}
-
 static bool urlHasSupportedProtocol(const URL& url)
 {
     return url.isValid() && (url.protocolIsInHTTPFamily() || url.protocolIsBlob());
@@ -938,7 +903,6 @@
         return FALSE;
     }
 
-    priv->redirectedURI = CString();
     priv->originalURI = CString();
     if (!uri)
         return TRUE;
@@ -972,16 +936,19 @@
 void webKitWebSrcSetMediaPlayer(WebKitWebSrc* src, WebCore::MediaPlayer* player)
 {
     ASSERT(player);
-    src->priv->player = player;
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+    members->player = player;
 }
 
 bool webKitSrcPassedCORSAccessCheck(WebKitWebSrc* src)
 {
-    return src->priv->didPassAccessControlCheck;
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+    return members->didPassAccessControlCheck;
 }
 
-CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
-    : m_src(GST_ELEMENT(src))
+CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request, unsigned requestNumber)
+    : m_requestNumber(requestNumber)
+    , m_src(GST_ELEMENT(src))
     , m_request(WTFMove(request))
 {
 }
@@ -990,6 +957,7 @@
 
 void CachedResourceStreamingClient::checkUpdateBlocksize(unsigned bytesRead)
 {
+    ASSERT(isMainThread());
     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
     GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(src);
     WebKitWebSrcPrivate* priv = src->priv;
@@ -1026,32 +994,44 @@
 
 void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, const ResourceResponse& response, CompletionHandler<void(PolicyChecker::ShouldContinue)>&& completionHandler)
 {
+    ASSERT(isMainThread());
     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
     WebKitWebSrcPrivate* priv = src->priv;
-    priv->didPassAccessControlCheck = priv->resource->didPassAccessControlCheck();
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+    if (members->requestNumber != m_requestNumber) {
+        completionHandler(PolicyChecker::ShouldContinue::No);
+        return;
+    }
 
-    GST_DEBUG_OBJECT(src, "Received response: %d", response.httpStatusCode());
+    GST_DEBUG_OBJECT(src, "R%u: Received response: %d", m_requestNumber, response.httpStatusCode());
 
+    members->didPassAccessControlCheck = members->resource->didPassAccessControlCheck();
     m_origins.add(SecurityOrigin::create(response.url()));
 
     auto responseURI = response.url().string().utf8();
     if (priv->originalURI != responseURI)
-        priv->redirectedURI = WTFMove(responseURI);
+        members->redirectedURI = WTFMove(responseURI);
 
-    uint64_t length = response.expectedContentLength();
-    if (length > 0 && priv->requestedPosition && response.httpStatusCode() == 206)
-        length += priv->requestedPosition;
+    // length will be zero (unknown) if no Content-Length is provided or the response is compressed with Content-Encoding.
+    uint64_t length = !response.httpHeaderFields().contains(HTTPHeaderName::ContentEncoding) ? response.expectedContentLength() : 0;
+    if (length > 0 && members->requestedPosition && response.httpStatusCode() == 206)
+        length += members->requestedPosition;
 
-    priv->httpHeaders.reset(gst_structure_new_empty("http-headers"));
-    gst_structure_set(priv->httpHeaders.get(), "uri", G_TYPE_STRING, priv->originalURI.data(),
+    GUniquePtr<GstStructure> httpHeaders(gst_structure_new_empty("http-headers"));
+
+    gst_structure_set(httpHeaders.get(), "uri", G_TYPE_STRING, priv->originalURI.data(),
         "http-status-code", G_TYPE_UINT, response.httpStatusCode(), nullptr);
-    if (!priv->redirectedURI.isNull())
-        gst_structure_set(priv->httpHeaders.get(), "redirection-uri", G_TYPE_STRING, priv->redirectedURI.data(), nullptr);
+    if (!members->redirectedURI.isNull())
+        gst_structure_set(httpHeaders.get(), "redirection-uri", G_TYPE_STRING, members->redirectedURI.data(), nullptr);
+
+    // Pack request headers in the http-headers structure.
     GUniquePtr<GstStructure> headers(gst_structure_new_empty("request-headers"));
     for (const auto& header : m_request.httpHeaderFields())
         gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
-    GST_DEBUG_OBJECT(src, "Request headers going downstream: %" GST_PTR_FORMAT, headers.get());
-    gst_structure_set(priv->httpHeaders.get(), "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
+    GST_DEBUG_OBJECT(src, "R%u: Request headers going downstream: %" GST_PTR_FORMAT, m_requestNumber, headers.get());
+    gst_structure_set(httpHeaders.get(), "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
+
+    // Pack response headers in the http-headers structure.
     headers.reset(gst_structure_new_empty("response-headers"));
     for (const auto& header : response.httpHeaderFields()) {
         bool ok = false;
@@ -1061,57 +1041,43 @@
         else
             gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
     }
-    auto contentLengthFieldName(httpHeaderNameString(HTTPHeaderName::ContentLength).toString());
-    if (!gst_structure_has_field(headers.get(), contentLengthFieldName.utf8().data()))
-        gst_structure_set(headers.get(), contentLengthFieldName.utf8().data(), G_TYPE_UINT64, static_cast<uint64_t>(length), nullptr);
-    gst_structure_set(priv->httpHeaders.get(), "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
-    GST_DEBUG_OBJECT(src, "Response headers going downstream: %" GST_PTR_FORMAT, headers.get());
+    GST_DEBUG_OBJECT(src, "R%u: Response headers going downstream: %" GST_PTR_FORMAT, m_requestNumber, headers.get());
+    gst_structure_set(httpHeaders.get(), "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
 
-    priv->httpHeadersEvent = adoptGRef(gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, gst_structure_copy(priv->httpHeaders.get())));
+    members->pendingHttpHeadersMessage = adoptGRef(gst_message_new_element(GST_OBJECT_CAST(src), gst_structure_copy(httpHeaders.get())));
+    members->pendingHttpHeadersEvent = adoptGRef(gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, httpHeaders.release()));
 
-    auto scopeExit = makeScopeExit([&] {
-        GstStructure* structure = gst_structure_copy(src->priv->httpHeaders.get());
-        gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_element(GST_OBJECT_CAST(src), structure));
-    });
-
     if (response.httpStatusCode() >= 400) {
-        GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received %d HTTP error code", response.httpStatusCode()), (nullptr));
-        priv->doesHaveEOS = true;
-        webKitWebSrcStop(GST_BASE_SRC_CAST(src));
+        GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: Received %d HTTP error code", m_requestNumber, response.httpStatusCode()), (nullptr));
+        members->doesHaveEOS = true;
+        members->responseCondition.notifyOne();
         completionHandler(PolicyChecker::ShouldContinue::No);
         return;
     }
 
-    if (priv->requestedPosition) {
+    if (members->requestedPosition) {
         // Seeking ... we expect a 206 == PARTIAL_CONTENT
-        if (response.httpStatusCode() == 200) {
-            // Range request didn't have a ranged response; resetting offset.
-            priv->readPosition = 0;
-        } else if (response.httpStatusCode() != 206) {
+        if (response.httpStatusCode() != 206) {
             // Range request completely failed.
-            GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received unexpected %d HTTP status code", response.httpStatusCode()), (nullptr));
-            priv->doesHaveEOS = true;
-            webKitWebSrcStop(GST_BASE_SRC_CAST(src));
+            GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: Received unexpected %d HTTP status code for range request", m_requestNumber, response.httpStatusCode()), (nullptr));
+            members->doesHaveEOS = true;
+            members->responseCondition.notifyOne();
             completionHandler(PolicyChecker::ShouldContinue::No);
             return;
-        } else {
-            GST_DEBUG_OBJECT(src, "Range request succeeded");
-            priv->isSeeking = false;
-            priv->wasSeeking = true;
         }
+        GST_DEBUG_OBJECT(src, "R%u: Range request succeeded", m_requestNumber);
     }
 
-    priv->isSeekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
+    members->isSeekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
 
-    GST_DEBUG_OBJECT(src, "Size: %" G_GUINT64_FORMAT ", isSeekable: %s", length, boolForPrinting(priv->isSeekable));
+    GST_DEBUG_OBJECT(src, "R%u: Size: %" G_GUINT64_FORMAT ", isSeekable: %s", m_requestNumber, length, boolForPrinting(members->isSeekable));
     if (length > 0) {
-        if (!priv->haveSize || priv->size != length) {
-            priv->haveSize = true;
-            priv->size = length;
-            priv->isDurationSet = false;
+        if (!members->haveSize || members->size != length) {
+            members->haveSize = true;
+            members->size = length;
         }
     } else
-        priv->haveSize = false;
+        members->haveSize = false;
 
     // Signal to downstream if this is an Icecast stream.
     GRefPtr<GstCaps> caps;
@@ -1123,114 +1089,106 @@
             caps = adoptGRef(gst_caps_new_simple("application/x-icy", "metadata-interval", G_TYPE_INT, metadataInterval, nullptr));
 
             String contentType = response.httpHeaderField(HTTPHeaderName::ContentType);
-            GST_DEBUG_OBJECT(src, "Response ContentType: %s", contentType.utf8().data());
+            GST_DEBUG_OBJECT(src, "R%u: Response ContentType: %s", m_requestNumber, contentType.utf8().data());
             gst_caps_set_simple(caps.get(), "content-type", G_TYPE_STRING, contentType.utf8().data(), nullptr);
         }
     }
-
     if (caps) {
-        GST_DEBUG_OBJECT(src, "Set caps to %" GST_PTR_FORMAT, caps.get());
-        gst_base_src_set_caps(GST_BASE_SRC_CAST(src), caps.get());
+        GST_DEBUG_OBJECT(src, "R%u: Set caps to %" GST_PTR_FORMAT, m_requestNumber, caps.get());
+        members->pendingCaps = WTFMove(caps);
     }
 
-    {
-        LockHolder locker(priv->responseLock);
-        priv->wereHeadersReceived = true;
-        priv->headersCondition.notifyOne();
-    }
+    members->wasResponseReceived = true;
+    members->responseCondition.notifyOne();
+
     completionHandler(PolicyChecker::ShouldContinue::Yes);
 }
 
 void CachedResourceStreamingClient::dataReceived(PlatformMediaResource&, const char* data, int length)
 {
+    ASSERT(isMainThread());
     WebKitWebSrc* src = ""
-    GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(src);
     WebKitWebSrcPrivate* priv = src->priv;
 
-    GST_LOG_OBJECT(src, "Have %d bytes of data", length);
-    LockHolder locker(priv->responseLock);
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
+    if (members->requestNumber != m_requestNumber)
+        return;
+
+    GST_LOG_OBJECT(src, "R%u: Have %d bytes of data", m_requestNumber, length);
+
     // Rough bandwidth calculation. We ignore here the first data package because we would have to reset the counters when we issue the request and
     // that first package delivery would include the time of sending out the request and getting the data back. Since we can't distinguish the
     // sending time from the receiving time, it is better to ignore it.
-    if (!std::isnan(priv->downloadStartTime)) {
-        priv->totalDownloadedBytes += length;
-        double timeSinceStart = (WallTime::now() - priv->downloadStartTime).seconds();
-        GST_TRACE_OBJECT(src, "downloaded %" G_GUINT64_FORMAT " bytes in %f seconds =~ %1.0f bytes/second", priv->totalDownloadedBytes, timeSinceStart
-            , timeSinceStart ? priv->totalDownloadedBytes / timeSinceStart : 0);
+    if (!std::isnan(members->downloadStartTime)) {
+        members->totalDownloadedBytes += length;
+        double timeSinceStart = (WallTime::now() - members->downloadStartTime).seconds();
+        GST_TRACE_OBJECT(src, "R%u: downloaded %" G_GUINT64_FORMAT " bytes in %f seconds =~ %1.0f bytes/second", m_requestNumber, members->totalDownloadedBytes, timeSinceStart
+            , timeSinceStart ? members->totalDownloadedBytes / timeSinceStart : 0);
     } else {
-        priv->downloadStartTime = WallTime::now();
-        priv->totalDownloadedBytes = 0;
+        members->downloadStartTime = WallTime::now();
     }
 
-    uint64_t newPosition = priv->readPosition + length;
-    if (LIKELY (priv->requestedPosition == priv->readPosition))
-        priv->requestedPosition = newPosition;
-    priv->readPosition = newPosition;
+    members->readPosition += length;
+    ASSERT(!members->haveSize || members->readPosition <= members->size);
 
-    uint64_t newSize = 0;
-    if (priv->haveSize && (newPosition > priv->size)) {
-        GST_DEBUG_OBJECT(src, "Got position previous estimated content size (%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", newPosition, priv->size);
-        newSize = newPosition;
-    }
-
-    if (newSize) {
-        priv->size = newSize;
-        baseSrc->segment.duration = priv->size;
-        gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
-    }
-
     gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_element(GST_OBJECT_CAST(src),
-        gst_structure_new("webkit-network-statistics", "read-position", G_TYPE_UINT64, priv->readPosition, "size", G_TYPE_UINT64, priv->size, nullptr)));
+        gst_structure_new("webkit-network-statistics", "read-position", G_TYPE_UINT64, members->readPosition, "size", G_TYPE_UINT64, members->size, nullptr)));
 
     checkUpdateBlocksize(length);
 
-    if (!priv->wasResponseReceived)
-        priv->wasResponseReceived = true;
-    priv->responseCondition.notifyOne();
-
-    {
-        LockHolder adapterLocker(priv->adapterLock);
-        GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
-        gst_adapter_push(priv->adapter.get(), buffer);
-        stopLoaderIfNeeded(src);
-        priv->adapterCondition.notifyOne();
-    }
+    GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
+    gst_adapter_push(members->adapter.get(), buffer);
+    stopLoaderIfNeeded(src, members);
+    members->responseCondition.notifyOne();
 }
 
 void CachedResourceStreamingClient::accessControlCheckFailed(PlatformMediaResource&, const ResourceError& error)
 {
+    ASSERT(isMainThread());
     WebKitWebSrc* src = ""
-    GST_ELEMENT_ERROR(src, RESOURCE, READ, ("%s", error.localizedDescription().utf8().data()), (nullptr));
-    src->priv->doesHaveEOS = true;
-    webKitWebSrcStop(GST_BASE_SRC_CAST(src));
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+    if (members->requestNumber != m_requestNumber)
+        return;
+
+    GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: %s", m_requestNumber, error.localizedDescription().utf8().data()), (nullptr));
+    members->doesHaveEOS = true;
+    members->responseCondition.notifyOne();
 }
 
 void CachedResourceStreamingClient::loadFailed(PlatformMediaResource&, const ResourceError& error)
 {
+    ASSERT(isMainThread());
     WebKitWebSrc* src = ""
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+    if (members->requestNumber != m_requestNumber)
+        return;
 
     if (!error.isCancellation()) {
-        GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
-        GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
+        GST_ERROR_OBJECT(src, "R%u: Have failure: %s", m_requestNumber, error.localizedDescription().utf8().data());
+        GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("R%u: %s", m_requestNumber, error.localizedDescription().utf8().data()), (nullptr));
     }
 
-    src->priv->doesHaveEOS = true;
+    members->doesHaveEOS = true;
+    members->responseCondition.notifyOne();
 }
 
 void CachedResourceStreamingClient::loadFinished(PlatformMediaResource&)
 {
+    ASSERT(isMainThread());
     WebKitWebSrc* src = ""
-    WebKitWebSrcPrivate* priv = src->priv;
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
+    if (members->requestNumber != m_requestNumber)
+        return;
 
-    if (priv->isSeeking && !priv->isFlushing)
-        priv->isSeeking = false;
+    members->doesHaveEOS = true;
+    members->responseCondition.notifyOne();
 }
 
 bool webKitSrcWouldTaintOrigin(WebKitWebSrc* src, const SecurityOrigin& origin)
 {
-    WebKitWebSrcPrivate* priv = src->priv;
+    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
 
-    auto* cachedResourceStreamingClient = reinterpret_cast<CachedResourceStreamingClient*>(priv->resource->client());
+    auto* cachedResourceStreamingClient = reinterpret_cast<CachedResourceStreamingClient*>(members->resource->client());
     for (auto& responseOrigin : cachedResourceStreamingClient->securityOrigins()) {
         if (!origin.canAccess(*responseOrigin))
             return true;

Modified: releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.h (264080 => 264081)


--- releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.h	2020-07-08 10:07:02 UTC (rev 264080)
+++ releases/WebKitGTK/webkit-2.28/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.h	2020-07-08 10:07:10 UTC (rev 264081)
@@ -39,17 +39,15 @@
 
 #define WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME  "webkit.media-player"
 
-typedef struct _WebKitWebSrc        WebKitWebSrc;
-typedef struct _WebKitWebSrcClass   WebKitWebSrcClass;
-typedef struct _WebKitWebSrcPrivate WebKitWebSrcPrivate;
+struct WebKitWebSrcPrivate;
 
-struct _WebKitWebSrc {
+struct WebKitWebSrc {
     GstPushSrc parent;
 
     WebKitWebSrcPrivate *priv;
 };
 
-struct _WebKitWebSrcClass {
+struct WebKitWebSrcClass {
     GstPushSrcClass parentClass;
 };
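
The callbacks above all follow the same pattern: take the single lock, compare the client's request number with the active one, return early if they differ, otherwise update the shared state and notify the one condition variable. A minimal standalone sketch of that pattern follows. It uses std::mutex and std::condition_variable in place of WTF::DataMutex, and the names StreamingState, StreamingClient, startNewRequest and waitForDataOrEOS are hypothetical, chosen for illustration only. The bool return values are also an addition for testability; the real callbacks return void.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <string>

// Hypothetical stand-in for the members the patch keeps behind WTF::DataMutex.
struct StreamingState {
    std::mutex mutex;
    std::condition_variable responseCondition;
    unsigned requestNumber { 0 }; // Number of the single active request.
    uint64_t readPosition { 0 };
    bool doesHaveEOS { false };
    std::string adapter; // Stand-in for the GstAdapter holding received bytes.
};

// Makes a new request the active one. Bumping the number turns every callback
// of an older request into a no-op. Resetting the other fields at this point
// is an assumption of this sketch, not something taken from the patch.
unsigned startNewRequest(StreamingState& state, uint64_t position)
{
    std::lock_guard<std::mutex> lock(state.mutex);
    state.readPosition = position;
    state.doesHaveEOS = false;
    state.adapter.clear();
    return ++state.requestNumber;
}

// Hypothetical per-request client, tagged with the number of its request.
class StreamingClient {
public:
    StreamingClient(StreamingState& state, unsigned requestNumber)
        : m_state(state)
        , m_requestNumber(requestNumber)
    {
    }

    // Returns false when the data belongs to a cancelled request and was dropped.
    bool dataReceived(const char* data, size_t length)
    {
        assert(data || !length);
        std::lock_guard<std::mutex> lock(m_state.mutex);
        if (m_state.requestNumber != m_requestNumber)
            return false;
        m_state.adapter.append(data, length);
        m_state.readPosition += length;
        m_state.responseCondition.notify_one();
        return true;
    }

    // Returns false when the finished request is no longer the active one.
    bool loadFinished()
    {
        std::lock_guard<std::mutex> lock(m_state.mutex);
        if (m_state.requestNumber != m_requestNumber)
            return false;
        m_state.doesHaveEOS = true;
        m_state.responseCondition.notify_one();
        return true;
    }

private:
    StreamingState& m_state;
    const unsigned m_requestNumber;
};

// Consumer side: blocks until the active request has produced data or ended,
// then returns the number of buffered bytes.
size_t waitForDataOrEOS(StreamingState& state)
{
    std::unique_lock<std::mutex> lock(state.mutex);
    state.responseCondition.wait(lock, [&state] {
        return !state.adapter.empty() || state.doesHaveEOS;
    });
    return state.adapter.size();
}
```

Because the comparison and the state update happen under the same lock, a late callback from a cancelled request cannot interleave with the start of the next one, which is the data corruption race the log message describes.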
 