+# git-cache: Cache git content locally
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation, version 3.
+
+
# It would be nice if we could share the nix git cache, but as of the
# time of writing it is transitioning from gitv2 (deprecated) to gitv3
# (not ready yet), and trying to straddle them both is too far into nix
# implementation details for my comfort. So we re-implement here half of
# nix's builtins.fetchGit. :(
+import argparse
+import functools
import hashlib
import logging
import os
import subprocess
import sys
+import time
+
+from typing import Iterator, NamedTuple, Optional, TypeVar, Tuple, Union
+
+import backoff
+
+
class GitCacheError(Exception):
    """Error raised by git-cache when a safety check fails.

    In this file it is raised by ``verify_ancestry`` when one commit is not an
    ancestor of another.
    """
+
+
# Plain-string aliases; they exist only to make signatures self-describing.
Path = str  # eg: "/home/user/.cache/git-cache/v1"
Repo = str  # eg: "https://github.com/NixOS/nixpkgs.git"
Ref = str  # eg: "master" or "v1.0.0"; a leading "tag " selects a tag
Rev = str  # eg: "53a27350551844e1ed1a9257690294767389ef0d"
RefOrRev = Union[Ref, Rev]
+
-from typing import Tuple
class _LogEntry(NamedTuple):
    """One (ref, rev) pair parsed from a line of a repository's fetch log."""

    ref: Ref  # ref name as recorded in the log line
    rev: Rev  # revision recorded for that ref
-Path = str # eg: "/home/user/.cache/git-cache/v1"
-Repo = str # eg: "https://github.com/NixOS/nixpkgs.git"
-Ref = str # eg: "master" or "v1.0.0"
-Rev = str # eg: "53a27350551844e1ed1a9257690294767389ef0d"
+
# Generic element type used by the iterator helpers in this file.
T = TypeVar('T')
+
+
def _repo_hashname(repo: Repo) -> str:
    """Return the hex SHA-256 digest of the repo string.

    The digest is used as a filesystem-safe name for per-repo paths.
    """
    return hashlib.sha256(repo.encode()).hexdigest()
def git_cachedir(repo: Repo) -> Path:
    """Return the path of the local cache repository for ``repo``.

    The path is ``<XDG_CACHE_HOME>/git-cache/v1/<hash of repo>``.
    """
    # NOTE(review): XDG_CACHE_HOME is not defined in the visible part of this
    # file; it is assumed to be provided elsewhere -- confirm.
    return Path(os.path.join(
        XDG_CACHE_HOME,
        'git-cache/v1',
        _repo_hashname(repo)))
+
+
def _log_filename(repo: Repo) -> Path:
    """Return the path of the fetch-log file for ``repo``.

    The path is ``<XDG_DATA_HOME>/git-cache/v1/<hash of repo>``, where
    ``XDG_DATA_HOME`` falls back to ``~/.local/share``.
    """
    # Use xdg module when it's less painful to have as a dependency
    # An empty XDG_DATA_HOME must be treated like an unset one; otherwise the
    # join below would produce a path relative to the current directory.
    xdg_data_home = Path(
        os.environ.get('XDG_DATA_HOME')
        or os.path.expanduser('~/.local/share'))

    return Path(os.path.join(
        xdg_data_home,
        'git-cache/v1',
        _repo_hashname(repo)))
-def verify_ancestry(repo: Repo, ref: Ref, rev: Rev) -> None:
def is_ancestor(repo: Repo, descendant: RefOrRev, ancestor: RefOrRev) -> bool:
    """Report whether ``ancestor`` is an ancestor of ``descendant``.

    Runs ``git merge-base --is-ancestor`` inside the cache directory for
    ``repo``. A leading ``"tag "`` on ``descendant`` is stripped before it is
    passed to git.

    Returns:
        True only when git exits with status 0. Every other exit status,
        including git failing outright, is reported as False.
    """
    cachedir = git_cachedir(repo)
    logging.debug('Checking if %s is an ancestor of %s', ancestor, descendant)
    command = [
        'git',
        '-C',
        cachedir,
        'merge-base',
        '--is-ancestor',
        ancestor,
        descendant.removeprefix('tag '),
    ]
    # check=False: a non-zero exit is an answer here, not an exception.
    process = subprocess.run(command, check=False)
    return process.returncode == 0
