ancestor: RefOrRev,
force: bool = False) -> None:
if not force and not is_ancestor(repo, descendant, ancestor):
- raise Exception('%s is not an ancestor of %s' % (ancestor, descendant))
+ raise Exception(f'{ancestor} is not an ancestor of {descendant}')
def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
filename = _log_filename(repo)
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'a') as f:
- f.write('%s %s %s %s\n' %
- (time.strftime('%Y-%m%d-%H:%M:%S%z'),
- ('FORCEDFETCH' if force else 'fetch'), rev, ref))
+ f.write(
+ f'{time.strftime("%Y-%m%d-%H:%M:%S%z")} '
+ f'{"FORCEDFETCH" if force else "fetch"} {rev} {ref}\n'
+ )
def _show_force_warning() -> None:
''', end='', file=sys.stderr)
for i in range(warn_time, 0, -1):
- msg = '* %-70s *' % ("Continuing in %d seconds..." % i)
+ # '<70' left-justifies to width 70 like the old '%-70s'; a leading '-'
+ # is a sign flag in format specs and raises ValueError for str values.
+ msg = f'* {f"Continuing in {i} seconds...":<70s} *'
print(msg, file=sys.stderr)
time.sleep(1)
print('*' * 74, file=sys.stderr)
force: bool = False) -> None:
subprocess.run(['git', '-C', cachedir, 'fetch'] +
(['--force'] if force else []) +
- [repo, '%s:%s' % (ref, ref)], check=True)
+ [repo, f'{ref}:{ref}'], check=True)
def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]:
args = parser.parse_args()
if args.rev is None:
- print('{1} {0}'.format(*fetch(args.repo, args.ref, force=args.force)))
+ cachedir, rev = fetch(args.repo, args.ref, force=args.force)
+ print(f'{rev} {cachedir}')
else:
print(
ensure_rev_available(
def test_fetch(self) -> None:
d, rev = git_cache.fetch(self.upstream, 'main')
- self.assertEqual(_git(d, 'show', '%s:file' % rev), b'Contents')
+ self.assertEqual(_git(d, 'show', f'{rev}:file'), b'Contents')
def test_fetch_twice(self) -> None:
d1, rev1 = git_cache.fetch(self.upstream, 'main')
- self.assertEqual(_git(d1, 'show', '%s:file' % rev1), b'Contents')
+ self.assertEqual(_git(d1, 'show', f'{rev1}:file'), b'Contents')
d2, rev2 = git_cache.fetch(self.upstream, 'main')
self.assertEqual(d1, d2)
self.assertEqual(rev1, rev2)
- self.assertEqual(_git(d2, 'show', '%s:file' % rev2), b'Contents')
+ self.assertEqual(_git(d2, 'show', f'{rev2}:file'), b'Contents')
def test_fetch_then_ensure(self) -> None:
d1, rev = git_cache.fetch(self.upstream, 'main')
- self.assertEqual(_git(d1, 'show', '%s:file' % rev), b'Contents')
+ self.assertEqual(_git(d1, 'show', f'{rev}:file'), b'Contents')
d2 = git_cache.ensure_rev_available(self.upstream, 'main', rev)
self.assertEqual(d1, d2)
- self.assertEqual(_git(d2, 'show', '%s:file' % rev), b'Contents')
+ self.assertEqual(_git(d2, 'show', f'{rev}:file'), b'Contents')
def test_ensure_then_fetch(self) -> None:
rev1 = _git(
self.upstream, 'log', '--format=%H', '-n1').strip().decode()
d1 = git_cache.ensure_rev_available(self.upstream, 'main', rev1)
- self.assertEqual(_git(d1, 'show', '%s:file' % rev1), b'Contents')
+ self.assertEqual(_git(d1, 'show', f'{rev1}:file'), b'Contents')
d2, rev2 = git_cache.fetch(self.upstream, 'main')
self.assertEqual(d1, d2)
self.assertEqual(rev1, rev2)
- self.assertEqual(_git(d2, 'show', '%s:file' % rev2), b'Contents')
+ self.assertEqual(_git(d2, 'show', f'{rev2}:file'), b'Contents')
def test_fetch_new_file(self) -> None:
d1, rev1 = git_cache.fetch(self.upstream, 'main')
d2, rev2 = git_cache.fetch(self.upstream, 'main')
self.assertEqual(d1, d2)
self.assertNotEqual(rev1, rev2)
- self.assertEqual(_git(d2, 'show', '%s:foofile' % rev2), b'foo')
+ self.assertEqual(_git(d2, 'show', f'{rev2}:foofile'), b'foo')
def test_ensure_doesnt_fetch_new_file(self) -> None:
d1, rev1 = git_cache.fetch(self.upstream, 'main')
d2 = git_cache.ensure_rev_available(self.upstream, 'main', rev1)
self.assertEqual(d1, d2)
p = subprocess.run(
- ['git', '-C', d2, 'show', '%s:foofile' % rev2], check=False)
+ ['git', '-C', d2, 'show', f'{rev2}:foofile'], check=False)
self.assertNotEqual(p.returncode, 0)
def test_ensure_doesnt_fetch_from_deleted_upstream(self) -> None:
self.assertNotEqual(rev1, rev2)
d2 = git_cache.ensure_rev_available(self.upstream, 'main', rev2)
self.assertEqual(d1, d2)
- self.assertEqual(_git(d2, 'show', '%s:foofile' % rev2), b'foo')
+ self.assertEqual(_git(d2, 'show', f'{rev2}:foofile'), b'foo')
def test_fetch_raises_on_invalid_repo(self) -> None:
self.tempdir.cleanup()
_commit_file(self.upstream, 'foofile', 'foo', 'Foo')
rev = _git(self.upstream, 'log', '--format=%H', '-n1').strip().decode()
d = git_cache.ensure_rev_available(self.upstream, 'otherbranch', rev)
- self.assertEqual(_git(d, 'show', '%s:foofile' % rev), b'foo')
+ self.assertEqual(_git(d, 'show', f'{rev}:foofile'), b'foo')
def test_catch_up(self) -> None:
_git(self.upstream, 'checkout', '-b', 'otherbranch')
_commit_file(self.upstream, 'foofile', 'foo', 'Foo')
rev = _git(self.upstream, 'log', '--format=%H', '-n1').strip().decode()
d = git_cache.ensure_rev_available(self.upstream, 'otherbranch', rev)
- self.assertEqual(_git(d, 'show', '%s:foofile' % rev), b'foo')
+ self.assertEqual(_git(d, 'show', f'{rev}:foofile'), b'foo')
_git(self.upstream, 'checkout', 'main')
_git(self.upstream, 'merge', '--ff-only', 'otherbranch')
d = git_cache.ensure_rev_available(self.upstream, 'main', rev)
- self.assertEqual(_git(d, 'show', '%s:foofile' % rev), b'foo')
+ self.assertEqual(_git(d, 'show', f'{rev}:foofile'), b'foo')
def test_fetch_after_cache_deleted(self) -> None:
d1, rev1 = git_cache.fetch(self.upstream, 'main')
d2, rev2 = git_cache.fetch(self.upstream, 'main')
self.assertEqual(d1, d2)
self.assertEqual(rev1, rev2)
- self.assertEqual(_git(d2, 'show', '%s:file' % rev2), b'Contents')
+ self.assertEqual(_git(d2, 'show', f'{rev2}:file'), b'Contents')
def test_ensure_after_cache_deleted(self) -> None:
d1, rev = git_cache.fetch(self.upstream, 'main')
shutil.rmtree(d1)
d2 = git_cache.ensure_rev_available(self.upstream, 'main', rev)
self.assertEqual(d1, d2)
- self.assertEqual(_git(d2, 'show', '%s:file' % rev), b'Contents')
+ self.assertEqual(_git(d2, 'show', f'{rev}:file'), b'Contents')
def test_fetch_raises_on_amend(self) -> None:
git_cache.fetch(self.upstream, 'main')