X-Git-Url: http://git.scottworley.com/git-cache/blobdiff_plain/21971f7fbf702a0e9e35a3716b999ade7d02dd34..refs/heads/master:/git_cache.py diff --git a/git_cache.py b/git_cache.py index 1ed2601..76c0ed4 100644 --- a/git_cache.py +++ b/git_cache.py @@ -1,3 +1,10 @@ +# git-cache: Cache git content locally +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation, version 3. + + # It would be nice if we could share the nix git cache, but as of the # time of writing it is transitioning from gitv2 (deprecated) to gitv3 # (not ready yet), and trying to straddle them both is too far into nix @@ -17,6 +24,11 @@ from typing import Iterator, NamedTuple, Optional, TypeVar, Tuple, Union import backoff + +class GitCacheError(Exception): + pass + + Path = str # eg: "/home/user/.cache/git-cache/v1" Repo = str # eg: "https://github.com/NixOS/nixpkgs.git" Ref = str # eg: "master" or "v1.0.0" @@ -67,7 +79,7 @@ def is_ancestor(repo: Repo, descendant: RefOrRev, ancestor: RefOrRev) -> bool: 'merge-base', '--is-ancestor', ancestor, - descendant], + descendant.removeprefix('tag ')], check=False) return process.returncode == 0 @@ -78,14 +90,14 @@ def verify_ancestry( ancestor: RefOrRev, force: bool = False) -> None: if not force and not is_ancestor(repo, descendant, ancestor): - raise Exception('%s is not an ancestor of %s' % (ancestor, descendant)) + raise GitCacheError(f'{ancestor} is not an ancestor of {descendant}') def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]: filename = _log_filename(repo) if not os.path.exists(filename): return - with open(filename, 'r') as f: + with open(filename, 'r', encoding='utf-8') as f: for line in f: _, _, rev, ref = line.strip().split(maxsplit=3) yield _LogEntry(ref, rev) @@ -107,10 +119,11 @@ def _log_fetch(repo: Repo, ref: Ref, rev: Rev, force: bool = False) -> None: verify_ancestry(repo, rev, prev_rev) filename = _log_filename(repo) os.makedirs(os.path.dirname(filename), exist_ok=True) - with open(filename, 'a') as f: - f.write('%s %s %s %s\n' % - (time.strftime('%Y-%m%d-%H:%M:%S%z'), - ('FORCEDFETCH' if force else 'fetch'), rev, ref)) + with open(filename, 'a', encoding='utf-8') as f: + f.write( + f'{time.strftime("%Y-%m%d-%H:%M:%S%z")} ' + f'{"FORCEDFETCH" if force else "fetch"} {rev} {ref}\n' + ) def _show_force_warning() -> None: @@ -150,7 +163,7 @@ def _show_force_warning() -> None: ''', end='', file=sys.stderr) for i in range(warn_time, 0, -1): - msg = '* %-70s *' % ("Continuing in %d seconds..." % i) + msg = f'* {f"Continuing in {i} seconds...":-70s} *' print(msg, file=sys.stderr) time.sleep(1) print('*' * 74, file=sys.stderr) @@ -165,9 +178,12 @@ def _git_fetch( repo: Repo, ref: Ref, force: bool = False) -> None: + refargs = (['tag', ref.removeprefix('tag ')] + if ref.startswith('tag ') + else [f'{ref}:{ref}']) subprocess.run(['git', '-C', cachedir, 'fetch'] + (['--force'] if force else []) + - [repo, '%s:%s' % (ref, ref)], check=True) + [repo] + refargs, check=True) def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]: @@ -188,7 +204,10 @@ def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]: logging.debug('Fetching ref "%s" from %s', ref, repo) _git_fetch(cachedir, repo, ref, force=force) - with open(os.path.join(cachedir, 'refs', 'heads', ref)) as rev_file: + rev_path = (['tags', ref.removeprefix('tag ')] + if ref.startswith('tag ') + else ['heads', ref]) + with open(os.path.join(cachedir, 'refs', *rev_path), encoding='utf-8') as rev_file: rev = Rev(rev_file.read(999).strip()) verify_ancestry(repo, ref, rev, force=force) _log_fetch(repo, ref, rev, force=force) @@ -244,7 +263,8 @@ def _main() -> None: args = parser.parse_args() if args.rev is None: - print('{1} {0}'.format(*fetch(args.repo, args.ref, force=args.force))) + cachedir, rev = fetch(args.repo, args.ref, force=args.force) + print(f'{rev} {cachedir}') else: print( ensure_rev_available(