-def fetch(repo: Repo, ref: Ref) -> Tuple[Path, Rev]:
def verify_ancestry(
        repo: Repo,
        descendant: RefOrRev,
        ancestor: RefOrRev,
        force: bool = False) -> None:
    """Raise unless ``ancestor`` is an ancestor of ``descendant``.

    Args:
        repo: Repository whose cache directory is inspected.
        descendant: Ref or rev expected to contain ``ancestor`` in its history.
        ancestor: Ref or rev expected to be in the history of ``descendant``.
        force: When true, the check is skipped entirely.

    Raises:
        GitCacheError: If the check runs and ``is_ancestor`` returns False.
    """
    if force:
        return
    if not is_ancestor(repo, descendant, ancestor):
        raise GitCacheError(f'{ancestor} is not an ancestor of {descendant}')
+
+
def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
    """Yield the entries of the fetch log for ``repo``, oldest first.

    Each non-blank line holds four whitespace-separated fields. The first two
    are ignored here, the third is the rev, and the fourth is the ref. The ref
    is the remainder of the line and so may itself contain spaces.

    Yields nothing when the log file does not exist.

    Raises:
        ValueError: If a non-blank line has fewer than four fields.
    """
    filename = _log_filename(repo)
    # Open directly instead of testing for existence first, so a file that
    # disappears between the check and the open cannot raise.
    try:
        log_file = open(filename, 'r', encoding='utf-8')
    except FileNotFoundError:
        return
    with log_file:
        for line in log_file:
            stripped = line.strip()
            if not stripped:
                # Tolerate blank lines (e.g. left behind by manual edits).
                continue
            _, _, rev, ref = stripped.split(maxsplit=3)
            yield _LogEntry(ref, rev)
+
+
def _last(it: Iterator[T]) -> Optional[T]:
    """Return the final item produced by ``it``, or None if it is empty.

    The iterator is consumed completely.
    """
    final: Optional[T] = None
    for final in it:
        pass
    return final
+
+
def _previous_fetched_rev(repo: Repo, ref: Ref) -> Optional[Rev]:
    """Return the rev most recently logged for ``ref``, or None if there is none.

    "Most recently" means the last matching entry in the fetch log for ``repo``.
    """
    matching_revs = (
        entry.rev for entry in _read_fetch_log(repo) if entry.ref == ref)
    return _last(matching_revs)
+
+
def _log_fetch(repo: Repo, ref: Ref, rev: Rev, force: bool = False) -> None:
    """Append a record of this fetch to the fetch log for ``repo``.

    Unless ``force`` is set, the rev previously logged for ``ref`` (if any)
    must be an ancestor of ``rev``; otherwise nothing is written.

    Args:
        repo: Repository the fetch was made from.
        ref: Ref that was fetched.
        rev: Revision the ref resolved to.
        force: Skip the ancestry check and mark the entry as ``FORCEDFETCH``.

    Raises:
        GitCacheError: From ``verify_ancestry`` when the ancestry check fails.
    """
    if not force:
        prev_rev = _previous_fetched_rev(repo, ref)
        if prev_rev is not None:
            verify_ancestry(repo, rev, prev_rev)
    filename = _log_filename(repo)
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    # NOTE(review): "%Y-%m%d" has no dash between month and day; this looks
    # like a typo for "%Y-%m-%d" but is kept so the log format is unchanged.
    timestamp = time.strftime("%Y-%m%d-%H:%M:%S%z")
    action = "FORCEDFETCH" if force else "fetch"
    with open(filename, 'a', encoding='utf-8') as f:
        f.write(f'{timestamp} {action} {rev} {ref}\n')
