# It would be nice if we could share the nix git cache, but as of the
# time of writing it is transitioning from gitv2 (deprecated) to gitv3
# (not ready yet), and trying to straddle them both is too far into nix
# implementation details for my comfort.  So we re-implement here half of
# nix's builtins.fetchGit.  :(

import argparse
import functools
import hashlib
import logging
import os
import subprocess
import sys
import time
from typing import Iterator, NamedTuple, Optional, TypeVar, Tuple, Union

import backoff

Path = str  # eg: "/home/user/.cache/git-cache/v1"
Repo = str  # eg: "https://github.com/NixOS/nixpkgs.git"
Ref = str  # eg: "master" or "v1.0.0"
Rev = str  # eg: "53a27350551844e1ed1a9257690294767389ef0d"
RefOrRev = Union[Ref, Rev]


class _LogEntry(NamedTuple):
    """One parsed line of the fetch log: which ref pointed at which rev."""
    ref: Ref
    rev: Rev


T = TypeVar('T')


def _repo_hashname(repo: Repo) -> str:
    """Return the hex SHA-256 of the repo URL, used as a directory/file name."""
    return hashlib.sha256(repo.encode()).hexdigest()


def _xdg_dir(variable: str, fallback: str) -> Path:
    """Return the XDG base directory named by environment variable *variable*.

    An unset *or empty* variable selects *fallback* (after ``~`` expansion);
    the XDG Base Directory Specification treats both cases the same.  Using
    an empty value verbatim would silently yield a path relative to the
    current working directory.
    """
    # Use xdg module when it's less painful to have as a dependency
    return Path(os.environ.get(variable) or os.path.expanduser(fallback))


def git_cachedir(repo: Repo) -> Path:
    """Return the path of the bare git repository that caches *repo*."""
    return Path(os.path.join(
        _xdg_dir('XDG_CACHE_HOME', '~/.cache'),
        'git-cache/v1',
        _repo_hashname(repo)))


def _log_filename(repo: Repo) -> Path:
    """Return the path of the fetch log file for *repo*."""
    return Path(os.path.join(
        _xdg_dir('XDG_DATA_HOME', '~/.local/share'),
        'git-cache/v1',
        _repo_hashname(repo)))


def is_ancestor(repo: Repo, descendant: RefOrRev, ancestor: RefOrRev) -> bool:
    """Report whether *ancestor* is an ancestor of *descendant* in the cache.

    Runs ``git merge-base --is-ancestor`` in the cache directory for *repo*
    and returns True only when it exits with status 0.  Any other exit
    status (including git errors) is reported as False.
    """
    cachedir = git_cachedir(repo)
    logging.debug('Checking if %s is an ancestor of %s', ancestor, descendant)
    process = subprocess.run(
        ['git', '-C', cachedir,
         'merge-base', '--is-ancestor', ancestor, descendant],
        check=False)
    return process.returncode == 0


def verify_ancestry(
        repo: Repo,
        descendant: RefOrRev,
        ancestor: RefOrRev,
        force: bool = False) -> None:
    """Raise unless *ancestor* is an ancestor of *descendant*.

    When *force* is true the check is skipped entirely.

    Raises:
        Exception: if the ancestry check fails and *force* is false.
    """
    if not force and not is_ancestor(repo, descendant, ancestor):
        raise Exception('%s is not an ancestor of %s' %
                        (ancestor, descendant))


def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
    """Yield the entries of the fetch log for *repo*, oldest first.

    Yields nothing when the log file does not exist.  Each line holds four
    whitespace-separated fields; the first two (as written by ``_log_fetch``:
    timestamp and action) are ignored, the third is the rev and the rest of
    the line is the ref.  Blank lines are skipped; any other line with
    fewer than four fields raises ValueError.
    """
    filename = _log_filename(repo)
    if not os.path.exists(filename):
        return
    with open(filename, 'r') as f:
        for line in f:
            fields = line.strip()
            if not fields:
                # A stray empty line carries no information; do not let it
                # abort every later fetch with an unpacking error.
                continue
            _, _, rev, ref = fields.split(maxsplit=3)
            yield _LogEntry(ref, rev)


def _last(it: Iterator[T]) -> Optional[T]:
    """Return the final item produced by *it*, or None if it is empty."""
    return functools.reduce(lambda a, b: b, it, None)


