13 import xml
.dom
.minidom
# Distinct static types for the two textual encodings of a digest, so a type
# checker can catch one being passed where the other is expected.  At runtime
# both are plain ``str``.
Digest16 = NewType('Digest16', str)  # base16 (hexadecimal) digest text
Digest32 = NewType('Digest32', str)  # base32 digest text
28 class ChannelTableEntry(types
.SimpleNamespace
):
35 class Channel(types
.SimpleNamespace
):
40 table
: Dict
[str, ChannelTableEntry
]
44 class VerificationError(Exception):
50 def __init__(self
) -> None:
def status(self, s: str) -> None:
    """Print a progress message and stay on the same output line.

    Args:
        s: Text to show.  It is followed by a single space rather than a
            newline, so a later result can be printed on the same line.
    """
    print(s, end=' ', flush=True)
    # Track how many columns have been used so far on this line.
    # NOTE: len() counts code points, not terminal cells, so wide or
    # combining characters will make this count inexact.
    self.line_length += 1 + len(s)
def _color(s: str, c: int) -> str:
    """Wrap *s* in terminal escape sequences that select attribute *c*.

    Args:
        s: Text to colorize.
        c: Numeric code placed in the ``ESC[<c>m`` sequence.

    Returns:
        *s* prefixed with ``ESC[<c>m`` and suffixed with ``ESC[00m`` (reset).
    """
    # NOTE(review): takes no ``self`` yet is invoked as ``self._color(...)``
    # elsewhere, so presumably it is meant to be a static method - confirm.
    return '\033[%2dm%s\033[00m' % (c, s)
def result(self, r: bool) -> None:
    """Finish the current status line with a right-aligned verdict.

    Args:
        r: True for success, False for failure.

    Raises:
        VerificationError: If *r* is False.
    """
    # 92 / 91 are the codes handed to _color for the success / failure text.
    message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
    length = len(message)
    cols = shutil.get_terminal_size().columns
    # Pad so the verdict ends at the right edge.  The modulo keeps the pad
    # non-negative when the status text has already wrapped past one line.
    pad = (cols - (self.line_length + length)) % cols
    print(' ' * pad + self._color(message, color))
    # print() ended the line, so column accounting starts over.
    self.line_length = 0
    if not r:
        raise VerificationError()
71 def check(self
, s
: str, r
: bool) -> None:
80 b
: str) -> Tuple
[Sequence
[str],
84 def throw(error
: OSError) -> None:
def join(x: str, y: str) -> str:
    """Join directory *x* and name *y*, omitting a leading ``'.'``.

    Args:
        x: Directory part; ``'.'`` means "no directory".
        y: Name to append.

    Returns:
        *y* unchanged when *x* is ``'.'``, otherwise ``os.path.join(x, y)``.
    """
    return y if x == '.' else os.path.join(x, y)
def recursive_files(d: str) -> Iterable[str]:
    """List every file under *d*, as paths relative to *d*.

    Symbolic links to directories are listed as entries themselves instead
    of being descended into.

    Args:
        d: Root directory to walk.

    Returns:
        The relative paths collected during the walk.
    """
    all_files: List[str] = []
    # onerror=throw: os.walk ignores listing errors unless given a handler.
    for path, dirs, files in os.walk(d, onerror=throw):
        rel = os.path.relpath(path, start=d)
        all_files.extend(join(rel, f) for f in files)
        for dir_or_link in dirs:
            # os.walk reports directory symlinks under ``dirs`` but does not
            # follow them by default, so record them as leaf entries.
            if os.path.islink(join(path, dir_or_link)):
                all_files.append(join(rel, dir_or_link))
    return all_files
def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
    """Lazily drop every path that lies inside a top-level ``.git/``.

    Only the literal prefix ``'.git/'`` is matched, so names such as
    ``.gitignore`` or ``sub/.git/x`` are kept.

    Args:
        files: Relative paths to filter.

    Returns:
        A generator over the paths that do not start with ``'.git/'``.
    """
    return (f for f in files if not f.startswith('.git/'))
103 files
= functools
.reduce(
106 recursive_files(x
))) for x
in [a
, b
]))
107 return filecmp
.cmpfiles(a
, b
, files
, shallow
=False)
def fetch(v: Verification, channel: Channel) -> None:
    """Download the channel page and record its body and final URL.

    Sets ``channel.channel_html`` to the response body and
    ``channel.forwarded_url`` to the URL the response finally came from.

    Args:
        v: Progress reporter; a failed check raises through it.
        channel: Channel whose ``url`` is fetched and whose fields are set.
    """
    v.status('Fetching channel')
    # Use the response as a context manager so the connection is closed
    # even if reading fails.
    with urllib.request.urlopen(channel.url, timeout=10) as response:
        channel.channel_html = response.read()
        channel.forwarded_url = response.geturl()
        http_status = response.status
    v.result(http_status == 200)
    # The channel URL is expected to redirect; an unchanged URL is a failure.
    v.check('Got forwarded', channel.url != channel.forwarded_url)
