+ logging.debug('Checking if %s is an ancestor of %s', ancestor, descendant)
+ process = subprocess.run(['git',
+ '-C',
+ cachedir,
+ 'merge-base',
+ '--is-ancestor',
+ ancestor,
+ descendant],
+ check=False)
+ return process.returncode == 0
+
+
+def verify_ancestry(
+ repo: Repo,
+ descendant: RefOrRev,
+ ancestor: RefOrRev) -> None:
+ if not is_ancestor(repo, descendant, ancestor):
+ raise Exception('%s is not an ancestor of %s' % (ancestor, descendant))
+
+
+def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
+ filename = _log_filename(repo)
+ if not os.path.exists(filename):
+ return
+ with open(filename, 'r') as f:
+ for line in f:
+ _, _, rev, ref = line.strip().split(maxsplit=3)
+ yield _LogEntry(ref, rev)
+
+
+def _last(it: Iterator[T]) -> Optional[T]:
+ return functools.reduce(lambda a, b: b, it, None)
+
+
+def _previous_fetched_rev(repo: Repo, ref: Ref) -> Optional[Rev]:
+ return _last(entry.rev for entry in _read_fetch_log(
+ repo) if entry.ref == ref)
+
+
+def _log_fetch(repo: Repo, ref: Ref, rev: Rev) -> None:
+ prev_rev = _previous_fetched_rev(repo, ref)
+ if prev_rev is not None:
+ verify_ancestry(repo, rev, prev_rev)
+ filename = _log_filename(repo)
+ os.makedirs(os.path.dirname(filename), exist_ok=True)
+ with open(filename, 'a') as f:
+ f.write('%s fetch %s %s\n' %
+ (time.strftime('%Y-%m%d-%H:%M:%S%z'), rev, ref))