]> git.scottworley.com Git - git-cache/blob - git_cache.py
Refuse force-pushes even if cache is cleared
[git-cache] / git_cache.py
1 # It would be nice if we could share the nix git cache, but as of the
2 # time of writing it is transitioning from gitv2 (deprecated) to gitv3
3 # (not ready yet), and trying to straddle them both is too far into nix
4 # implementation details for my comfort. So we re-implement here half of
5 # nix's builtins.fetchGit. :(
6
7 import functools
8 import hashlib
9 import logging
10 import os
11 import subprocess
12 import sys
13 import time
14
15 from typing import Iterator, NamedTuple, Optional, TypeVar, Tuple, Union
16
17 import backoff
18
# Descriptive aliases for plain strings.  They exist only to make function
# signatures self-explanatory; at runtime every one of them is just `str`.
Path = str  # eg: "/home/user/.cache/git-cache/v1"
Repo = str  # eg: "https://github.com/NixOS/nixpkgs.git"
Ref = str  # eg: "master" or "v1.0.0"
Rev = str  # eg: "53a27350551844e1ed1a9257690294767389ef0d"
# For arguments handed to git where either a ref name or a commit id is fine.
RefOrRev = Union[Ref, Rev]
24
25
26 class _LogEntry(NamedTuple):
27 ref: Ref
28 rev: Rev
29
30
# Generic element type for helpers that work on iterators of anything.
T = TypeVar('T')
32
33
34 def _repo_hashname(repo: Repo) -> str:
35 return hashlib.sha256(repo.encode()).hexdigest()
36
37
def git_cachedir(repo: Repo) -> Path:
    """Return the directory that holds the cached git repository for `repo`.

    The location is `git-cache/v1/<sha256 of repo>` underneath
    $XDG_CACHE_HOME, falling back to ~/.cache when that variable is unset.
    Nothing is created on disk; this only computes the path.
    """
    # Use xdg module when it's less painful to have as a dependency
    fallback_cache_home = os.path.expanduser('~/.cache')
    cache_home = Path(os.environ.get('XDG_CACHE_HOME', fallback_cache_home))
    per_repo_dir = os.path.join(
        cache_home, 'git-cache/v1', _repo_hashname(repo))
    return Path(per_repo_dir)
47
48
def _log_filename(repo: Repo) -> Path:
    """Return the path of the fetch log file for `repo`.

    The location is `git-cache/v1/<sha256 of repo>` underneath
    $XDG_DATA_HOME, falling back to ~/.local/share when that variable is
    unset.  Nothing is created on disk; this only computes the path.
    """
    # Use xdg module when it's less painful to have as a dependency
    fallback_data_home = os.path.expanduser('~/.local/share')
    data_home = Path(os.environ.get('XDG_DATA_HOME', fallback_data_home))
    log_path = os.path.join(
        data_home, 'git-cache/v1', _repo_hashname(repo))
    return Path(log_path)
58
59
def is_ancestor(repo: Repo, descendant: RefOrRev, ancestor: RefOrRev) -> bool:
    """Report whether `ancestor` is an ancestor of `descendant` in the cache.

    Runs `git merge-base --is-ancestor` inside the cache directory for
    `repo`.  Returns True only when git exits with status 0; every other
    exit status (including git failing outright) is reported as False.
    """
    cachedir = git_cachedir(repo)
    logging.debug('Checking if %s is an ancestor of %s', ancestor, descendant)
    command = [
        'git', '-C', cachedir,
        'merge-base', '--is-ancestor',
        ancestor, descendant,
    ]
    # check=False: a non-zero exit is an answer here, not an error.
    result = subprocess.run(command, check=False)
    return result.returncode == 0
72
73
def verify_ancestry(
        repo: Repo,
        descendant: RefOrRev,
        ancestor: RefOrRev) -> None:
    """Raise unless `ancestor` is an ancestor of `descendant`.

    Raises:
        Exception: when is_ancestor() reports False for the pair.
    """
    if is_ancestor(repo, descendant, ancestor):
        return
    raise Exception('%s is not an ancestor of %s' % (ancestor, descendant))
80
81
def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
    """Yield one _LogEntry per line of the fetch log for `repo`, in file order.

    Yields nothing when the log file does not exist.  Each line is expected
    to hold four whitespace-separated fields; the first two are ignored and
    the last two are the rev and the ref.  A line with fewer fields raises
    ValueError from the unpacking.
    """
    log_path = _log_filename(repo)
    if os.path.exists(log_path):
        with open(log_path, 'r') as log:
            for record in log:
                fields = record.strip().split(maxsplit=3)
                # Layout written by _log_fetch: <timestamp> fetch <rev> <ref>
                _timestamp, _action, rev, ref = fields
                yield _LogEntry(ref, rev)
90
91
92 def _last(it: Iterator[T]) -> Optional[T]:
93 return functools.reduce(lambda a, b: b, it, None)
94
95
def _previous_fetched_rev(repo: Repo, ref: Ref) -> Optional[Rev]:
    """Return the rev most recently logged for `ref`, or None if never logged.

    "Most recent" means the last matching entry in the fetch log for `repo`.
    """
    revs_for_ref = (
        entry.rev
        for entry in _read_fetch_log(repo)
        if entry.ref == ref
    )
    return _last(revs_for_ref)
