--- /dev/null
+# It would be nice if we could share the nix git cache, but as of the
+# time of writing it is transitioning from gitv2 (deprecated) to gitv3
+# (not ready yet), and trying to straddle them both is too far into nix
+# implementation details for my comfort. So we re-implement here half of
+# nix's builtins.fetchGit. :(
+
+import hashlib
+import logging
+import os
+import subprocess
+
+from typing import Tuple
+
+Path = str # eg: "/home/user/.cache/git-cache/v1"
+Repo = str # eg: "https://github.com/NixOS/nixpkgs.git"
+Ref = str # eg: "master" or "v1.0.0"
+Rev = str # eg: "53a27350551844e1ed1a9257690294767389ef0d"
+
+
+def git_cachedir(repo: Repo) -> Path:
+ # Use xdg module when it's less painful to have as a dependency
+ XDG_CACHE_HOME = Path(
+ os.environ.get('XDG_CACHE_HOME', os.path.expanduser('~/.cache')))
+
+ return Path(os.path.join(
+ XDG_CACHE_HOME,
+ 'git-cache/v1',
+ hashlib.sha256(repo.encode()).hexdigest()))
+
+
+def verify_ancestry(repo: Repo, ref: Ref, rev: Rev) -> None:
+ cachedir = git_cachedir(repo)
+ logging.debug('Verifying rev %s is an ancestor of ref "%s"', rev, ref)
+ subprocess.run(['git', '-C', cachedir, 'merge-base', '--is-ancestor',
+ rev, ref], check=True)
+
+
+def fetch(repo: Repo, ref: Ref) -> Tuple[Path, Rev]:
+ cachedir = git_cachedir(repo)
+ if not os.path.exists(cachedir):
+ logging.debug("Initializing git repo")
+ subprocess.run(['git', 'init', '--bare', cachedir], check=True)
+
+ logging.debug('Fetching ref "%s" from %s', ref, repo)
+ # We don't use --force here because we want to abort and freak out if forced
+ # updates are happening.
+ subprocess.run(['git', '-C', cachedir, 'fetch', repo,
+ '%s:%s' % (ref, ref)], check=True)
+
+ with open(os.path.join(cachedir, 'refs', 'heads', ref)) as rev_file:
+ rev = Rev(rev_file.read(999).strip())
+ verify_ancestry(repo, ref, rev)
+
+ return cachedir, rev
+
+
+def ensure_rev_available(repo: Repo, ref: Ref, rev: Rev) -> Path:
+ cachedir = git_cachedir(repo)
+ if os.path.exists(cachedir):
+ logging.debug('Checking if we already have rev %s', rev)
+ process = subprocess.run(
+ ['git', '-C', cachedir, 'cat-file', '-e', rev], check=False)
+ if process.returncode == 0:
+ logging.debug('We already have rev %s', rev)
+ verify_ancestry(repo, ref, rev)
+ return cachedir
+ if process.returncode != 1:
+ raise Exception(
+ 'Could not test for presence of rev %s. Is cache dir "%s" messed up?' %
+ (rev, cachedir))
+
+ logging.debug(
+ 'We do not have rev %s. We will fetch ref "%s" and hope it appears.',
+ rev, ref)
+ fetch(repo, ref)
+ logging.debug('Verifying that fetch retrieved rev %s', rev)
+ subprocess.run(['git', '-C', cachedir, 'cat-file', '-e', rev], check=True)
+
+ return cachedir
--- /dev/null
+import os.path
+import tempfile
+import shutil
+import subprocess
+import unittest
+
+import git_cache
+
+
+def _git(directory: str, *args: str) -> bytes:
+ p = subprocess.run(['git', '-C', directory] + list(args),
+ stdout=subprocess.PIPE, check=True)
+ return p.stdout
+
+
+def _commit_file(
+ directory: str,
+ filename: str,
+ contents: str,
+ commit_message: str) -> None:
+ with open(os.path.join(directory, filename), 'w') as f:
+ f.write(contents)
+ _git(directory, 'add', filename)
+ _git(directory, 'commit', '-m', commit_message)
+
+
+# pylint: disable=too-many-public-methods
+class TestGitCache(unittest.TestCase):
+
+ def setUp(self) -> None:
+ self.xdgcache = tempfile.TemporaryDirectory(prefix='git_cache_test-')
+ self.old_XDG_CACHE_HOME = os.environ.get('XDG_CACHE_HOME')
+ os.environ['XDG_CACHE_HOME'] = self.xdgcache.name
+
+ os.environ['GIT_AUTHOR_NAME'] = 'test_git_cache'
+ os.environ['GIT_COMMITTER_NAME'] = 'test_git_cache'
+ os.environ['GIT_AUTHOR_EMAIL'] = 'test_git_cache@example.com'
+ os.environ['GIT_COMMITTER_EMAIL'] = 'test_git_cache@example.com'
+
+ self.tempdir = tempfile.TemporaryDirectory(prefix='git_cache_test-')
+ self.upstream = os.path.join(self.tempdir.name, 'upstream')
+ subprocess.run(['git', 'init', self.upstream], check=True)
+ _commit_file(self.upstream, 'file', 'Contents', 'First commit')
+
+ def tearDown(self) -> None:
+ if self.old_XDG_CACHE_HOME is None:
+ del os.environ['XDG_CACHE_HOME']
+ else:
+ os.environ['XDG_CACHE_HOME'] = self.old_XDG_CACHE_HOME
+
+ self.tempdir.cleanup()
+ self.xdgcache.cleanup()
+
+ def test_fetch(self) -> None:
+ d, rev = git_cache.fetch(self.upstream, 'master')
+ self.assertEqual(_git(d, 'show', '%s:file' % rev), b'Contents')
+
+ def test_fetch_twice(self) -> None:
+ d1, rev1 = git_cache.fetch(self.upstream, 'master')
+ self.assertEqual(_git(d1, 'show', '%s:file' % rev1), b'Contents')
+ d2, rev2 = git_cache.fetch(self.upstream, 'master')
+ self.assertEqual(d1, d2)
+ self.assertEqual(rev1, rev2)
+ self.assertEqual(_git(d2, 'show', '%s:file' % rev2), b'Contents')
+
+ def test_fetch_then_ensure(self) -> None:
+ d1, rev = git_cache.fetch(self.upstream, 'master')
+ self.assertEqual(_git(d1, 'show', '%s:file' % rev), b'Contents')
+ d2 = git_cache.ensure_rev_available(self.upstream, 'master', rev)
+ self.assertEqual(d1, d2)
+ self.assertEqual(_git(d2, 'show', '%s:file' % rev), b'Contents')
+
+ def test_ensure_then_fetch(self) -> None:
+ rev1 = _git(
+ self.upstream, 'log', '--format=%H', '-n1').strip().decode()
+ d1 = git_cache.ensure_rev_available(self.upstream, 'master', rev1)
+ self.assertEqual(_git(d1, 'show', '%s:file' % rev1), b'Contents')
+ d2, rev2 = git_cache.fetch(self.upstream, 'master')
+ self.assertEqual(d1, d2)
+ self.assertEqual(rev1, rev2)
+ self.assertEqual(_git(d2, 'show', '%s:file' % rev2), b'Contents')
+
+ def test_fetch_new_file(self) -> None:
+ d1, rev1 = git_cache.fetch(self.upstream, 'master')
+ _commit_file(self.upstream, 'foofile', 'foo', 'Foo')
+ d2, rev2 = git_cache.fetch(self.upstream, 'master')
+ self.assertEqual(d1, d2)
+ self.assertNotEqual(rev1, rev2)
+ self.assertEqual(_git(d2, 'show', '%s:foofile' % rev2), b'foo')
+
+ def test_ensure_doesnt_fetch_new_file(self) -> None:
+ d1, rev1 = git_cache.fetch(self.upstream, 'master')
+ _commit_file(self.upstream, 'foofile', 'foo', 'Foo')
+ rev2 = _git(
+ self.upstream, 'log', '--format=%H', '-n1').strip().decode()
+ self.assertNotEqual(rev1, rev2)
+ d2 = git_cache.ensure_rev_available(self.upstream, 'master', rev1)
+ self.assertEqual(d1, d2)
+ p = subprocess.run(
+ ['git', '-C', d2, 'show', '%s:foofile' % rev2], check=False)
+ self.assertNotEqual(p.returncode, 0)
+
+ def test_ensure_doesnt_fetch_from_deleted_upstream(self) -> None:
+ d1, rev = git_cache.fetch(self.upstream, 'master')
+ self.tempdir.cleanup()
+ d2 = git_cache.ensure_rev_available(self.upstream, 'master', rev)
+ self.assertEqual(d1, d2)
+
+ def test_ensure_fetches_new_file(self) -> None:
+ d1, rev1 = git_cache.fetch(self.upstream, 'master')
+ _commit_file(self.upstream, 'foofile', 'foo', 'Foo')
+ rev2 = _git(
+ self.upstream, 'log', '--format=%H', '-n1').strip().decode()
+ self.assertNotEqual(rev1, rev2)
+ d2 = git_cache.ensure_rev_available(self.upstream, 'master', rev2)
+ self.assertEqual(d1, d2)
+ self.assertEqual(_git(d2, 'show', '%s:foofile' % rev2), b'foo')
+
+ def test_fetch_raises_on_invalid_repo(self) -> None:
+ self.tempdir.cleanup()
+ with self.assertRaises(Exception):
+ git_cache.fetch(self.upstream, 'master')
+
+ def test_ensure_raises_on_invalid_repo(self) -> None:
+ rev = _git(self.upstream, 'log', '--format=%H', '-n1').strip().decode()
+ self.tempdir.cleanup()
+ with self.assertRaises(Exception):
+ git_cache.ensure_rev_available(self.upstream, 'master', rev)
+
+ def test_fetch_raises_on_invalid_ref(self) -> None:
+ with self.assertRaises(Exception):
+ git_cache.fetch(self.upstream, 'nobranch')
+
+ def test_ensure_raises_on_invalid_ref(self) -> None:
+ rev = _git(self.upstream, 'log', '--format=%H', '-n1').strip().decode()
+ with self.assertRaises(Exception):
+ git_cache.ensure_rev_available(self.upstream, 'nobranch', rev)
+
+ def test_ensure_raises_on_invalid_rev(self) -> None:
+ with self.assertRaises(Exception):
+ git_cache.ensure_rev_available(
+ self.upstream,
+ 'nobranch',
+ '1234567890abcdef01234567890abcdef1234567')
+
+ def test_ensure_raises_on_rev_from_other_branch(self) -> None:
+ _git(self.upstream, 'checkout', '-b', 'otherbranch')
+ _commit_file(self.upstream, 'foofile', 'foo', 'Foo')
+ rev = _git(self.upstream, 'log', '--format=%H', '-n1').strip().decode()
+ with self.assertRaises(Exception):
+ git_cache.ensure_rev_available(self.upstream, 'master', rev)
+
+ def test_ensure_other_branch(self) -> None:
+ _git(self.upstream, 'checkout', '-b', 'otherbranch')
+ _commit_file(self.upstream, 'foofile', 'foo', 'Foo')
+ rev = _git(self.upstream, 'log', '--format=%H', '-n1').strip().decode()
+ d = git_cache.ensure_rev_available(self.upstream, 'otherbranch', rev)
+ self.assertEqual(_git(d, 'show', '%s:foofile' % rev), b'foo')
+
+ def test_fetch_after_cache_deleted(self) -> None:
+ d1, rev1 = git_cache.fetch(self.upstream, 'master')
+ shutil.rmtree(d1)
+ d2, rev2 = git_cache.fetch(self.upstream, 'master')
+ self.assertEqual(d1, d2)
+ self.assertEqual(rev1, rev2)
+ self.assertEqual(_git(d2, 'show', '%s:file' % rev2), b'Contents')
+
+ def test_ensure_after_cache_deleted(self) -> None:
+ d1, rev = git_cache.fetch(self.upstream, 'master')
+ shutil.rmtree(d1)
+ d2 = git_cache.ensure_rev_available(self.upstream, 'master', rev)
+ self.assertEqual(d1, d2)
+ self.assertEqual(_git(d2, 'show', '%s:file' % rev), b'Contents')
+
+ def test_fetch_raises_on_amend(self) -> None:
+ git_cache.fetch(self.upstream, 'master')
+ _git(self.upstream, 'commit', '--amend', '-m', 'Amended')
+ with self.assertRaises(Exception):
+ git_cache.fetch(self.upstream, 'master')
+
+ def test_ensure_raises_on_amend(self) -> None:
+ git_cache.fetch(self.upstream, 'master')
+ _git(self.upstream, 'commit', '--amend', '-m', 'Amended')
+ rev = _git(self.upstream, 'log', '--format=%H', '-n1').strip().decode()
+ with self.assertRaises(Exception):
+ git_cache.ensure_rev_available(self.upstream, 'master', rev)
+
+
+if __name__ == '__main__':
+ unittest.main()