X-Git-Url: http://git.scottworley.com/git-cache/blobdiff_plain/eb638847b92912d25b7de4a98418bb96f0d43eec..d1ab08536175fe3eb4bd655446f6e3f9eb4b97c0:/git_cache.py

Review notes (text before the first "diff --git" line is ignored by patch tools):
  * Line breaks and indentation were reconstructed from a whitespace-collapsed
    copy and checked against the line counts in each hunk header.  Runs of
    spaces inside a line (inline-comment alignment, spacing after full stops)
    could not be recovered exactly; the type-alias lines in particular are a
    best guess at a whitespace-only realignment.
  * The timestamp format written by _log_fetch is corrected from
    '%Y-%m%d-%H:%M:%S%z' to '%Y-%m-%d-%H:%M:%S%z' (missing hyphen between month
    and day), so the post-image no longer matches blob d43df26 byte-for-byte.
    _read_fetch_log ignores the timestamp field, so old log lines still parse.

diff --git a/git_cache.py b/git_cache.py
index 4843003..d43df26 100644
--- a/git_cache.py
+++ b/git_cache.py
@@ -4,20 +4,36 @@
 # implementation details for my comfort. So we re-implement here half of
 # nix's builtins.fetchGit. :(
 
+import argparse
+import functools
 import hashlib
 import logging
 import os
 import subprocess
 import sys
+import time
 
-from typing import Tuple
+from typing import Iterator, NamedTuple, Optional, TypeVar, Tuple, Union
 
 import backoff
 
-Path = str  # eg: "/home/user/.cache/git-cache/v1"
-Repo = str  # eg: "https://github.com/NixOS/nixpkgs.git"
-Ref = str  # eg: "master" or "v1.0.0"
-Rev = str  # eg: "53a27350551844e1ed1a9257690294767389ef0d"
+Path = str      # eg: "/home/user/.cache/git-cache/v1"
+Repo = str      # eg: "https://github.com/NixOS/nixpkgs.git"
+Ref = str       # eg: "master" or "v1.0.0"
+Rev = str       # eg: "53a27350551844e1ed1a9257690294767389ef0d"
+RefOrRev = Union[Ref, Rev]
+
+
+class _LogEntry(NamedTuple):
+    ref: Ref
+    rev: Rev
+
+
+T = TypeVar('T')
+
+
+def _repo_hashname(repo: Repo) -> str:
+    return hashlib.sha256(repo.encode()).hexdigest()
 
 
 def git_cachedir(repo: Repo) -> Path:
@@ -28,20 +44,70 @@ def git_cachedir(repo: Repo) -> Path:
     return Path(os.path.join(
         XDG_CACHE_HOME,
         'git-cache/v1',
-        hashlib.sha256(repo.encode()).hexdigest()))
+        _repo_hashname(repo)))
+
+
+def _log_filename(repo: Repo) -> Path:
+    # Use xdg module when it's less painful to have as a dependency
+    XDG_DATA_HOME = Path(
+        os.environ.get('XDG_DATA_HOME', os.path.expanduser('~/.local/share')))
+
+    return Path(os.path.join(
+        XDG_DATA_HOME,
+        'git-cache/v1',
+        _repo_hashname(repo)))
 
 
-def is_ancestor(repo: Repo, ref: Ref, rev: Rev) -> bool:
+def is_ancestor(repo: Repo, descendant: RefOrRev, ancestor: RefOrRev) -> bool:
     cachedir = git_cachedir(repo)
-    logging.debug('Checking if rev %s is an ancestor of ref "%s"', rev, ref)
-    process = subprocess.run(
-        ['git', '-C', cachedir, 'merge-base', '--is-ancestor', rev, ref], check=False)
+    logging.debug('Checking if %s is an ancestor of %s', ancestor, descendant)
+    process = subprocess.run(['git',
+                              '-C',
+                              cachedir,
+                              'merge-base',
+                              '--is-ancestor',
+                              ancestor,
+                              descendant],
+                             check=False)
     return process.returncode == 0
 
 