def _previous_fetched_rev(repo: Repo, ref: Ref) -> Optional[Rev]:
    """Return the rev most recently logged for *ref*, or None if never logged."""
    return _last(
        entry.rev for entry in _read_fetch_log(repo) if entry.ref == ref)


def _log_fetch(repo: Repo, ref: Ref, rev: Rev, force: bool = False) -> None:
    """Append a record of fetching *ref* at *rev* to the fetch log.

    Unless *force* is true, first verifies that the previously logged rev
    for *ref* (if any) is an ancestor of *rev*; ``verify_ancestry`` raises
    if it is not, and nothing is written.
    """
    if not force:
        prev_rev = _previous_fetched_rev(repo, ref)
        if prev_rev is not None:
            verify_ancestry(repo, rev, prev_rev)
    filename = _log_filename(repo)
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'a') as f:
        # NOTE(review): '%m%d' has no separator between month and day;
        # possibly '%Y-%m-%d' was intended.  Left unchanged so that new
        # entries match the ones already in existing logs.
        f.write('%s %s %s %s\n' % (
            time.strftime('%Y-%m%d-%H:%M:%S%z'),
            ('FORCEDFETCH' if force else 'fetch'),
            rev,
            ref))


def _boxed(*rows: str) -> str:
    """Format each row as one 74-column ``* ... *`` banner line."""
    return ''.join('* %-70s *\n' % row for row in rows)


def _show_force_warning() -> None:
    """Print the --force warning banner to stderr and count down.

    The countdown lasts 15 seconds unless the FORCE_WARNING_TIME environment
    variable holds a different number of seconds, in which case an extra
    notice about the override is printed as well.
    """
    # The banner starts with a blank line followed by the top border.
    print('\n' + '*' * 74, file=sys.stderr)
    print(_boxed(
        'WARNING: git-cache INVOKED WITH --force!',
        '',
        'This mode allows previously-fetched refs to be overwritten to point to',
        "non-descendants -- commits that don't have the previous version of the",
        'the ref in their history!',
        '',
        "This should only be invoked by a human operator who knows what they're",
        'doing to correct a specific, known, problem. Care should be taken to',
        'prevent recurrence.',
        '',
        'Press ^C to abort.',
        '',
    ), end='', file=sys.stderr)

    warn_time_override = os.environ.get('FORCE_WARNING_TIME', None)
    warn_time: int
    if warn_time_override is None:
        warn_time = 15
    else:
        warn_time = int(warn_time_override)
        # Inner frame: 66 columns wide, centred in the 70-column interior.
        bangs = '!' * 66
        inner_rows = [
            bangs,
            '!!' + 'WARNING DISPLAY TIME OVERRIDDEN'.center(62) + '!!',
        ]
        inner_rows.extend('!! %-60s !!' % text for text in (
            '',
            'This message is intended to be displayed long enough for a',
            'human operator to read it and have a chance to abort. An',
            'override for the delay time is provided FOR THE UNIT TESTS',
            'to avoid delaying software builds unnecessarily. This is',
            'INTENDED FOR USE IN UNIT TESTS ONLY; THIS MESSAGE SHOULD',
            'NEVER BE SEEN OUTSIDE BUILD LOGS!',
        ))
        inner_rows.append(bangs)
        print(
            _boxed(*[row.center(70) for row in inner_rows], ''),
            end='', file=sys.stderr)

    for i in range(warn_time, 0, -1):
        msg = '* %-70s *' % ("Continuing in %d seconds..." % i)
        print(msg, file=sys.stderr)
        time.sleep(1)
    print('*' * 74, file=sys.stderr)


@backoff.on_exception(
    backoff.expo,
    subprocess.CalledProcessError,
    max_time=lambda: int(os.environ.get('BACKOFF_MAX_TIME', '30')))
def _git_fetch(
        cachedir: Path,
        repo: Repo,
        ref: Ref,
        force: bool = False) -> None:
    """Run ``git fetch`` of *ref* from *repo* into the local ref of the same name.

    Passes ``--force`` to git when *force* is true.  A failing git command
    raises CalledProcessError, which the ``backoff`` decorator retries; the
    retry budget is read from the BACKOFF_MAX_TIME environment variable
    (default '30') each time the function is called.
    """
    subprocess.run(
        ['git', '-C', cachedir, 'fetch']
        + (['--force'] if force else [])
        + [repo, '%s:%s' % (ref, ref)],
        check=True)


