+
def is_ancestor(repo: Repo, descendant: RefOrRev, ancestor: RefOrRev) -> bool:
    """Report whether ``ancestor`` is an ancestor of ``descendant``.

    Runs ``git merge-base --is-ancestor <ancestor> <descendant>`` inside the
    directory returned by ``git_cachedir(repo)``.

    Returns:
        True when git exits with status 0, False for every other exit
        status.  The command is run with ``check=False``, so a failing git
        invocation is reported as False rather than raised.
    """
    workdir = git_cachedir(repo)
    logging.debug('Checking if %s is an ancestor of %s', ancestor, descendant)
    command = [
        'git', '-C', workdir,
        'merge-base', '--is-ancestor',
        ancestor, descendant,
    ]
    outcome = subprocess.run(command, check=False)
    return outcome.returncode == 0
+
+
def verify_ancestry(
        repo: Repo,
        descendant: RefOrRev,
        ancestor: RefOrRev,
        force: bool = False) -> None:
    """Raise unless ``ancestor`` is an ancestor of ``descendant``.

    Args:
        repo: Repository handed through to ``is_ancestor``.
        descendant: Ref or revision expected to contain ``ancestor``.
        ancestor: Ref or revision expected to be in ``descendant``'s history.
        force: When true the check is skipped and nothing is raised.

    Raises:
        Exception: If ``force`` is false and ``is_ancestor`` returns False.
    """
    # Forced operations bypass the ancestry check without running git.
    if force:
        return
    if is_ancestor(repo, descendant, ancestor):
        return
    raise Exception('%s is not an ancestor of %s' % (ancestor, descendant))
+
+
def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
    """Yield one ``_LogEntry`` per line of the fetch log for ``repo``.

    The log path comes from ``_log_filename(repo)``.  If that path does not
    exist, nothing is yielded.  Each line is split on whitespace into exactly
    four fields; the first two are ignored, the third is the rev and the
    fourth (which may itself contain spaces) is the ref.

    Raises:
        ValueError: If a line does not split into four fields.
    """
    log_path = _log_filename(repo)
    if os.path.exists(log_path):
        with open(log_path, 'r') as log:
            for record in log:
                # maxsplit=3 keeps everything after the third gap in ``ref``.
                _when, _action, rev, ref = record.strip().split(maxsplit=3)
                yield _LogEntry(ref, rev)
+
+
def _last(it: Iterator[T]) -> Optional[T]:
    """Return the final item produced by ``it``, or None if it yields nothing.

    The iterator is consumed completely.
    """
    tail: Optional[T] = None
    for tail in it:
        pass
    return tail
+
+
def _previous_fetched_rev(repo: Repo, ref: Ref) -> Optional[Rev]:
    """Return the rev of the last fetch-log entry recorded for ``ref``.

    Entries are scanned in the order ``_read_fetch_log`` yields them and the
    final match wins.  Returns None when no entry matches ``ref``.
    """
    latest: Optional[Rev] = None
    for entry in _read_fetch_log(repo):
        if entry.ref == ref:
            latest = entry.rev
    return latest
+
+
def _log_fetch(repo: Repo, ref: Ref, rev: Rev, force: bool = False) -> None:
    """Append a record of fetching ``ref`` at ``rev`` to the fetch log.

    Unless ``force`` is set, the previously logged rev for ``ref`` (if any)
    must be an ancestor of ``rev``; ``verify_ancestry`` raises otherwise and
    nothing is written.  The log's parent directory is created on demand.

    Each record is one line: ``<timestamp> <action> <rev> <ref>``, where
    ``<action>`` is ``FORCEDFETCH`` for forced fetches and ``fetch``
    otherwise.
    """
    # A forced fetch skips the log lookup as well as the ancestry check.
    prior = None if force else _previous_fetched_rev(repo, ref)
    if prior is not None:
        verify_ancestry(repo, rev, prior)

    log_path = _log_filename(repo)
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    with open(log_path, 'a') as log:
        # NOTE(review): '%m%d' has no separator between month and day --
        # confirm this is the intended on-disk timestamp format.
        stamp = time.strftime('%Y-%m%d-%H:%M:%S%z')
        action = 'FORCEDFETCH' if force else 'fetch'
        log.write('%s %s %s %s\n' % (stamp, action, rev, ref))
