# It would be nice if we could share the nix git cache, but as of the
# time of writing it is transitioning from gitv2 (deprecated) to gitv3
# (not ready yet), and trying to straddle them both is too far into nix
# implementation details for my comfort.  So we re-implement here half of
# nix's builtins.fetchGit.  :(

import argparse
import functools
import hashlib
import logging
import os
import subprocess
import sys
import time

from typing import Iterator, NamedTuple, Optional, TypeVar, Tuple, Union

import backoff

Path = str  # eg: "/home/user/.cache/git-cache/v1"
Repo = str  # eg: "https://github.com/NixOS/nixpkgs.git"
Ref = str  # eg: "master" or "v1.0.0"
Rev = str  # eg: "53a27350551844e1ed1a9257690294767389ef0d"
RefOrRev = Union[Ref, Rev]


class _LogEntry(NamedTuple):
    """One parsed line of the fetch log: which ref pointed at which rev."""
    ref: Ref
    rev: Rev


T = TypeVar('T')


def _repo_hashname(repo: Repo) -> str:
    """Return the SHA-256 hex digest of the repo URL.

    Used as the per-repo file/directory name under the cache and data dirs.
    """
    return hashlib.sha256(repo.encode()).hexdigest()


def _xdg_home(variable: str, fallback: str) -> Path:
    """Return the XDG base directory named by *variable*.

    *fallback* (with ``~`` expanded) is used when the variable is unset OR
    set to the empty string; the XDG Base Directory spec treats both cases
    the same.  Without the empty-string check we would silently build a
    path relative to the current working directory.
    """
    # Use xdg module when it's less painful to have as a dependency
    return Path(os.environ.get(variable) or os.path.expanduser(fallback))


def git_cachedir(repo: Repo) -> Path:
    """Return the path of the bare git repository that caches *repo*.

    The path is ``$XDG_CACHE_HOME/git-cache/v1/<sha256 of repo URL>``
    (``~/.cache`` when XDG_CACHE_HOME is unset or empty).  The directory is
    not created here.
    """
    return Path(os.path.join(
        _xdg_home('XDG_CACHE_HOME', '~/.cache'),
        'git-cache/v1',
        _repo_hashname(repo)))


def _log_filename(repo: Repo) -> Path:
    """Return the path of the fetch log file for *repo*.

    The path is ``$XDG_DATA_HOME/git-cache/v1/<sha256 of repo URL>``
    (``~/.local/share`` when XDG_DATA_HOME is unset or empty).
    """
    return Path(os.path.join(
        _xdg_home('XDG_DATA_HOME', '~/.local/share'),
        'git-cache/v1',
        _repo_hashname(repo)))


def is_ancestor(repo: Repo, descendant: RefOrRev, ancestor: RefOrRev) -> bool:
    """Report whether *ancestor* is an ancestor of *descendant* in the cache.

    Runs ``git merge-base --is-ancestor`` inside the cache directory for
    *repo* and returns True only when git exits with status 0.  Any other
    exit status (including git failing to resolve one of the names) yields
    False; ensure_rev_available() relies on this to detect a rev that has
    not been fetched yet.
    """
    cachedir = git_cachedir(repo)
    logging.debug('Checking if %s is an ancestor of %s', ancestor, descendant)
    process = subprocess.run(
        ['git', '-C', cachedir, 'merge-base', '--is-ancestor',
         ancestor, descendant],
        check=False)
    return process.returncode == 0


def verify_ancestry(
        repo: Repo,
        descendant: RefOrRev,
        ancestor: RefOrRev) -> None:
    """Raise unless *ancestor* is an ancestor of *descendant*.

    Raises:
        Exception: when is_ancestor() returns False.
    """
    if not is_ancestor(repo, descendant, ancestor):
        raise Exception('%s is not an ancestor of %s' % (ancestor, descendant))


def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
    """Yield the entries of the fetch log for *repo*, oldest first.

    Each non-blank line is expected to have four whitespace-separated
    fields, as written by _log_fetch(): timestamp, the word "fetch", rev,
    ref.  The first two are ignored.  Yields nothing when the log file does
    not exist.

    Raises:
        ValueError: when a non-blank line has fewer than four fields.
    """
    filename = _log_filename(repo)
    # EAFP: open directly instead of testing for existence first, so there
    # is no window in which the file can vanish between check and open.
    try:
        log_file = open(filename, 'r')
    except FileNotFoundError:
        return
    with log_file:
        for line in log_file:
            fields = line.strip()
            if not fields:
                # Tolerate blank lines (e.g. from hand edits) instead of
                # failing on the tuple unpacking below.
                continue
            _, _, rev, ref = fields.split(maxsplit=3)
            yield _LogEntry(ref, rev)


def _last(it: Iterator[T]) -> Optional[T]:
    """Return the final item produced by *it*, or None when it is empty."""
    return functools.reduce(lambda a, b: b, it, None)


def _previous_fetched_rev(repo: Repo, ref: Ref) -> Optional[Rev]:
    """Return the most recently logged rev for *ref*, or None if never logged."""
    return _last(
        entry.rev for entry in _read_fetch_log(repo) if entry.ref == ref)


