+# git-cache: Cache git content locally
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation, version 3.
+
+
# It would be nice if we could share the nix git cache, but as of the
# time of writing it is transitioning from gitv2 (deprecated) to gitv3
# (not ready yet), and trying to straddle them both is too far into nix
import backoff
+
class GitCacheError(Exception):
    """Error raised by git-cache, e.g. when an ancestry check fails."""
+
+
# String aliases used in type annotations to say what a plain str holds.
Path = str # eg: "/home/user/.cache/git-cache/v1"
Repo = str # eg: "https://github.com/NixOS/nixpkgs.git"
# A Ref starting with "tag " is handled as a tag rather than a branch.
Ref = str # eg: "master" or "v1.0.0"
'merge-base',
'--is-ancestor',
ancestor,
- descendant],
+ descendant.removeprefix('tag ')],
check=False)
return process.returncode == 0
ancestor: RefOrRev,
force: bool = False) -> None:
if not force and not is_ancestor(repo, descendant, ancestor):
- raise Exception('%s is not an ancestor of %s' % (ancestor, descendant))
+ raise GitCacheError(f'{ancestor} is not an ancestor of {descendant}')
def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
    """Yield a ``_LogEntry`` for each line of the fetch log of *repo*.

    The log location comes from ``_log_filename(repo)``.  A missing log
    file yields nothing.  Every line must hold four whitespace-separated
    fields; the first two are ignored here, the third is the revision and
    the fourth is the ref.

    Raises:
        ValueError: if a line has fewer than four fields.
    """
    filename = _log_filename(repo)
    if not os.path.exists(filename):
        return
    # Explicit encoding so reading does not depend on the platform locale.
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            # maxsplit=3 keeps any spaces inside the final field, so a ref
            # such as "tag v1.0.0" is read back in one piece.
            _, _, rev, ref = line.strip().split(maxsplit=3)
            yield _LogEntry(ref, rev)
verify_ancestry(repo, rev, prev_rev)
filename = _log_filename(repo)
os.makedirs(os.path.dirname(filename), exist_ok=True)
- with open(filename, 'a') as f:
- f.write('%s %s %s %s\n' %
- (time.strftime('%Y-%m%d-%H:%M:%S%z'),
- ('FORCEDFETCH' if force else 'fetch'), rev, ref))
+ with open(filename, 'a', encoding='utf-8') as f:
+ f.write(
+ f'{time.strftime("%Y-%m%d-%H:%M:%S%z")} '
+ f'{"FORCEDFETCH" if force else "fetch"} {rev} {ref}\n'
+ )
def _show_force_warning() -> None:
''', end='', file=sys.stderr)
for i in range(warn_time, 0, -1):
    countdown = f'Continuing in {i} seconds...'
    # '<70s' left-justifies in a 70-column field, matching the old '%-70s'.
    # A '-' in a format spec is a numeric sign option and raises ValueError
    # when applied to a str.
    msg = f'* {countdown:<70s} *'
    print(msg, file=sys.stderr)
    time.sleep(1)
print('*' * 74, file=sys.stderr)
repo: Repo,
ref: Ref,
force: bool = False) -> None:
+ refargs = (['tag', ref.removeprefix('tag ')]
+ if ref.startswith('tag ')
+ else [f'{ref}:{ref}'])
subprocess.run(['git', '-C', cachedir, 'fetch'] +
(['--force'] if force else []) +
- [repo, '%s:%s' % (ref, ref)], check=True)
+ [repo] + refargs, check=True)
def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]:
logging.debug('Fetching ref "%s" from %s', ref, repo)
_git_fetch(cachedir, repo, ref, force=force)
- with open(os.path.join(cachedir, 'refs', 'heads', ref)) as rev_file:
+ rev_path = (['tags', ref.removeprefix('tag ')]
+ if ref.startswith('tag ')
+ else ['heads', ref])
+ with open(os.path.join(cachedir, 'refs', *rev_path), encoding='utf-8') as rev_file:
rev = Rev(rev_file.read(999).strip())
verify_ancestry(repo, ref, rev, force=force)
_log_fetch(repo, ref, rev, force=force)
args = parser.parse_args()
if args.rev is None:
- print('{1} {0}'.format(*fetch(args.repo, args.ref, force=args.force)))
+ cachedir, rev = fetch(args.repo, args.ref, force=args.force)
+ print(f'{rev} {cachedir}')
else:
print(
ensure_rev_available(