def parse_channel(v: Verification, channel: Channel) -> None:
    """Parse the downloaded channel page and fill in the channel's fields.

    Reads ``channel.channel_html`` and sets ``channel.release_name``,
    ``channel.git_commit`` and ``channel.table``.

    Args:
        v: Progress reporter; a failed check raises through it.
        channel: Channel to read the page from and store results on.
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel.channel_html)
    v.result(True)

    v.status('Extracting release name:')
    # The name is taken as the third whitespace-separated word of both the
    # <title> and the first <h1>; the two must agree.
    title_name = d.getElementsByTagName(
        'title')[0].firstChild.nodeValue.split()[2]
    h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
    v.status(title_name)
    v.result(title_name == h1_name)
    channel.release_name = title_name

    v.status('Extracting git commit:')
    git_commit_node = d.getElementsByTagName('tt')[0]
    channel.git_commit = git_commit_node.firstChild.nodeValue
    v.status(channel.git_commit)
    v.result(True)

    # Guard against picking up some unrelated <tt>: the text node just
    # before it must be the expected label.
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    # Start from an empty table so the assignments below have a dict to
    # write into.
    channel.table = {}
    # The first <tr> is skipped (presumably the header row - confirm).
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        channel.table[name] = ChannelTableEntry(
            url=url, digest=digest, size=size)
    v.result(True)
def digest_file(filename: str) -> Digest16:
    """Return the SHA-256 digest of a file's contents as hexadecimal text.

    Args:
        filename: Path of the file to hash.

    Returns:
        The hex digest, typed as ``Digest16``.
    """
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # Read in fixed-size blocks so large files are never held in memory
        # at once; iter() stops at the empty bytes that signal end of file.
        # pylint: disable=cell-var-from-loop
        for block in iter(lambda: f.read(4096), b''):
            hasher.update(block)
    return Digest16(hasher.hexdigest())
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base32 SHA-256 digest to base16 by running ``nix``.

    Args:
        v: Progress reporter; a non-zero exit status is reported as a
            failure through it.
        digest32: Digest text in base32.

    Returns:
        The command's standard output, decoded and stripped of surrounding
        whitespace.
    """
    v.status('Converting digest to base16')
    process = subprocess.run(
        ['nix', 'to-base16', '--type', 'sha256', digest32],
        capture_output=True)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a base16 SHA-256 digest to base32 by running ``nix``.

    Args:
        v: Progress reporter; a non-zero exit status is reported as a
            failure through it.
        digest16: Digest text in base16.

    Returns:
        The command's standard output, decoded and stripped of surrounding
        whitespace.
    """
    v.status('Converting digest to base32')
    process = subprocess.run(
        ['nix', 'to-base32', '--type', 'sha256', digest16],
        capture_output=True)
    v.result(process.returncode == 0)
    return Digest32(process.stdout.decode().strip())
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Fetch *url* with ``nix-prefetch-url`` and verify it against *digest*.

    The digest is checked twice: once against what the tool reports and
    once by hashing the fetched file directly.

    Args:
        v: Progress reporter; a failed check raises through it.
        url: Location to fetch.
        digest: Expected SHA-256 digest in base16.

    Returns:
        The path printed by ``nix-prefetch-url --print-path``.
    """
    v.status('Fetching %s' % url)
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest],
        capture_output=True)
    v.result(process.returncode == 0)
    # Output must be exactly "<digest>\n<path>\n"; the three-way unpack
    # raises ValueError on any other number of lines.
    prefetch_digest, path, empty = process.stdout.decode().split('\n')

    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status("Verifying file digest")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path
def fetch_resources(v: Verification, channel: Channel) -> None:
    """Fetch the channel's git revision file and expression tarball.

    Each fetched path is stored on the matching table entry as ``file``.
    The contents of the fetched ``git-revision`` file are then compared
    with ``channel.git_commit``.

    Args:
        v: Progress reporter; a failed check raises through it.
        channel: Channel whose ``table`` entries are fetched and updated.
    """
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = channel.table[resource]
        # Table URLs are resolved against the post-redirect page URL.
        url = urllib.parse.urljoin(channel.forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(v, url, fields.digest)

    v.status('Verifying git commit on main page matches git commit in table')
    # Close the file promptly; at most 999 characters are read from it.
    with open(channel.table['git-revision'].file) as revision_file:
        table_commit = revision_file.read(999)
    v.result(table_commit == channel.git_commit)
def check_channel_contents(v: Verification, channel: Channel) -> None:
    """Unpack the fetched expression tarball into a temporary directory.

    Args:
        v: Progress reporter.
        channel: Channel whose ``table['nixexprs.tar.xz'].file`` names the
            archive to unpack.
    """
    archive = channel.table['nixexprs.tar.xz'].file
    with tempfile.TemporaryDirectory() as d:
        v.status('Extracting %s' % archive)
        shutil.unpack_archive(archive, d)
        v.result(True)

        # The directory is deleted when the ``with`` block exits, so the
        # verdict for this step is reported after it.
        v.status('Removing temporary directory')
    v.result(True)
216 channel
= Channel(url
='https://channels.nixos.org/nixos-20.03')
218 parse_channel(v
, channel
)
219 fetch_resources(v
, channel
)
220 check_channel_contents(v
, channel
)