On Fri, Aug 30, 2024 at 01:27:15PM +0200, Thomas Huth wrote:
> On 30/08/2024 09.42, Daniel P. Berrangé wrote:
> > On Fri, Aug 30, 2024 at 09:38:17AM +0200, Thomas Huth wrote:
> > > On 29/08/2024 12.15, Daniel P. Berrangé wrote:
> > > > On Tue, Aug 27, 2024 at 04:24:59PM +0200, Thomas Huth wrote:
> > > > > On 27/08/2024 15.16, Thomas Huth wrote:
> > > > > > On 23/08/2024 09.28, Philippe Mathieu-Daudé wrote:
> > > > > > > Hi,
> > > > > > > 
> > > > > > > On 21/8/24 10:27, Thomas Huth wrote:
> > > > > > > > From: Daniel P. Berrangé <berra...@redhat.com>
> > > > > > > > 
> > > > > > > > > Many tests need to access assets stored on remote sites. We don't want
> > > > > > > > > to download these during test execution when run by meson, since this
> > > > > > > > > risks hitting test timeouts when data transfers are slow.
> > > > > > > > > 
> > > > > > > > > Add support for pre-emptive caching of assets by setting the env var
> > > > > > > > > QEMU_TEST_PRECACHE to point to a timestamp file. When this is set,
> > > > > > > > > instead of running the test, the assets will be downloaded and saved
> > > > > > > > > to the cache, then the timestamp file created.
> > > ...
> > > > > > > 
> > > > > > > When using multiple jobs (-jN) I'm observing some hangs,
> > > > > > > apparently multiple threads trying to download the same file.
> > > > > > > The files are eventually downloaded successfully but it takes
> > > > > > > longer. Should we acquire some exclusive lock somewhere?
> > > > > > 
> > > > > > I haven't seen that yet ... what did you exactly run? "make
> > > > > > check-functional -jN" ? Or "make check-functional-<target> -jN" ?
> > > > > 
> > > > > > After applying some of your patches, I think I've now run into this problem,
> > > > > > too: It's because test_aarch64_sbsaref.py and test_aarch64_virt.py try to
> > > > > > download the same asset in parallel (alpine-standard-3.17.2-aarch64.iso).
> > > > > 
> > > > > Daniel, any ideas how to fix this in the Asset code?
> > > > 
> > > > So when downloading we open a file with a ".download" suffix, write to
> > > > that, and then rename it to the final filename.
> > > > 
> > > > If we have concurrent usage, both will open the same file and try to
> > > > write to it. Assuming both are downloading the same content we would
> > > > probably "get lucky" and have a consistent file at the end, but clearly
> > > > it is bad to rely on luck.
> > > > 
> > > > > The lame option is to use NamedTemporaryFile for the temporary file.
> > > > This ensures both processes will write to different temp files, and
> > > > the final rename is atomic. This guarantees safety, but still has
> > > > the double download penalty.
> > > > 
> > > > The serious option is to use fcntl.lockf(..., fcntl.LOCK_EX) on the
> > > > temp file. If we can't acquire the lock then just immediately close
> > > > the temp file (don't delete it) and assume another thread is going to
> > > > finish its download.
> > > > 
> > > > > On Windows we'll need msvcrt.locking(..., msvcrt.LK_WLCK, ...)
> > > > instead of fcntl.
> > > 
> > > While looking for portable solutions, I noticed that newer versions
> > > of Python have a "x" mode for creating files only if they do not
> > > exist yet. So I think something like this could be a solution:
> > > 
> > > @@ -71,17 +72,26 @@ def fetch(self):
> > >           tmp_cache_file = self.cache_file.with_suffix(".download")
> > >           try:
> > > -            resp = urllib.request.urlopen(self.url)
> > > +            with tmp_cache_file.open("xb") as dst:
> > > +                with urllib.request.urlopen(self.url) as resp:
> > > +                    copyfileobj(resp, dst)
> > > +        except FileExistsError:
> > > > +            # Another thread already seems to be downloading this asset,
> > > > +            # so wait until it is done
> > > > +            self.log.debug("%s already exists, waiting for other thread to finish...",
> > > > +                           tmp_cache_file)
> > > +            i = 0
> > > +            while i < 600 and os.path.exists(tmp_cache_file):
> > > +                sleep(1)
> > > +                i += 1
> > > +            if os.path.exists(self.cache_file):
> > > +                return str(self.cache_file)
> > > +            raise
> > >           except Exception as e:
> > >               self.log.error("Unable to download %s: %s", self.url, e)
> > > -            raise
> > > -
> > > -        try:
> > > -            with tmp_cache_file.open("wb+") as dst:
> > > -                copyfileobj(resp, dst)
> > > -        except:
> > >               tmp_cache_file.unlink()
> > >               raise
> > > +
> > >           try:
> > >               # Set these just for informational purposes
> > >               os.setxattr(str(tmp_cache_file), "user.qemu-asset-url",
> > > 
> > > What do you think, does it look reasonable?
> > 
> > The main risk with this, as opposed to fcntl locking, is that it is not
> > crash-safe. If a download is interrupted, subsequent cache runs will
> > wait for a process that doesn't exist to finish downloading and then
> > raise an exception, requiring manual user cleanup of the partial
> > download.
> > 
> > Perhaps if we see the tmp_cache_file, and it doesn't change in size
> > after N seconds, we could force unlink it, and create a new download,
> > so we gracefully recover ?
> 
> Sounds like a plan ... does this look acceptable:
> 
> @@ -70,18 +71,52 @@ def fetch(self):
>          self.log.info("Downloading %s to %s...", self.url, self.cache_file)
>          tmp_cache_file = self.cache_file.with_suffix(".download")
> 
> -        try:
> -            resp = urllib.request.urlopen(self.url)
> -        except Exception as e:
> -            self.log.error("Unable to download %s: %s", self.url, e)
> -            raise
> +        for retries in range(3):
> +            try:
> +                with tmp_cache_file.open("xb") as dst:
> +                    with urllib.request.urlopen(self.url) as resp:
> +                        copyfileobj(resp, dst)
> +                break
> +            except FileExistsError:
> +                # Another thread already seems to be downloading this asset,
> +                # so wait until it is done
> +                self.log.debug("%s already exists, "
> +                               "waiting for other thread to finish...",
> +                               tmp_cache_file)
> +                try:
> +                    current_size = tmp_cache_file.stat().st_size
> +                    new_size = current_size
> +                except:
> +                    if os.path.exists(self.cache_file):
> +                        return str(self.cache_file)
> +                    raise
> +                waittime = lastchange = 600
> +                while waittime > 0:
> +                    sleep(1)
> +                    waittime -= 1
> +                    try:
> +                        new_size = tmp_cache_file.stat().st_size
> +                    except:
> +                        if os.path.exists(self.cache_file):
> +                            return str(self.cache_file)
> +                        raise
> +                    if new_size != current_size:
> +                        lastchange = waittime
> +                        current_size = new_size
> +                    elif lastchange - waittime > 90:
> +                        self.log.debug("%s seems to be stale ... "
> +                                       "deleting and retrying download...",
> +                                       tmp_cache_file)
> +                        tmp_cache_file.unlink()
> +                        break
> +                if waittime > 0:
> +                    continue
> +                raise
> +            except Exception as e:
> +                self.log.error("Unable to download %s: %s", self.url, e)
> +                tmp_cache_file.unlink()
> +                raise
> 
> -        try:
> -            with tmp_cache_file.open("wb+") as dst:
> -                copyfileobj(resp, dst)
> -        except:
> -            tmp_cache_file.unlink()
> -            raise
>          try:
>              # Set these just for informational purposes
>              os.setxattr(str(tmp_cache_file), "user.qemu-asset-url",
> 
> ?
> 
> I tried it with a stale file in my cache, and it seems to work - after 90
> seconds, one of the threads is properly trying to redownload the file.

Yeah, I think that'll work OK.
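
For comparison, the fcntl option from earlier in the thread could look
roughly like the sketch below. It is only an illustration under stated
assumptions, not the patch above: fetch_with_lock() and the download
callable are hypothetical names, it is POSIX-only (no msvcrt branch), and
it blocks on the lock instead of closing the temp file immediately as
suggested earlier.

```python
import fcntl
import os
from pathlib import Path


def fetch_with_lock(cache_file: Path, download) -> Path:
    """Hypothetical helper, not part of the Asset class.

    `download` is an assumed callable that writes the asset's bytes
    into the open binary file object it is given.
    """
    if cache_file.exists():
        return cache_file

    tmp_cache_file = cache_file.with_suffix(".download")

    # Append mode creates the file if needed but does not truncate it,
    # so opening it cannot clobber data another process is writing.
    with tmp_cache_file.open("ab") as dst:
        # Blocks until no other process holds the lock. The kernel drops
        # the lock when the holder closes the file or exits, so a crashed
        # download cannot leave a lock behind.
        fcntl.lockf(dst, fcntl.LOCK_EX)

        if cache_file.exists():
            # Someone else finished the download while we were waiting.
            return cache_file

        # We own the lock: discard any partial data from an interrupted
        # run and download from scratch.
        dst.truncate(0)
        download(dst)
        dst.flush()

        # Atomic on POSIX; waiters still hold the renamed inode open and
        # will see cache_file once they get the lock.
        os.replace(tmp_cache_file, cache_file)

    return cache_file
```

Since the lock goes away with the process that held it, there is no stale
state to time out on: the next caller takes the lock, truncates the partial
file and restarts. One known gap in this sketch is that a process arriving
just after the rename can leave an empty ".download" file behind, which is
harmless but untidy.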

With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