+
+
def _show_force_warning() -> None:
    """Print the ``--force`` warning banner to stderr, then count down.

    The countdown lasts 15 seconds unless the ``FORCE_WARNING_TIME``
    environment variable holds a different whole number of seconds, in which
    case an additional notice about the override is printed as well.

    Raises:
        ValueError: If ``FORCE_WARNING_TIME`` is set but is not an integer.
    """
    box_width = 74
    text_width = box_width - 4  # room for the "* " and " *" borders

    def boxed(text: str = '', center: bool = False) -> str:
        """Return ``text`` padded to the box width between star borders."""
        padded = text.center(text_width) if center else text.ljust(text_width)
        return f'* {padded} *'

    def emit(lines: list) -> None:
        """Write the lines to stderr, one per line."""
        print('\n'.join(lines), file=sys.stderr)

    emit([
        '',
        '*' * box_width,
        boxed('WARNING: git-cache INVOKED WITH --force!', center=True),
        boxed(),
        boxed('This mode allows previously-fetched refs to be overwritten to '
              'point to'),
        boxed("non-descendants -- commits that don't have the previous "
              'version of the'),
        boxed('ref in their history!'),
        boxed(),
        boxed('This should only be invoked by a human operator who knows '
              "what they're"),
        boxed('doing to correct a specific, known, problem. Care should be '
              'taken to'),
        boxed('prevent recurrence.'),
        boxed(),
        boxed('Press ^C to abort.'),
        boxed(),
    ])

    warn_time_override = os.environ.get('FORCE_WARNING_TIME', None)
    warn_time: int
    if warn_time_override is None:
        warn_time = 15
    else:
        warn_time = int(warn_time_override)
        bang_rule = '!' * 66
        notice_width = len(bang_rule) - 6  # room for "!! " and " !!"

        def banged(text: str = '', center: bool = False) -> str:
            """Return ``text`` framed by "!!" inside the outer star box."""
            padded = (text.center(notice_width) if center
                      else text.ljust(notice_width))
            return boxed(f'!! {padded} !!', center=True)

        emit([
            boxed(bang_rule, center=True),
            banged('WARNING DISPLAY TIME OVERRIDDEN', center=True),
            banged(),
            banged('This message is intended to be displayed long enough '
                   'for a'),
            banged('human operator to read it and have a chance to abort. An'),
            banged('override for the delay time is provided FOR THE UNIT '
                   'TESTS'),
            banged('to avoid delaying software builds unnecessarily. This is'),
            banged('INTENDED FOR USE IN UNIT TESTS ONLY; THIS MESSAGE SHOULD'),
            banged('NEVER BE SEEN OUTSIDE BUILD LOGS!'),
            boxed(bang_rule, center=True),
            boxed(),
        ])

    for remaining in range(warn_time, 0, -1):
        # Left-justify via str.ljust: the printf-style "-70s" spec is not
        # valid for strings in Python's format mini-language and raises.
        emit([boxed(f'Continuing in {remaining} seconds...')])
        time.sleep(1)
    print('*' * box_width, file=sys.stderr)
+
+
@backoff.on_exception(
    backoff.expo,
    subprocess.CalledProcessError,
    max_time=lambda: int(os.environ.get('BACKOFF_MAX_TIME', '30')))
