]> git.scottworley.com Git - pinch/blame_incremental - pinch.py
main creates the Channel
[pinch] / pinch.py
... / ...
CommitLineData
1import filecmp
2import functools
3import hashlib
4import operator
5import os
6import os.path
7import shutil
8import subprocess
9import tempfile
10import types
11import urllib.parse
12import urllib.request
13import xml.dom.minidom
14
15from typing import (
16 Dict,
17 Iterable,
18 List,
19 NewType,
20 Sequence,
21 Tuple,
22)
23
24Digest16 = NewType('Digest16', str)
25Digest32 = NewType('Digest32', str)
26
27
28class ChannelTableEntry(types.SimpleNamespace):
29 digest: Digest16
30 file: str
31 size: int
32 url: str
33
34
35class Channel(types.SimpleNamespace):
36 channel_html: bytes
37 forwarded_url: str
38 git_revision: str
39 release_name: str
40 table: Dict[str, ChannelTableEntry]
41 url: str
42
43
44class VerificationError(Exception):
45 pass
46
47
48class Verification:
49
50 def __init__(self) -> None:
51 self.line_length = 0
52
53 def status(self, s: str) -> None:
54 print(s, end=' ', flush=True)
55 self.line_length += 1 + len(s) # Unicode??
56
57 @staticmethod
58 def _color(s: str, c: int) -> str:
59 return '\033[%2dm%s\033[00m' % (c, s)
60
61 def result(self, r: bool) -> None:
62 message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
63 length = len(message)
64 cols = shutil.get_terminal_size().columns
65 pad = (cols - (self.line_length + length)) % cols
66 print(' ' * pad + self._color(message, color))
67 self.line_length = 0
68 if not r:
69 raise VerificationError()
70
71 def check(self, s: str, r: bool) -> None:
72 self.status(s)
73 self.result(r)
74
75 def ok(self) -> None:
76 self.result(True)
77
78
79def compare(a: str,
80 b: str) -> Tuple[Sequence[str],
81 Sequence[str],
82 Sequence[str]]:
83
84 def throw(error: OSError) -> None:
85 raise error
86
87 def join(x: str, y: str) -> str:
88 return y if x == '.' else os.path.join(x, y)
89
90 def recursive_files(d: str) -> Iterable[str]:
91 all_files: List[str] = []
92 for path, dirs, files in os.walk(d, onerror=throw):
93 rel = os.path.relpath(path, start=d)
94 all_files.extend(join(rel, f) for f in files)
95 for dir_or_link in dirs:
96 if os.path.islink(join(path, dir_or_link)):
97 all_files.append(join(rel, dir_or_link))
98 return all_files
99
100 def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
101 return (f for f in files if not f.startswith('.git/'))
102
103 files = functools.reduce(
104 operator.or_, (set(
105 exclude_dot_git(
106 recursive_files(x))) for x in [a, b]))
107 return filecmp.cmpfiles(a, b, files, shallow=False)
108
109
110def fetch(v: Verification, channel: Channel) -> None:
111 v.status('Fetching channel')
112 request = urllib.request.urlopen(channel.url, timeout=10)
113 channel.channel_html = request.read()
114 channel.forwarded_url = request.geturl()
115 v.result(request.status == 200)
116 v.check('Got forwarded', channel.url != channel.forwarded_url)
117
118
119def parse_channel(v: Verification, channel: Channel) -> None:
120 v.status('Parsing channel description as XML')
121 d = xml.dom.minidom.parseString(channel.channel_html)
122 v.ok()
123
124 v.status('Extracting release name:')
125 title_name = d.getElementsByTagName(
126 'title')[0].firstChild.nodeValue.split()[2]
127 h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
128 v.status(title_name)
129 v.result(title_name == h1_name)
130 channel.release_name = title_name
131
132 v.status('Extracting git commit:')
133 git_commit_node = d.getElementsByTagName('tt')[0]
134 channel.git_commit = git_commit_node.firstChild.nodeValue
135 v.status(channel.git_commit)
136 v.ok()
137 v.status('Verifying git commit label')
138 v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')
139
140 v.status('Parsing table')
141 channel.table = {}
142 for row in d.getElementsByTagName('tr')[1:]:
143 name = row.childNodes[0].firstChild.firstChild.nodeValue
144 url = row.childNodes[0].firstChild.getAttribute('href')
145 size = int(row.childNodes[1].firstChild.nodeValue)
146 digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
147 channel.table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
148 v.ok()
149
150
151def digest_file(filename: str) -> Digest16:
152 hasher = hashlib.sha256()
153 with open(filename, 'rb') as f:
154 # pylint: disable=cell-var-from-loop
155 for block in iter(lambda: f.read(4096), b''):
156 hasher.update(block)
157 return Digest16(hasher.hexdigest())
158
159
160def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
161 v.status('Converting digest to base16')
162 process = subprocess.run(
163 ['nix', 'to-base16', '--type', 'sha256', digest32], capture_output=True)
164 v.result(process.returncode == 0)
165 return Digest16(process.stdout.decode().strip())
166
167
168def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
169 v.status('Converting digest to base32')
170 process = subprocess.run(
171 ['nix', 'to-base32', '--type', 'sha256', digest16], capture_output=True)
172 v.result(process.returncode == 0)
173 return Digest32(process.stdout.decode().strip())
174
175
176def fetch_with_nix_prefetch_url(
177 v: Verification,
178 url: str,
179 digest: Digest16) -> str:
180 v.status('Fetching %s' % url)
181 process = subprocess.run(
182 ['nix-prefetch-url', '--print-path', url, digest], capture_output=True)
183 v.result(process.returncode == 0)
184 prefetch_digest, path, empty = process.stdout.decode().split('\n')
185 assert empty == ''
186 v.check("Verifying nix-prefetch-url's digest",
187 to_Digest16(v, Digest32(prefetch_digest)) == digest)
188 v.status("Verifying file digest")
189 file_digest = digest_file(path)
190 v.result(file_digest == digest)
191 return path
192
193
194def fetch_resources(v: Verification, channel: Channel) -> None:
195 for resource in ['git-revision', 'nixexprs.tar.xz']:
196 fields = channel.table[resource]
197 url = urllib.parse.urljoin(channel.forwarded_url, fields.url)
198 fields.file = fetch_with_nix_prefetch_url(v, url, fields.digest)
199 v.status('Verifying git commit on main page matches git commit in table')
200 v.result(
201 open(
202 channel.table['git-revision'].file).read(999) == channel.git_commit)
203
204
205def check_channel_contents(v: Verification, channel: Channel) -> None:
206 with tempfile.TemporaryDirectory() as d:
207 v.status('Extracting %s' % channel.table['nixexprs.tar.xz'].file)
208 shutil.unpack_archive(channel.table['nixexprs.tar.xz'].file, d)
209 v.ok()
210 v.status('Removing temporary directory')
211 v.ok()
212
213
214def main() -> None:
215 v = Verification()
216 channel = Channel(url='https://channels.nixos.org/nixos-20.03')
217 fetch(v, channel)
218 parse_channel(v, channel)
219 fetch_resources(v, channel)
220 check_channel_contents(v, channel)
221 print(channel)
222
223
224main()