99
100
def _log_fetch(repo: Repo, ref: Ref, rev: Rev) -> None:
    """Append a record of this fetch to the log, refusing history rewrites.

    If the log already has an entry for `ref`, the previously recorded rev
    must be an ancestor of `rev`; otherwise verify_ancestry() raises and
    nothing is written.  The log's parent directory is created on demand.
    """
    previous = _previous_fetched_rev(repo, ref)
    if previous is not None:
        verify_ancestry(repo, rev, previous)

    log_path = _log_filename(repo)
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    with open(log_path, 'a') as log:
        # NOTE(review): the format has no hyphen between month and day
        # (%m%d).  That looks unintentional, but it is kept unchanged so new
        # entries match the lines already present in existing logs.
        timestamp = time.strftime('%Y-%m%d-%H:%M:%S%z')
        log.write('%s fetch %s %s\n' % (timestamp, rev, ref))
110
111
# Retry failed fetches with exponential backoff.  max_time is given as a
# callable so BACKOFF_MAX_TIME (default 30) is read from the environment when
# the lambda is invoked rather than when this module is imported.
@backoff.on_exception(
    backoff.expo,
    subprocess.CalledProcessError,
    max_time=lambda: int(os.environ.get('BACKOFF_MAX_TIME', '30')))
def _git_fetch(cachedir: Path, repo: Repo, ref: Ref) -> None:
    """Fetch `ref` from `repo` into the same-named ref of the cache repo.

    Raises:
        subprocess.CalledProcessError: when `git fetch` exits non-zero
            (re-raised once the backoff decorator gives up).
    """
    # We don't use --force here because we want to abort and freak out if forced
    # updates are happening.
    refspec = '%s:%s' % (ref, ref)
    command = ['git', '-C', cachedir, 'fetch', repo, refspec]
    subprocess.run(command, check=True)
121
122
def fetch(repo: Repo, ref: Ref) -> Tuple[Path, Rev]:
    """Fetch `ref` from `repo` into the local cache and record the result.

    Creates the bare cache repository on first use, fetches the ref without
    forcing, resolves the fetched ref to a commit id, checks it against the
    fetch log (refusing non-fast-forward history), and appends a log entry.

    Returns:
        (cachedir, rev): the cache repository path and the commit id that
        `ref` now points at.

    Raises:
        subprocess.CalledProcessError: if `git init`, `git fetch` or
            `git rev-parse` fails.
        Exception: from verify_ancestry() when the new rev does not descend
            from the previously logged rev for this ref.
    """
    cachedir = git_cachedir(repo)
    if not os.path.exists(cachedir):
        logging.debug("Initializing git repo")
        # git's chatter goes to stderr so stdout stays clean for our output.
        subprocess.run(['git', 'init', '--bare', cachedir],
                       check=True, stdout=sys.stderr)

    logging.debug('Fetching ref "%s" from %s', ref, repo)
    _git_fetch(cachedir, repo, ref)

    # Ask git to resolve the ref instead of reading refs/heads/<ref> from
    # disk: the loose ref file no longer exists once git has packed the refs
    # (which garbage collection may do), and for a symbolic ref it holds
    # "ref: ..." text instead of a commit id.
    resolved = subprocess.check_output(
        ['git', '-C', cachedir, 'rev-parse', '--verify',
         'refs/heads/' + ref])
    rev = Rev(resolved.decode().strip())
    verify_ancestry(repo, ref, rev)
    _log_fetch(repo, ref, rev)

    return cachedir, rev
139
140
def ensure_rev_available(repo: Repo, ref: Ref, rev: Rev) -> Path:
    """Make sure `rev` is in the cache and reachable from `ref`.

    If the cache already exists and `rev` is an ancestor of `ref` there,
    nothing is fetched.  Otherwise `ref` is fetched and the presence and
    ancestry of `rev` are verified afterwards.

    Returns:
        The cache repository path.

    Raises:
        subprocess.CalledProcessError: if `rev` is still absent after the
            fetch (`git cat-file -e` fails).
        Exception: from verify_ancestry() if `rev` is not an ancestor of
            `ref` after the fetch.
    """
    cachedir = git_cachedir(repo)
    already_cached = os.path.exists(cachedir) and is_ancestor(repo, ref, rev)
    if not already_cached:
        logging.debug(
            'We do not have rev %s. We will fetch ref "%s" and hope it appears.',
            rev, ref)
        fetch(repo, ref)

        logging.debug('Verifying that fetch retrieved rev %s', rev)
        object_check = ['git', '-C', cachedir, 'cat-file', '-e', rev]
        subprocess.run(object_check, check=True)
        verify_ancestry(repo, ref, rev)

    return cachedir
155
156
def _main() -> None:
    """Command-line entry point.

    With `repo ref`, fetch the ref and print "<rev> <cachedir>".
    With `repo ref rev`, make sure the rev is cached and print the cache
    directory.  Any other argument count prints usage to stderr and exits
    with status 1.
    """
    args = sys.argv[1:]
    if len(args) == 2:
        repo, ref = args
        cachedir, rev = fetch(Repo(repo), Ref(ref))
        # Indices are swapped on purpose: the rev is printed first.
        print('{1} {0}'.format(cachedir, rev))
    elif len(args) == 3:
        repo, ref, rev = args
        print(ensure_rev_available(Repo(repo), Ref(ref), Rev(rev)))
    else:
        usage = '''usage: git-cache repo ref [rev]
example: git-cache https://github.com/NixOS/nixpkgs.git master'''
        print(usage, file=sys.stderr)
        sys.exit(1)