def _log_fetch(repo: Repo, ref: Ref, rev: Rev) -> None:
    """Append a "fetch" record for (*ref*, *rev*) to the fetch log of *repo*.

    Before writing, if an earlier fetch of *ref* was logged, verifies that
    the previously logged rev is an ancestor of *rev*.

    Raises:
        Exception: (from verify_ancestry) when the previously logged rev is
            not an ancestor of *rev*.
    """
    prev_rev = _previous_fetched_rev(repo, ref)
    if prev_rev is not None:
        verify_ancestry(repo, rev, prev_rev)
    filename = _log_filename(repo)
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'a') as f:
        # NOTE(review): '%m%d' has no separator between month and day,
        # which looks like a typo for '%m-%d'.  Left unchanged on purpose:
        # it is the on-disk format of existing logs.  _read_fetch_log()
        # ignores this field.
        f.write('%s fetch %s %s\n' %
                (time.strftime('%Y-%m%d-%H:%M:%S%z'), rev, ref))


@backoff.on_exception(
    backoff.expo,
    subprocess.CalledProcessError,
    max_time=lambda: int(os.environ.get('BACKOFF_MAX_TIME', '30')))
def _git_fetch(cachedir: Path, repo: Repo, ref: Ref) -> None:
    """Run ``git fetch <repo> <ref>:<ref>`` inside *cachedir*.

    Retried via backoff on subprocess.CalledProcessError; the retry time
    budget is read from the BACKOFF_MAX_TIME environment variable
    (default '30').
    """
    # We don't use --force here because we want to abort and freak out if forced
    # updates are happening.
    subprocess.run(
        ['git', '-C', cachedir, 'fetch', repo, '%s:%s' % (ref, ref)],
        check=True)


def _resolve_fetched_ref(cachedir: Path, ref: Ref) -> Rev:
    """Return the rev that ``refs/heads/<ref>`` points at inside *cachedir*.

    Asks git rather than reading the loose ref file directly, so the lookup
    keeps working when the ref is stored in packed-refs instead of as a
    loose file.

    Raises:
        subprocess.CalledProcessError: when git cannot resolve the ref.
    """
    process = subprocess.run(
        ['git', '-C', cachedir, 'rev-parse', '--verify',
         'refs/heads/%s' % ref],
        check=True,
        stdout=subprocess.PIPE)
    return Rev(process.stdout.decode().strip())


def fetch(repo: Repo, ref: Ref) -> Tuple[Path, Rev]:
    """Fetch *ref* from *repo* into the local cache and record the result.

    Creates the bare cache repository on first use, fetches the ref,
    resolves the rev it now points at, checks ancestry, and appends the
    result to the fetch log.

    Returns:
        A ``(cachedir, rev)`` tuple: the cache repository path and the rev
        that *ref* points at after the fetch.

    Raises:
        subprocess.CalledProcessError: when a git command fails.
        Exception: (from verify_ancestry) when an ancestry check fails.
    """
    cachedir = git_cachedir(repo)
    if not os.path.exists(cachedir):
        logging.debug("Initializing git repo")
        # git's stdout is redirected to our stderr: _main() prints this
        # program's result on stdout.
        subprocess.run(['git', 'init', '--bare', cachedir],
                       check=True, stdout=sys.stderr)
    logging.debug('Fetching ref "%s" from %s', ref, repo)
    _git_fetch(cachedir, repo, ref)
    rev = _resolve_fetched_ref(cachedir, ref)
    verify_ancestry(repo, ref, rev)
    _log_fetch(repo, ref, rev)
    return cachedir, rev


def ensure_rev_available(repo: Repo, ref: Ref, rev: Rev) -> Path:
    """Make sure *rev* is present in the cache as an ancestor of *ref*.

    If the cache already exists and *rev* is already an ancestor of *ref*
    there, returns without calling fetch().  Otherwise fetches *ref* and
    verifies that *rev* now exists and is an ancestor of *ref*.

    Returns:
        The cache repository path.

    Raises:
        subprocess.CalledProcessError: when a git command fails, including
            the ``cat-file -e`` existence check for *rev* after fetching.
        Exception: (from verify_ancestry) when *rev* is still not an
            ancestor of *ref* after fetching.
    """
    cachedir = git_cachedir(repo)
    if os.path.exists(cachedir) and is_ancestor(repo, ref, rev):
        return cachedir

    logging.debug(
        'We do not have rev %s.  We will fetch ref "%s" and hope it appears.',
        rev, ref)
    fetch(repo, ref)
    logging.debug('Verifying that fetch retrieved rev %s', rev)
    subprocess.run(['git', '-C', cachedir, 'cat-file', '-e', rev], check=True)
    verify_ancestry(repo, ref, rev)

    return cachedir


def _main() -> None:
    """Command-line entry point.

    With ``repo ref``: fetch the ref and print ``<rev> <cachedir>``.
    With ``repo ref rev``: ensure the rev is available and print
    ``<cachedir>``.
    """
    parser = argparse.ArgumentParser(
        description='Cache remote git repositories locally.',
        epilog='example usage:  git-cache https://github.com/NixOS/nixpkgs.git master')
    parser.add_argument(
        'repo',
        metavar='Repo',
        type=Repo,
        help='Git repository URL')
    parser.add_argument(
        'ref',
        metavar='Ref',
        type=Ref,
        help='Ref (branch or tag) in the git repo')
    parser.add_argument(
        'rev',
        metavar='Rev',
        type=Rev,
        nargs='?',
        help='Ensure that this revision is present.  ' +
        'If this revision is already present locally, no network operations are performed.')
    args = parser.parse_args()

    if args.rev is None:
        # fetch() returns (cachedir, rev); '{1} {0}' prints the rev first.
        print('{1} {0}'.format(*fetch(args.repo, args.ref)))
    else:
        print(ensure_rev_available(args.repo, args.ref, args.rev))