def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]:
    """Fetch *ref* from *repo* into the cache and record it in the fetch log.

    Creates the bare cache repository on first use.  When *force* is true
    the warning banner is shown first and the ancestry checks are skipped.

    Returns:
        The cache directory and the rev that *ref* now points to.

    Raises:
        subprocess.CalledProcessError: if a git command fails.
        Exception: if an ancestry check fails (see ``verify_ancestry``).
    """
    if force:
        _show_force_warning()
    cachedir = git_cachedir(repo)
    if not os.path.exists(cachedir):
        logging.debug("Initializing git repo")
        subprocess.run(
            ['git', '-c',
             'init.defaultBranch=git-cache--no-default-branch',
             'init', '--bare', cachedir],
            check=True,
            stdout=sys.stderr)  # keep our own stdout clean for the result
    logging.debug('Fetching ref "%s" from %s', ref, repo)
    _git_fetch(cachedir, repo, ref, force=force)

    # Ask git for the ref's value rather than reading the loose file
    # refs/heads/<ref> ourselves: that file no longer exists once refs are
    # packed (e.g. by the automatic gc a fetch may trigger).
    resolved = subprocess.run(
        ['git', '-C', cachedir,
         'rev-parse', '--verify', 'refs/heads/%s' % ref],
        check=True,
        stdout=subprocess.PIPE,
        universal_newlines=True)
    rev = Rev(resolved.stdout.strip())

    verify_ancestry(repo, ref, rev, force=force)
    _log_fetch(repo, ref, rev, force=force)

    return cachedir, rev


def ensure_rev_available(
        repo: Repo,
        ref: Ref,
        rev: Rev,
        force: bool = False) -> Path:
    """Make sure *rev* is present in the cache as an ancestor of *ref*.

    If the cache already exists and *rev* is an ancestor of *ref* there,
    returns immediately without fetching.  Otherwise fetches *ref* and then
    checks that *rev* exists and (unless *force*) is an ancestor of *ref*.

    Returns:
        The cache directory.

    Raises:
        subprocess.CalledProcessError: if a git command fails, including
            the existence check for *rev* after the fetch.
        Exception: if an ancestry check fails (see ``verify_ancestry``).
    """
    cachedir = git_cachedir(repo)
    if os.path.exists(cachedir) and is_ancestor(repo, ref, rev):
        return cachedir

    logging.debug(
        'We do not have rev %s. We will fetch ref "%s" and hope it appears.',
        rev, ref)
    fetch(repo, ref, force=force)
    logging.debug('Verifying that fetch retrieved rev %s', rev)
    subprocess.run(
        ['git', '-C', cachedir, 'cat-file', '-e', rev],
        check=True)
    verify_ancestry(repo, ref, rev, force=force)

    return cachedir


def _main() -> None:
    """Command-line entry point.

    With only a repo and ref: fetch and print "<rev> <cachedir>".
    With a rev as well: ensure it is available and print the cache directory.
    """
    parser = argparse.ArgumentParser(
        description='Cache remote git repositories locally.',
        epilog='example usage: git-cache https://github.com/NixOS/nixpkgs.git master')
    parser.add_argument(
        '--force',
        action='store_true',
        help='Recover from a force-push in the remote repo')
    parser.add_argument(
        'repo',
        metavar='Repo',
        type=Repo,
        help='Git repository URL')
    parser.add_argument(
        'ref',
        metavar='Ref',
        type=Ref,
        help='Ref (branch or tag) in the git repo')
    parser.add_argument(
        'rev',
        metavar='Rev',
        type=Rev,
        nargs='?',
        help='Ensure that this revision is present. ' +
        'If this revision is already present locally, no network operations are performed.')
    args = parser.parse_args()

    if args.rev is None:
        # fetch() returns (cachedir, rev); print them in the order rev, cachedir.
        print('{1} {0}'.format(*fetch(args.repo, args.ref, force=args.force)))
    else:
        print(
            ensure_rev_available(
                args.repo,
                args.ref,
                args.rev,
                force=args.force))