+
+
def _show_force_warning() -> None:
    """Print the ``--force`` warning banner to stderr, then count down.

    The delay defaults to 15 one-second steps.  If the environment variable
    ``FORCE_WARNING_TIME`` is set, its value is converted with ``int()`` and
    used instead, and an additional notice about the override is printed.
    After the countdown a closing row of 74 asterisks is printed.

    Raises:
        ValueError: If ``FORCE_WARNING_TIME`` is set but is not an integer.
    """
    # NOTE(review): the literal banner rows below are narrower than the
    # 74-column border printed at the end of this function -- confirm the
    # padding inside these strings against the canonical source.
    print('''
**************************************************************************
* WARNING: git-cache INVOKED WITH --force! *
* *
* This mode allows previously-fetched refs to be overwritten to point to *
* non-descendants -- commits that don't have the previous version of the *
* the ref in their history! *
* *
* This should only be invoked by a human operator who knows what they're *
* doing to correct a specific, known, problem. Care should be taken to *
* prevent recurrence. *
* *
* Press ^C to abort. *
* *
''', end='', file=sys.stderr)
    # An unset variable means "use the default"; any set value is an override.
    warn_time_override = os.environ.get('FORCE_WARNING_TIME', None)
    warn_time: int
    if warn_time_override is None:
        warn_time = 15
    else:
        warn_time = int(warn_time_override)
        # Make the override visible in the output itself.
        print(
            '''* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! *
* !! WARNING DISPLAY TIME OVERRIDDEN !! *
* !! !! *
* !! This message is intended to be displayed long enough for a !! *
* !! human operator to read it and have a chance to abort. An !! *
* !! override for the delay time is provided FOR THE UNIT TESTS !! *
* !! to avoid delaying software builds unnecessarily. This is !! *
* !! INTENDED FOR USE IN UNIT TESTS ONLY; THIS MESSAGE SHOULD !! *
* !! NEVER BE SEEN OUTSIDE BUILD LOGS! !! *
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! *
* *
''', end='', file=sys.stderr)

    # Count down from warn_time to 1, one line and one second per step; a
    # value of 0 or less skips the loop entirely.
    for i in range(warn_time, 0, -1):
        # Left-justify the text in a 70-column field inside the "* ... *" frame.
        msg = '* %-70s *' % ("Continuing in %d seconds..." % i)
        print(msg, file=sys.stderr)
        time.sleep(1)
    # Closing border of the banner.
    print('*' * 74, file=sys.stderr)
+
+
@backoff.on_exception(
    backoff.expo,
    subprocess.CalledProcessError,
    max_time=lambda: int(os.environ.get('BACKOFF_MAX_TIME', '30')))
def _git_fetch(
        cachedir: Path,
        repo: Repo,
        ref: Ref,
        force: bool = False) -> None:
    """Run ``git fetch`` for ``ref`` from ``repo`` inside ``cachedir``.

    The refspec is ``<ref>:<ref>``, so the fetched ref is stored under the
    same name locally.  ``--force`` is passed to git only when ``force`` is
    true.

    The ``backoff`` decorator retries on ``subprocess.CalledProcessError``
    with exponential backoff; its ``max_time`` is read from the
    ``BACKOFF_MAX_TIME`` environment variable (default ``'30'``).

    Raises:
        subprocess.CalledProcessError: If git exits non-zero (``check=True``)
            and the decorator gives up retrying.
    """
    argv = ['git', '-C', cachedir, 'fetch']
    if force:
        argv.append('--force')
    argv.extend([repo, '%s:%s' % (ref, ref)])
    subprocess.run(argv, check=True)
+
+
+def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]:
+ if force:
+ _show_force_warning()