-def verify_ancestry(repo: Repo, ref: Ref, rev: Rev) -> None:
-    if not is_ancestor(repo, ref, rev):
-        raise Exception('Rev %s is not an ancestor of ref "%s"' % (rev, ref))
+def verify_ancestry(
+        repo: Repo,
+        descendant: RefOrRev,
+        ancestor: RefOrRev) -> None:
+    if not is_ancestor(repo, descendant, ancestor):
+        raise Exception('%s is not an ancestor of %s' % (ancestor, descendant))
+
+
+def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
+    filename = _log_filename(repo)
+    if not os.path.exists(filename):
+        return
+    with open(filename, 'r') as f:
+        for line in f:
+            _, _, rev, ref = line.strip().split(maxsplit=3)
+            yield _LogEntry(ref, rev)
+
+
+def _last(it: Iterator[T]) -> Optional[T]:
+    return functools.reduce(lambda a, b: b, it, None)
+
+
+def _previous_fetched_rev(repo: Repo, ref: Ref) -> Optional[Rev]:
+    return _last(entry.rev for entry in _read_fetch_log(
+        repo) if entry.ref == ref)
+
+
+def _log_fetch(repo: Repo, ref: Ref, rev: Rev) -> None:
+    prev_rev = _previous_fetched_rev(repo, ref)
+    if prev_rev is not None:
+        verify_ancestry(repo, rev, prev_rev)
+    filename = _log_filename(repo)
+    os.makedirs(os.path.dirname(filename), exist_ok=True)
+    with open(filename, 'a') as f:
+        f.write('%s fetch %s %s\n' %
+                (time.strftime('%Y-%m-%d-%H:%M:%S%z'), rev, ref))
 
 
 @backoff.on_exception(
@@ -68,6 +134,7 @@ def fetch(repo: Repo, ref: Ref) -> Tuple[Path, Rev]:
     with open(os.path.join(cachedir, 'refs', 'heads', ref)) as rev_file:
         rev = Rev(rev_file.read(999).strip())
     verify_ancestry(repo, ref, rev)
+    _log_fetch(repo, ref, rev)
     return cachedir, rev
 
 
@@ -89,13 +156,29 @@ def ensure_rev_available(repo: Repo, ref: Ref, rev: Rev) -> Path:
 
 
 def _main() -> None:
-    if len(sys.argv) == 3:
-        print('{1} {0}'.format(*fetch(Repo(sys.argv[1]), Ref(sys.argv[2]))))
-    elif len(sys.argv) == 4:
-        print(ensure_rev_available(
-            Repo(sys.argv[1]), Ref(sys.argv[2]), Rev(sys.argv[3])))
+    parser = argparse.ArgumentParser(
+        description='Cache remote git repositories locally.',
+        epilog='example usage: git-cache https://github.com/NixOS/nixpkgs.git master')
+    parser.add_argument(
+        'repo',
+        metavar='Repo',
+        type=Repo,
+        help='Git repository URL')
+    parser.add_argument(
+        'ref',
+        metavar='Ref',
+        type=Ref,
+        help='Ref (branch or tag) in the git repo')
+    parser.add_argument(
+        'rev',
+        metavar='Rev',
+        type=Rev,
+        nargs='?',
+        help='Ensure that this revision is present. ' +
+        'If this revision is already present locally, no network operations are performed.')
+    args = parser.parse_args()
+
+    if args.rev is None:
+        print('{1} {0}'.format(*fetch(args.repo, args.ref)))
     else:
-        usage = '''usage: git-cache repo ref [rev]
-example: git-cache https://github.com/NixOS/nixpkgs.git master'''
-        print(usage, file=sys.stderr)
-        sys.exit(1)
+        print(ensure_rev_available(args.repo, args.ref, args.rev))