Fetch with nix-prefetch-url for the caching
import filecmp
import functools
import hashlib
import operator
import os
import os.path
import shutil
import subprocess
import tempfile
import types
import urllib.parse
import urllib.request
import xml.dom.minidom

from typing import (
    Dict,
    Iterable,
    List,
    NewType,
    Sequence,
    Tuple,
)

Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)

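# Both NewTypes wrap the same underlying SHA-256 value in different spellings.
# A rough sketch of the difference (placeholder values, not real digests):
#
#     Digest16('<64 hex characters>')         # as produced by hashlib.sha256().hexdigest()
#     Digest32('<52 Nix base32 characters>')  # as printed by nix-prefetch-url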

class InfoTableEntry(types.SimpleNamespace):
    digest: Digest16
    file: str
    size: int
    url: str


class Info(types.SimpleNamespace):
    channel_html: bytes
    forwarded_url: str
    git_commit: str
    table: Dict[str, InfoTableEntry]
    url: str


class VerificationError(Exception):
    pass


class Verification:

    def __init__(self) -> None:
        self.line_length = 0

    def status(self, s: str) -> None:
        print(s, end=' ', flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        return '\033[%2dm%s\033[00m' % (c, s)

    def result(self, r: bool) -> None:
        message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
        length = len(message)
        cols = shutil.get_terminal_size().columns
        pad = (cols - (self.line_length + length)) % cols
        print(' ' * pad + self._color(message, color))
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        self.result(True)


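# A minimal sketch of how the Verification helper above is driven; the labels
# and booleans here are illustrative, not taken from the real checks:
#
#     v = Verification()
#     v.status('Checking something')    # prints the label, stays on the line
#     v.result(True)                    # right-aligns a green 'OK'
#     v.check('Another check', False)   # prints 'FAIL' and raises VerificationError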
def compare(a: str,
            b: str) -> Tuple[Sequence[str],
                             Sequence[str],
                             Sequence[str]]:

    def throw(error: OSError) -> None:
        raise error

    def join(x: str, y: str) -> str:
        return y if x == '.' else os.path.join(x, y)

    def recursive_files(d: str) -> Iterable[str]:
        all_files: List[str] = []
        for path, dirs, files in os.walk(d, onerror=throw):
            rel = os.path.relpath(path, start=d)
            all_files.extend(join(rel, f) for f in files)
            for dir_or_link in dirs:
                if os.path.islink(join(path, dir_or_link)):
                    all_files.append(join(rel, dir_or_link))
        return all_files

    def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
        return (f for f in files if not f.startswith('.git/'))

    files = functools.reduce(
        operator.or_, (set(
            exclude_dot_git(
                recursive_files(x))) for x in [a, b]))
    return filecmp.cmpfiles(a, b, files, shallow=False)
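# compare() returns the same shape as filecmp.cmpfiles: three lists of
# relative paths, (match, mismatch, errors), computed over the union of files
# found under either tree, with anything inside .git/ excluded. Hypothetical
# use:
#
#     match, mismatch, errors = compare('/tmp/channel-old', '/tmp/channel-new')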


def fetch(v: Verification, channel_url: str) -> Info:
    info = Info()
    info.url = channel_url
    v.status('Fetching channel')
    request = urllib.request.urlopen(channel_url, timeout=10)
    info.channel_html = request.read()
    info.forwarded_url = request.geturl()
    v.result(request.status == 200)
    v.check('Got forwarded', info.url != info.forwarded_url)
    return info
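# The 'Got forwarded' check relies on channels.nixos.org answering with a
# redirect to a dated release URL (something like
# https://releases.nixos.org/nixos/20.03/nixos-20.03.<...>; the exact shape is
# an assumption about the channel service). geturl() reports that final URL,
# which later serves as the base for resolving the relative links in the table.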


def parse_table(v: Verification, info: Info) -> None:
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(info.channel_html)
    v.ok()

    v.status('Extracting git commit')
    git_commit_node = d.getElementsByTagName('tt')[0]
    info.git_commit = git_commit_node.firstChild.nodeValue
    v.ok()
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    info.table = {}
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        info.table[name] = InfoTableEntry(url=url, digest=digest, size=size)
    v.ok()
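# parse_table assumes the channel page looks roughly like this (a sketch
# reconstructed from the DOM accesses above, not the authoritative markup):
#
#     ... Git commit <tt>0123abcd...</tt> ...
#     <table>
#       <tr><th>File name</th><th>Size</th><th>SHA-256 hash</th></tr>
#       <tr>
#         <td><a href="nixexprs.tar.xz">nixexprs.tar.xz</a></td>
#         <td>1234567</td>
#         <td><tt>...64 hex characters...</tt></td>
#       </tr>
#     </table>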


def digest_file(filename: str) -> Digest16:
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # pylint: disable=cell-var-from-loop
        for block in iter(lambda: f.read(4096), b''):
            hasher.update(block)
    return Digest16(hasher.hexdigest())
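# digest_file streams the file through SHA-256 in 4 KiB blocks and returns the
# ordinary 64-character hex digest, e.g. (hypothetical path and value):
#
#     digest_file('/nix/store/<hash>-nixexprs.tar.xz')  # -> Digest16('<64 hex characters>')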


def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    v.status('Converting digest to base16')
    process = subprocess.run(
        ['nix', 'to-base16', '--type', 'sha256', digest32], capture_output=True)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())


def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    v.status('Converting digest to base32')
    process = subprocess.run(
        ['nix', 'to-base32', '--type', 'sha256', digest16], capture_output=True)
    v.result(process.returncode == 0)
    return Digest32(process.stdout.decode().strip())
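# Both converters shell out to nix, which re-encodes the same SHA-256 value
# between its hex and Nix base32 spellings. Rough command-line equivalent
# (values are placeholders):
#
#     $ nix to-base32 --type sha256 <64-hex-digest>
#     <52-character-base32-digest>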


def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    v.status('Fetching %s' % url)
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], capture_output=True)
    v.result(process.returncode == 0)
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status("Verifying file digest")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path
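# The unpacking above relies on nix-prefetch-url --print-path writing exactly
# two lines to stdout, the base32 hash and then the store path, e.g.
# (hypothetical output):
#
#     <52-character-base32-digest>
#     /nix/store/<hash>-nixexprs.tar.xz
#
# so split('\n') yields the digest, the path, and a trailing empty string.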


def fetch_resources(v: Verification, info: Info) -> None:
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = info.table[resource]
        url = urllib.parse.urljoin(info.forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(v, url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    v.result(
        open(
            info.table['git-revision'].file).read(999) == info.git_commit)
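# The 'git-revision' resource is expected to be a small file containing just
# the commit hash, so read(999) captures the whole thing; it should equal the
# commit scraped from the channel page in parse_table. (That the file carries
# no trailing newline is an assumption about how the channel publishes it.)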


def extract_channel(v: Verification, info: Info) -> None:
    with tempfile.TemporaryDirectory() as d:
        v.status('Extracting nixexprs.tar.xz')
        shutil.unpack_archive(info.table['nixexprs.tar.xz'].file, d)
        v.ok()
        v.status('Removing temporary directory')
    v.ok()
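# TemporaryDirectory removes the extraction directory when the with-block
# exits, so the 'Removing temporary directory' status is announced just before
# leaving the block and confirmed with ok() right after.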


def main() -> None:
    v = Verification()
    info = fetch(v, 'https://channels.nixos.org/nixos-20.03')
    parse_table(v, info)
    fetch_resources(v, info)
    extract_channel(v, info)
    print(info)


main()