def _git_fetch(
        cachedir: Path,
        repo: Repo,
        ref: Ref,
        force: bool = False) -> None:
    """Run ``git fetch`` for ``ref`` from ``repo`` into ``cachedir``.

    A ref of the form ``"tag NAME"`` is fetched as ``tag NAME``; any other ref
    is fetched with the refspec ``REF:REF``. The call is wrapped in the
    ``backoff`` decorator above, keyed on ``CalledProcessError`` with a time
    limit read from ``BACKOFF_MAX_TIME`` (default ``'30'``).

    Args:
        cachedir: Local repository to fetch into.
        repo: Remote to fetch from.
        ref: Branch name, or ``"tag NAME"``.
        force: Pass ``--force`` to ``git fetch``.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    if ref.startswith('tag '):
        refargs = ['tag', ref.removeprefix('tag ')]
    else:
        refargs = [f'{ref}:{ref}']
    force_args = ['--force'] if force else []
    command = ['git', '-C', cachedir, 'fetch'] + force_args + [repo] + refargs
    subprocess.run(command, check=True)
+
+
def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]:
    """Fetch ``ref`` from ``repo`` into the local cache and log the result.

    The cache repository is created (bare) on first use. After fetching, the
    ref is resolved to a rev, the ancestry checks are run, and the fetch is
    appended to the fetch log.

    Args:
        repo: Remote repository to fetch from.
        ref: Branch name, or ``"tag NAME"`` for a tag.
        force: Show the force warning, pass ``--force`` to git, and skip the
            ancestry checks.

    Returns:
        A ``(cachedir, rev)`` tuple.

    Raises:
        subprocess.CalledProcessError: If a git command fails.
        GitCacheError: If an ancestry check fails.
    """
    if force:
        _show_force_warning()
    cachedir = git_cachedir(repo)
    if not os.path.exists(cachedir):
        logging.debug("Initializing git repo")
        # stdout is sent to stderr so git's chatter does not mix with the
        # result this program prints on stdout.
        subprocess.run(['git',
                        '-c',
                        'init.defaultBranch=git-cache--no-default-branch',
                        'init',
                        '--bare',
                        cachedir],
                       check=True,
                       stdout=sys.stderr)

    logging.debug('Fetching ref "%s" from %s', ref, repo)
    _git_fetch(cachedir, repo, ref, force=force)

    if ref.startswith('tag '):
        full_ref = 'refs/tags/' + ref.removeprefix('tag ')
    else:
        full_ref = 'refs/heads/' + ref
    # Ask git to resolve the ref rather than reading the loose ref file:
    # once refs have been packed the file no longer exists on disk.
    resolved = subprocess.run(
        ['git', '-C', cachedir, 'rev-parse', '--verify', full_ref],
        check=True,
        stdout=subprocess.PIPE,
        encoding='utf-8')
    rev = Rev(resolved.stdout.strip())

    verify_ancestry(repo, ref, rev, force=force)
    _log_fetch(repo, ref, rev, force=force)

    return cachedir, rev
-def ensure_rev_available(repo: Repo, ref: Ref, rev: Rev) -> Path:
def ensure_rev_available(
        repo: Repo,
        ref: Ref,
        rev: Rev,
        force: bool = False) -> Path:
    """Make sure ``rev`` is in the local cache and in the history of ``ref``.

    If the cache directory already exists and ``rev`` is already an ancestor
    of ``ref`` there, nothing is fetched. Otherwise ``ref`` is fetched and the
    presence and ancestry of ``rev`` are checked afterwards.

    Args:
        repo: Remote repository.
        ref: Ref expected to contain ``rev``.
        rev: Revision that must be available.
        force: Forwarded to ``fetch`` and ``verify_ancestry``.

    Returns:
        The cache directory for ``repo``.

    Raises:
        subprocess.CalledProcessError: If ``rev`` is still missing after the
            fetch, or a git command fails.
        GitCacheError: If an ancestry check fails.
    """
    cachedir = git_cachedir(repo)
    already_present = os.path.exists(cachedir) and is_ancestor(repo, ref, rev)
    if already_present:
        return cachedir

    logging.debug(
        'We do not have rev %s. We will fetch ref "%s" and hope it appears.',
        rev, ref)
    fetch(repo, ref, force=force)
    logging.debug('Verifying that fetch retrieved rev %s', rev)
    subprocess.run(['git', '-C', cachedir, 'cat-file', '-e', rev], check=True)
    verify_ancestry(repo, ref, rev, force=force)

    return cachedir
def _main() -> None:
    """Command-line entry point.

    With ``repo`` and ``ref`` only, fetches the ref and prints
    ``"<rev> <cachedir>"``. With an additional ``rev``, ensures that revision
    is available and prints the cache directory.
    """
    parser = argparse.ArgumentParser(
        description='Cache remote git repositories locally.',
        epilog='example usage: git-cache https://github.com/NixOS/nixpkgs.git master')
    parser.add_argument(
        '--force',
        action='store_true',
        help='Recover from a force-push in the remote repo')
    parser.add_argument(
        'repo',
        metavar='Repo',
        type=Repo,
        help='Git repository URL')
    parser.add_argument(
        'ref',
        metavar='Ref',
        type=Ref,
        help='Ref (branch or tag) in the git repo')
    parser.add_argument(
        'rev',
        metavar='Rev',
        type=Rev,
        nargs='?',
        help='Ensure that this revision is present. ' +
        'If this revision is already present locally, no network operations are performed.')
    args = parser.parse_args()

    if args.rev is not None:
        print(
            ensure_rev_available(
                args.repo,
                args.ref,
                args.rev,
                force=args.force))
        return

    cachedir, rev = fetch(args.repo, args.ref, force=args.force)
    print(f'{rev} {cachedir}')