This is an automated email from the git hooks/post-receive script. Git pushed a commit to branch master in repository ffmpeg.
commit 56de70a2e624b7febe7877ee261f699000eea66a Author: Niklas Haas <[email protected]> AuthorDate: Mon Jan 26 19:46:49 2026 +0100 Commit: Niklas Haas <[email protected]> CommitDate: Thu Jun 4 17:48:12 2026 +0200 avformat: add shared concurrent block cache protocol This adds a new protocol shared:URI which is distinct from the existing `cache:` in that it is explicity designed to be thread-safe and cross-process, enabling multiple ffmpeg processes (or multiple ffmpeg decoders within the same process) to share a single cache file, for e.g. a remote HTTP stream. As such, it uses a radically different internal design. To facilitate zero-knowledge cross-process interoperability, the cache file itself is just a memory-mapped representation of the underlying file data, which has the side benefit that the resulting cache file will contain a working copy of the streamed file (assuming the stream was read to completion). To keep track of which regions are cached and which are not, we use a secondary file that contains a minimal header along with a static bytemap of blocks within the file. This secondary file is also used to store metadata such as the filesize, if known, as well as marking "failed" blocks. Both files can grow dynamically in order to accommodate larger/growing files, and can be atomically updated (through the use of shared space maps). I have extensively checked the space map initalization and update code for race conditions, and I believe the current design to be solid. That said, it is the user's responsibility to some extent to ensure that the same URI is not used for different streams, as we rely on the URI to uniquely identify the cache files. That said, we use a cryptographic hash with sufficient collision resistance to protect against possible abuse. The lack of any implicit default on `-cache_dir` also means that `shared:` can't be enabled via URL injection to possibly access random files on the disk (or intentionally leak content from other streams with similar URIs, even if the cryptograhic hash function is broken). --- configure | 1 + doc/protocols.texi | 52 +++ libavformat/Makefile | 1 + libavformat/protocols.c | 1 + libavformat/shared.c | 826 ++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 881 insertions(+) diff --git a/configure b/configure index 7cf06d404e..4e6bce270e 100755 --- a/configure +++ b/configure @@ -4121,6 +4121,7 @@ schannel_conflict="openssl gnutls libtls mbedtls" sctp_protocol_deps="struct_sctp_event_subscribe struct_msghdr_msg_flags" sctp_protocol_select="network" securetransport_conflict="openssl gnutls libtls mbedtls" +shared_protocol_deps="mmap stdatomic unistd_h" srtp_protocol_select="rtp_protocol srtp" tcp_protocol_select="network" tls_protocol_deps_any="gnutls openssl schannel securetransport libtls mbedtls" diff --git a/doc/protocols.texi b/doc/protocols.texi index 81485bea12..1fbf2ef873 100644 --- a/doc/protocols.texi +++ b/doc/protocols.texi @@ -169,6 +169,7 @@ Read angle 2 of playlist 4 from BluRay mounted to /mnt/bluray, start from chapte -playlist 4 -angle 2 -chapter 2 bluray:/mnt/bluray @end example +@anchor{cache} @section cache Caching wrapper for input stream. @@ -1570,6 +1571,57 @@ If set to any value, listen for an incoming connection. Outgoing connection is d Set the maximum number of streams. By default no limit is set. @end table +@section shared + +Thread-safe, persistent, cross-process cache wrapper for input streams. Caches +the input stream to a file in the specified directory. This is similar to +@ref{cache}, but with a persistent cache on disk that may be shared between +multiple processes or demuxers, even concurrently. + +The filename of the cache is obtained by hashing the input URL, so there is +some risk of collision if the same URL is used for different content (e.g. +as a result of extra POST data being used to select the stream ID). To avoid +this, make sure to only use this protocol for URLs that uniquely identify +the content; otherwise you may get a garbled mix of sources. + +The accepted options are: +@table @option + +@item cache_dir +Path to the directory where cache files are stored. This option is required. + +@item block_shift +Shift factor (log2) of the block size used for internal reads/writes. Defaults +to 15, i.e. 32KB blocks. If this does not match the value that was specified +when an existing cache file was created, the previously specified value will +be used instead. + +@item read_only +If true, use the shared cache for reads but don't write any new blocks to it. +Default is false. Note that even with this option enabled, the cache file will +be initialized if it does not already exist. + +@item cache_timeout +If set to a nonzero value, specifies the maximum time (in microseconds) to wait +for data to become available, if another process is currently trying to fetch +and cache the same block at the same time. If this timeout elapses, it's +assumed that the other process may have gotten stuck or died in the meantime. + +If set to zero, no waiting is done and all processes will immediately race +to try and fetch the same missing blocks themselves. Defaults to 0. + +@item retry_errors +If true (the default), transient read errors from the underlying input stream +are ignored and retried again. If false, any blocks that previously failed +being read from will be treated as permanently inaccessible. + +@end table + +URL Syntax is +@example +shared:@var{URL} +@end example + @section srt Haivision Secure Reliable Transport Protocol via libsrt. diff --git a/libavformat/Makefile b/libavformat/Makefile index 33525369a4..0db0c7c2a9 100644 --- a/libavformat/Makefile +++ b/libavformat/Makefile @@ -712,6 +712,7 @@ OBJS-$(CONFIG_RTMPTE_PROTOCOL) += rtmpproto.o rtmpdigest.o rtmppkt.o OBJS-$(CONFIG_RTMPTS_PROTOCOL) += rtmpproto.o rtmpdigest.o rtmppkt.o OBJS-$(CONFIG_RTP_PROTOCOL) += rtpproto.o ip.o OBJS-$(CONFIG_SCTP_PROTOCOL) += sctp.o +OBJS-$(CONFIG_SHARED_PROTOCOL) += shared.o OBJS-$(CONFIG_SRTP_PROTOCOL) += srtpproto.o srtp.o OBJS-$(CONFIG_SUBFILE_PROTOCOL) += subfile.o OBJS-$(CONFIG_TEE_PROTOCOL) += teeproto.o tee_common.o diff --git a/libavformat/protocols.c b/libavformat/protocols.c index 207b6bf8d9..257b41970a 100644 --- a/libavformat/protocols.c +++ b/libavformat/protocols.c @@ -55,6 +55,7 @@ extern const URLProtocol ff_rtmpt_protocol; extern const URLProtocol ff_rtmpte_protocol; extern const URLProtocol ff_rtmpts_protocol; extern const URLProtocol ff_rtp_protocol; +extern const URLProtocol ff_shared_protocol; extern const URLProtocol ff_sctp_protocol; extern const URLProtocol ff_srtp_protocol; extern const URLProtocol ff_subfile_protocol; diff --git a/libavformat/shared.c b/libavformat/shared.c new file mode 100644 index 0000000000..8cb6f3040c --- /dev/null +++ b/libavformat/shared.c @@ -0,0 +1,826 @@ +/* + * Shared file cache protocol. + * Copyright (c) 2026 Niklas Haas + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Based on cache.c by Michael Niedermayer + */ + +#include "libavutil/avassert.h" +#include "libavutil/avstring.h" +#include "libavutil/hash.h" +#include "libavutil/file_open.h" +#include "libavutil/mem.h" +#include "libavutil/opt.h" +#include "libavutil/time.h" + +#include "url.h" + +#include <errno.h> +#include <fcntl.h> +#include <stdatomic.h> +#include <sys/file.h> +#include <sys/mman.h> +#include <sys/stat.h> +#include <unistd.h> + +/** + * This hash should be resistant against collision attacks, so that an + * attacker could not generate e.g. two different URIs that map to the same + * cache file. This requires at least 64 bits of collision resistance in + * practice (i.e. 128 bits = 16 bytes of hash size). However, we can be + * conservative by computing e.g. a 256 bit hash and storing it inside the + * file header for verification. + * + * Note that due to the way we use atomics, we should avoid zero bytes in + * the resulting hash; hence we tweak the input slightly to avoid this. + * The resulting loss in hash strength is negligible, since 32 bytes is + * already much more than needed. + */ +#define HASH_METHOD "SHA512/256" +#define HASH_SIZE 32 + +static int hash_uri(uint8_t hash[HASH_SIZE], const char *uri) +{ + struct AVHashContext *ctx = NULL; + int ret = av_hash_alloc(&ctx, HASH_METHOD); + if (ret < 0) + return ret; + + av_assert0(av_hash_get_size(ctx) == HASH_SIZE); + av_hash_init(ctx); + av_hash_update(ctx, (const uint8_t *) uri, strlen(uri)); + av_hash_final(ctx, hash); + av_hash_freep(&ctx); + + for (int i = 0; i < HASH_SIZE; i++) + hash[i] = hash[i] ? hash[i] : ~hash[i]; /* prevent zero bytes */ + return 0; +} + +#define HEADER_MAGIC MKTAG(u'\xFF', 'S', 'h', '$') +#define HEADER_VERSION 1 + +enum BlockState { + BLOCK_NONE, + BLOCK_CACHED, ///< block was cached successfully + BLOCK_PENDING, ///< a thread is currently trying to write this block + BLOCK_FAILED, ///< the underlying I/O source failed to read this block +}; + +typedef struct Block { + atomic_uchar state; /* enum BlockState */ +} Block; + +typedef struct Spacemap { + atomic_uint header_magic; + atomic_ushort version; + atomic_ushort block_shift; + atomic_ullong filesize; /* byte offset of true EOF, or 0 if unknown */ + atomic_uchar hash[HASH_SIZE]; /* hash of resource URI / filename */ + char reserved[80]; + + Block blocks[]; +} Spacemap; + +/* Set to value iff the current value is unset (zero) */ +#define DEF_SET_ONCE(ctype, atype) \ + static int set_once_##atype(atomic_##atype *const ptr, const ctype value) \ + { \ + ctype prev = 0; \ + av_assert1(value != 0); \ + if (atomic_compare_exchange_strong_explicit( \ + ptr, &prev, value, memory_order_release, memory_order_relaxed)) \ + return 1; \ + else if (prev == value) \ + return 0; \ + else \ + return AVERROR(EINVAL); \ + } + +DEF_SET_ONCE(unsigned char, uchar) +DEF_SET_ONCE(unsigned int, uint) +DEF_SET_ONCE(unsigned short, ushort) +DEF_SET_ONCE(unsigned long long, ullong) + +typedef struct SharedContext { + AVClass *class; + URLContext *inner; + int64_t inner_pos; + + /* options */ + char *cache_dir; + int block_shift; ///< requested shift; may disagree with actual + int read_only; + int64_t timeout; + int retry_errors; + + /* misc state */ + int64_t pos; ///< current logical position + uint8_t *tmp_buf; + int block_size; + int write_err; ///< write error occurred + + /* cache file */ + uint8_t *cache_data; ///< optional mmap of the cache file + char *cache_path; + off_t cache_size; ///< size of mapped memory region (for munmap) + int fd; + + /* space map */ + Spacemap *spacemap; + char *map_path; + off_t map_size; + int mapfd; + + /* statistics */ + int64_t nb_hit; + int64_t nb_miss; +} SharedContext; + +static int shared_close(URLContext *h) +{ + SharedContext *s = h->priv_data; + + ffurl_close(s->inner); + if (s->cache_data) + munmap(s->cache_data, s->cache_size); + if (s->spacemap) + munmap(s->spacemap, s->map_size); + if (s->fd != -1) + close(s->fd); + if (s->mapfd != -1) + close(s->mapfd); + av_freep(&s->cache_path); + av_freep(&s->map_path); + av_freep(&s->tmp_buf); + + av_log(h, AV_LOG_DEBUG, "Cache statistics: %"PRId64" hits, %"PRId64" misses\n", + s->nb_hit, s->nb_miss); + return 0; +} + +static int cache_map(URLContext *h, int64_t filesize); +static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE]); +static int spacemap_grow(URLContext *h, int64_t block); + +static int64_t get_filesize(URLContext *h) +{ + SharedContext *s = h->priv_data; + return atomic_load_explicit(&s->spacemap->filesize, memory_order_relaxed); +} + +static int set_filesize(URLContext *h, int64_t new_size) +{ + SharedContext *s = h->priv_data; + int ret; + + if (!new_size) + return 0; + + ret = set_once_ullong(&s->spacemap->filesize, new_size); + if (ret < 0) { + av_log(h, AV_LOG_ERROR, "Cached file size mismatch, expected: " + "%"PRId64", got: %"PRIu64"!\n", new_size, + (uint64_t) atomic_load(&s->spacemap->filesize)); + return ret; + } else if (ret) { + /* Opportunistically map the file; this also sets the correct filesize. + * Ignore errors as this is not critical to the cache logic. */ + cache_map(h, new_size); + } + + return ret; +} + +static int shared_open(URLContext *h, const char *arg, int flags, AVDictionary **options) +{ + SharedContext *s = h->priv_data; + int ret; + + if (!s->cache_dir || !s->cache_dir[0]) { + av_log(h, AV_LOG_ERROR, "Missing path for shared cache! Specify a " + "directory using the -cache_dir option.\n"); + return AVERROR(EINVAL); + } + + s->fd = s->mapfd = -1; /* Set these early for shared_close() failure path */ + + /* Open underlying protocol */ + av_strstart(arg, "shared:", &arg); + ret = ffurl_open_whitelist(&s->inner, arg, flags, &h->interrupt_callback, + options, h->protocol_whitelist, h->protocol_blacklist, h); + + if (ret < 0) + goto fail; + + uint8_t hash[HASH_SIZE]; + ret = hash_uri(hash, arg); + if (ret < 0) + goto fail; + + /* 128 bits is enough for collision resistance; we already store the full + * hash inside the header for verification */ + char filename[2 * 16 + 1]; + for (int i = 0; i < FF_ARRAY_ELEMS(filename) / 2; i++) + sprintf(&filename[i * 2], "%02X", hash[i]); + s->cache_path = av_asprintf("%s/%s.cache", s->cache_dir, filename); + s->map_path = av_asprintf("%s/%s.spacemap", s->cache_dir, filename); + if (!s->cache_path || !s->map_path) { + ret = AVERROR(ENOMEM); + goto fail; + } + + av_log(h, AV_LOG_VERBOSE, "Opening cache file '%s' for URI: '%s'\n", + s->cache_path, s->inner->filename); + + s->fd = avpriv_open(s->cache_path, O_RDWR | O_CREAT, 0660); + s->mapfd = avpriv_open(s->map_path, O_RDWR | O_CREAT, 0660); + if (s->fd < 0 || s->mapfd < 0) { + ret = AVERROR(errno); + av_log(h, AV_LOG_ERROR, "Failed to open '%s': %s\n", + s->fd < 0 ? s->cache_path : s->map_path, av_err2str(ret)); + goto fail; + } + + ret = spacemap_init(h, hash); + if (ret < 0) + goto fail; + + s->block_size = 1 << atomic_load(&s->spacemap->block_shift); + + int64_t filesize = get_filesize(h); + if (!filesize) { + /* Filesize is not yet known, try to get it from the underlying URL */ + filesize = ffurl_size(s->inner); + if (filesize < 0 && filesize != AVERROR(ENOSYS)) { + ret = (int) filesize; + goto fail; + } else if (filesize > 0) + set_filesize(h, filesize); + } + + if (filesize > 0) { + int64_t last_pos = filesize - 1; + int64_t last_block = last_pos >> atomic_load(&s->spacemap->block_shift); + ret = spacemap_grow(h, last_block); + if (ret < 0) + goto fail; + + /* If filesize is known, we can directly mmap() the cache file */ + ret = cache_map(h, filesize); + if (ret < 0) { + av_log(h, AV_LOG_WARNING, "Failed to map cache file: %s. Falling " + "back to normal read/write\n", av_err2str(ret)); + ret = 0; + } + } + + if (!s->cache_data) { + /* Temporary buffer needed for pread/pwrite() fallback */ + s->tmp_buf = av_malloc(s->block_size); + if (!s->tmp_buf) { + ret = AVERROR(ENOMEM); + goto fail; + } + } + + h->max_packet_size = s->block_size; + h->min_packet_size = s->block_size; + +fail: + if (ret < 0) + shared_close(h); + return ret; +} + +static int cache_map(URLContext *h, int64_t filesize) +{ + SharedContext *s = h->priv_data; + if (s->cache_size >= filesize || filesize > SIZE_MAX) + return 0; + + if (s->cache_data) { + munmap(s->cache_data, s->cache_size); + s->cache_data = NULL; + s->cache_size = 0; + } + + struct stat st; + int ret = fstat(s->fd, &st); + if (ret < 0) + return AVERROR(errno); + + if (st.st_size != filesize) { + /* Ensure the file size is correct before mapping; this can happen if + * another process wrote the correct filesize to the header but + * crashed right before actually successfully resizing the file. */ + ret = ftruncate(s->fd, filesize); + if (ret < 0) + return AVERROR(errno); + } + + s->cache_data = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, s->fd, 0); + if (s->cache_data == MAP_FAILED) { + s->cache_data = NULL; + return AVERROR(errno); + } + + s->cache_size = filesize; + return 0; +} + +static int spacemap_remap(URLContext *h, size_t map_size) +{ + SharedContext *s = h->priv_data; + struct flock fl = { .l_type = F_WRLCK }; + int ret, did_grow = 0; + if (map_size <= s->map_size) + return 0; + + /* Opportunistically get current filesize before attempting to lock */ + struct stat st; + ret = fstat(s->mapfd, &st); + if (ret < 0) { + ret = AVERROR(errno); + goto fail; + } + + if (st.st_size >= map_size) + goto skip_resize; + + /* Lock the spacemap to ensure nobody else is currently resizing it */ + ret = fcntl(s->mapfd, F_SETLKW, &fl); + if (ret < 0) { + ret = AVERROR(errno); + goto fail; + } + fl.l_type = F_UNLCK; + + /* Refresh filesize after acquiring the lock */ + ret = fstat(s->mapfd, &st); + if (ret < 0) { + ret = AVERROR(errno); + goto fail; + } + + if (st.st_size >= map_size) + goto skip_resize; + + ret = ftruncate(s->mapfd, map_size); + if (ret < 0) { + ret = AVERROR(errno); + goto fail; + } + st.st_size = map_size; + did_grow = 1; + +skip_resize: + if (s->spacemap) + munmap(s->spacemap, s->map_size); + s->map_size = st.st_size; + s->spacemap = mmap(NULL, s->map_size, PROT_READ | PROT_WRITE, MAP_SHARED, s->mapfd, 0); + if (s->spacemap == MAP_FAILED) { + s->spacemap = NULL; /* for munmap check */ + s->map_size = 0; + ret = AVERROR(errno); + goto fail; + } + + /* fl.l_type is set to F_UNLCK only after successful lock */ + if (fl.l_type == F_UNLCK) + fcntl(s->mapfd, F_SETLK, &fl); + + return did_grow; + +fail: + if (fl.l_type == F_UNLCK) + fcntl(s->mapfd, F_SETLK, &fl); + av_log(h, AV_LOG_ERROR, "Failed to resize space map: %s\n", av_err2str(ret)); + return ret; +} + +static int spacemap_grow(URLContext *h, int64_t block) +{ + SharedContext *s = h->priv_data; + int64_t num_blocks = block + 1; + size_t map_bytes = sizeof(Spacemap) + num_blocks * sizeof(Block); + + /* When streaming files without known size, round up the number of blocks + * to the nearest multiple of the block size to reduce the rate of resizes */ + if (!get_filesize(h)) { + av_assert0(s->block_size > 0); + map_bytes = FFALIGN(map_bytes, (int64_t) s->block_size); + } + + if (map_bytes < num_blocks) + return AVERROR(EINVAL); /* overflow */ + + const off_t old_size = s->map_size; + int ret = spacemap_remap(h, map_bytes); + if (ret < 0) + return ret; + + /* Report new size after successful grow */ + if (s->map_size > old_size) { + num_blocks = (s->map_size - sizeof(Spacemap)) / sizeof(Block); + av_log(h, AV_LOG_DEBUG, + "%s %zu bytes, capacity: %"PRId64" blocks = %zu MB\n", + ret ? "Resized spacemap to" : "Mapped spacemap with", + (size_t) s->map_size, num_blocks, + (num_blocks * (int64_t) s->block_size) >> 20); + } + return 0; +} + +static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE]) +{ + SharedContext *s = h->priv_data; + int ret; + + ret = spacemap_remap(h, sizeof(Spacemap)); + if (ret < 0) + return ret; + + if ((ret = set_once_uint(&s->spacemap->header_magic, HEADER_MAGIC)) < 0 || + (ret = set_once_ushort(&s->spacemap->version, HEADER_VERSION)) < 0) + { + av_log(h, AV_LOG_ERROR, "Shared cache spacemap header mismatch!\n"); + av_log(h, AV_LOG_ERROR, " Expected magic: 0x%X, version: %d\n", + HEADER_MAGIC, HEADER_VERSION); + av_log(h, AV_LOG_ERROR, " Got magic: 0x%X, version: %d\n", + atomic_load(&s->spacemap->header_magic), + atomic_load(&s->spacemap->version)); + return ret; + } + + ret = set_once_ushort(&s->spacemap->block_shift, s->block_shift); + if (ret < 0) { + const int shift = atomic_load(&s->spacemap->block_shift); + av_log(h, AV_LOG_WARNING, "Shared cache uses block shift %d, " + "but requested block shift is %d.\n", shift, s->block_shift); + if (shift < 9 || shift > 30) { + av_log(h, AV_LOG_ERROR, "Invalid block shift %d in cache file!\n", shift); + return AVERROR(EINVAL); + } + } + + for (int i = 0; i < HASH_SIZE; i++) { + ret = set_once_uchar(&s->spacemap->hash[i], hash[i]); + if (ret < 0) { + av_log(h, AV_LOG_ERROR, "Shared cache spacemap hash mismatch!\n"); + av_log(h, AV_LOG_ERROR, " Expected hash: "); + for (int j = 0; j < 32; j++) + av_log(h, AV_LOG_ERROR, "%02X", hash[j]); + av_log(h, AV_LOG_ERROR, "\n Got hash: "); + for (int j = 0; j < 32; j++) + av_log(h, AV_LOG_ERROR, "%02X", atomic_load(&s->spacemap->hash[j])); + av_log(h, AV_LOG_ERROR, "\n"); + return ret; + } + } + + if (ret) /* set_once() return 1 if this is the first time setting the value */ + av_log(h, AV_LOG_DEBUG, "Initialized new cache spacemap.\n"); + + return ret; +} + +static int read_cache(SharedContext *s, uint8_t *buf, size_t size, off_t offset) +{ + if (s->cache_data) { + av_assert1(offset + size <= s->cache_size); + memcpy(buf, s->cache_data + offset, size); + return 0; + } + + while (size) { + ssize_t ret = pread(s->fd, buf, size, offset); + if (ret <= 0) + return ret ? AVERROR(errno) : AVERROR(EIO); + buf += ret; + offset += ret; + size -= ret; + } + + return 0; +} + +static int write_cache(SharedContext *s, const uint8_t *buf, size_t size, off_t offset) +{ + if (s->cache_data) { + av_assert1(offset + size <= s->cache_size); + memcpy(s->cache_data + offset, buf, size); + return 0; + } + + while (size) { + ssize_t ret = pwrite(s->fd, buf, size, offset); + if (ret <= 0) + return ret ? AVERROR(errno) : AVERROR(EIO); + buf += ret; + offset += ret; + size -= ret; + } + + return 0; +} + +static size_t clamp_size(URLContext *h, size_t size, int64_t pos) +{ + const int64_t filesize = get_filesize(h); + if (!filesize) + return size; + else if (pos > filesize) + return 0; + else + return FFMIN(filesize - pos, size); +} + +static int shared_read(URLContext *h, unsigned char *buf, int size) +{ + SharedContext *s = h->priv_data; + uint8_t *tmp; + int ret; + + if (size <= 0) + return 0; + + size = clamp_size(h, size, s->pos); + if (size <= 0) + return AVERROR_EOF; + + const int shift = atomic_load_explicit(&s->spacemap->block_shift, memory_order_relaxed); + const int64_t block_id = s->pos >> shift; + const int64_t offset = s->pos & (s->block_size - 1); + const int64_t block_pos = block_id * s->block_size; + int block_size = clamp_size(h, s->block_size, block_pos); + ret = spacemap_grow(h, block_id); + if (ret < 0) + return ret; + + Block *const block = &s->spacemap->blocks[block_id]; + unsigned char state = atomic_load_explicit(&block->state, memory_order_acquire); + int64_t pending_since = 0; + +retry: + switch (state) { + case BLOCK_CACHED: + size = FFMIN(size, s->block_size - offset); + size = clamp_size(h, size, s->pos); /* filesize may have changed */ + ret = read_cache(s, buf, size, s->pos); + if (ret < 0) { + av_log(h, AV_LOG_ERROR, "Failed to read from cache file: %s\n", av_err2str(ret)); + return ret; + } + + s->nb_hit++; + s->pos += size; + return size; + + case BLOCK_FAILED: + if (!s->retry_errors) + return AVERROR(EIO); + /* fall through */ + case BLOCK_NONE: + if (s->read_only) + break; /* don't mark block as pending */ + if (atomic_compare_exchange_weak_explicit(&block->state, &state, + BLOCK_PENDING, + memory_order_acquire, + memory_order_acquire)) + { + /* Acquired pending state, proceed to fetch the block */ + state = BLOCK_PENDING; + break; + } + /* CAS failed, another thread changed the state; reload it */ + goto retry; + + case BLOCK_PENDING: + /* Another thread is busy fetching this block, wait for it to finish */ + if (!s->timeout) { + break; /* no timeout requested, immediately race to fetch block */ + } else if (pending_since) { + int64_t new = av_gettime_relative(); + if (new - pending_since >= s->timeout) + break; /* timeout expired, try to fetch the block ourselves */ + } else { + pending_since = av_gettime_relative(); + } + + /* Make sure we try a few times before giving up */ + av_usleep(s->timeout >> 4); + state = atomic_load_explicit(&block->state, memory_order_acquire); + goto retry; + } + + /* Cache miss, fetch this block from underlying protocol */ + s->nb_miss++; + + const int read_only = s->read_only || s->write_err; + int64_t inner_pos = read_only ? s->pos : block_pos; + if (s->inner_pos != inner_pos) { + inner_pos = ffurl_seek(s->inner, inner_pos, SEEK_SET); + if (inner_pos < 0) { + av_log(h, AV_LOG_ERROR, "Failed to seek underlying protocol: %s\n", + av_err2str(inner_pos)); + if (!read_only) { + /* Release pending state to avoid stalling other threads. Don't + * mark this as failed, since the seek error may be unrelated to + * the block and should probably be tried again. */ + atomic_compare_exchange_strong_explicit(&block->state, &state, + BLOCK_NONE, + memory_order_relaxed, + memory_order_relaxed); + } + return inner_pos; + } + + av_log(h, AV_LOG_DEBUG, "Inner seek to 0x%"PRIx64"\n", inner_pos); + s->inner_pos = inner_pos; + } + + if (read_only) { + /* Directly defer to the underlying protocol */ + ret = ffurl_read(s->inner, buf, size); + if (ret >= 0) + s->pos = s->inner_pos = inner_pos + ret; + return ret; + } + + int write_back = 1; + if (s->cache_data) { + /* Read directly into memory mapped cache file */ + tmp = s->cache_data + block_pos; + write_back = 0; + } else if (size >= block_size && !offset) { + /* Read directly into output buffer if aligned and large enough */ + tmp = buf; + } else { + /* Read into temporary buffer and copy later */ + tmp = s->tmp_buf; + } + + /* Try and fetch the entire block */ + av_assert0(inner_pos == block_pos); + int bytes_read = 0; + while (bytes_read < block_size) { + ret = ffurl_read(s->inner, &tmp[bytes_read], block_size - bytes_read); + if (!ret || ret == AVERROR_EOF) + break; + else if (ret < 0) { + av_log(h, AV_LOG_ERROR, "Failed to read block 0x%"PRIx64": %s\n", + block_id, av_err2str(ret)); + /* Try to mark block as failed; ignore errors - any mismatch + * here will mean that either another thread already marked it + * as failed, or successfully cached it in the meantime */ + atomic_compare_exchange_strong_explicit(&block->state, &state, + BLOCK_FAILED, + memory_order_relaxed, + memory_order_relaxed); + return ret; + } + + bytes_read += ret; + s->inner_pos += ret; + } + + if (bytes_read < block_size) { + /* Learned location of true EOF, update filesize */ + ret = set_filesize(h, inner_pos + bytes_read); + if (ret < 0) + return ret; + } + + if (bytes_read > 0) { + ret = write_back ? write_cache(s, tmp, bytes_read, block_pos) : 0; + if (ret < 0) { + av_log(h, AV_LOG_ERROR, "Failed to write to cache file: %s\n", + av_err2str(ret)); + s->write_err = 1; + /* Mark as NONE, not FAILED, since the block itself is fine - + * just absent from the cache. */ + atomic_compare_exchange_strong_explicit(&block->state, &state, + BLOCK_NONE, + memory_order_relaxed, + memory_order_relaxed); + } else { + av_log(h, AV_LOG_TRACE, "Cached %d bytes to block 0x%"PRIx64" at " + "offset 0x%"PRIx64"\n", bytes_read, block_id, block_pos); + atomic_store_explicit(&block->state, BLOCK_CACHED, memory_order_release); + } + } else { + return AVERROR_EOF; + } + + size = FFMIN(bytes_read - offset, size); + av_assert0(size > 0); + if (tmp != buf) + memcpy(buf, &tmp[offset], size); + s->pos += size; + return size; +} + +static int64_t shared_seek(URLContext *h, int64_t pos, int whence) +{ + SharedContext *s = h->priv_data; + const int64_t filesize = get_filesize(h); + int64_t res; + + switch (whence) { + case AVSEEK_SIZE: + if (filesize) + return filesize; + res = ffurl_seek(s->inner, pos, whence); + if (res > 0) + set_filesize(h, res); + return res; + case SEEK_SET: + break; + case SEEK_CUR: + pos += s->pos; + break; + case SEEK_END: + if (filesize) { + pos += filesize; + break; + } + /* Defer to underlying protocol if filesize is unknown */ + res = ffurl_seek(s->inner, pos, whence); + if (res < 0) + return res; + set_filesize(h, res - pos); /* Opportunistically update known filesize */ + av_log(h, AV_LOG_DEBUG, "Inner seek to 0x%"PRIx64"\n", res); + return s->pos = s->inner_pos = res; + default: + return AVERROR(EINVAL); + } + + if (pos < 0) + return AVERROR(EINVAL); + + av_log(h, AV_LOG_DEBUG, "Virtual seek to 0x%"PRIx64"\n", pos); + return s->pos = pos; +} + +static int shared_get_file_handle(URLContext *h) +{ + SharedContext *s = h->priv_data; + return ffurl_get_file_handle(s->inner); +} + +static int shared_get_short_seek(URLContext *h) +{ + SharedContext *s = h->priv_data; + int ret = ffurl_get_short_seek(s->inner); + if (ret < 0) + return ret; + return FFMAX(ret, s->block_size); +} + +#define OFFSET(x) offsetof(SharedContext, x) +#define D AV_OPT_FLAG_DECODING_PARAM + +static const AVOption options[] = { + { "cache_dir", "Directory path for shared file cache", OFFSET(cache_dir), AV_OPT_TYPE_STRING, {.str = NULL}, .flags = D }, + { "block_shift", "Set the base 2 logarithm of the block size", OFFSET(block_shift), AV_OPT_TYPE_INT, {.i64 = 15}, 9, 30, .flags = D }, + { "read_only", "Don't write data to the cache, only read from it", OFFSET(read_only), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, .flags = D }, + { "cache_timeout", "Time in us to wait before re-fetching pending blocks", OFFSET(timeout), AV_OPT_TYPE_INT64, {.i64 = 0}, 0, INT64_MAX, .flags = D }, + { "retry_errors", "Re-request blocks even if they previously failed", OFFSET(retry_errors), AV_OPT_TYPE_BOOL, {.i64 = 1}, 0, 1, .flags = D }, + {0}, +}; + +static const AVClass shared_context_class = { + .class_name = "shared", + .item_name = av_default_item_name, + .option = options, + .version = LIBAVUTIL_VERSION_INT, +}; + +const URLProtocol ff_shared_protocol = { + .name = "shared", + .url_open2 = shared_open, + .url_read = shared_read, + .url_seek = shared_seek, + .url_close = shared_close, + .url_get_file_handle = shared_get_file_handle, + .url_get_short_seek = shared_get_short_seek, + .priv_data_size = sizeof(SharedContext), + .priv_data_class = &shared_context_class, +}; _______________________________________________ ffmpeg-cvslog mailing list -- [email protected] To unsubscribe send an email to [email protected]
