import os
import os.path
import shutil
+import subprocess
import tempfile
import types
import urllib.parse
Dict,
Iterable,
List,
+ NewType,
Sequence,
Tuple,
)
+Digest16 = NewType('Digest16', str)
+Digest32 = NewType('Digest32', str)
+
class InfoTableEntry(types.SimpleNamespace):
- content: bytes
- digest: str
+ digest: Digest16
file: str
size: int
url: str
channel_html: bytes
forwarded_url: str
git_revision: str
+ release_name: str
table: Dict[str, InfoTableEntry]
url: str
return info
-def parse(v: Verification, info: Info) -> None:
+def parse_table(v: Verification, info: Info) -> None:
v.status('Parsing channel description as XML')
d = xml.dom.minidom.parseString(info.channel_html)
v.ok()
- v.status('Extracting git commit')
+ v.status('Extracting release name:')
+ title_name = d.getElementsByTagName(
+ 'title')[0].firstChild.nodeValue.split()[2]
+ h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
+ v.status(title_name)
+ v.result(title_name == h1_name)
+ info.release_name = title_name
+
+ v.status('Extracting git commit:')
git_commit_node = d.getElementsByTagName('tt')[0]
info.git_commit = git_commit_node.firstChild.nodeValue
+ v.status(info.git_commit)
v.ok()
v.status('Verifying git commit label')
v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')
name = row.childNodes[0].firstChild.firstChild.nodeValue
url = row.childNodes[0].firstChild.getAttribute('href')
size = int(row.childNodes[1].firstChild.nodeValue)
- digest = row.childNodes[2].firstChild.firstChild.nodeValue
+ digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
info.table[name] = InfoTableEntry(url=url, digest=digest, size=size)
v.ok()
-def fetch_resources(v: Verification, info: Info) -> None:
+def digest_file(filename: str) -> Digest16:
+ hasher = hashlib.sha256()
+ with open(filename, 'rb') as f:
+ # pylint: disable=cell-var-from-loop
+ for block in iter(lambda: f.read(4096), b''):
+ hasher.update(block)
+ return Digest16(hasher.hexdigest())
+
+
+def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
+ v.status('Converting digest to base16')
+ process = subprocess.run(
+ ['nix', 'to-base16', '--type', 'sha256', digest32], capture_output=True)
+ v.result(process.returncode == 0)
+ return Digest16(process.stdout.decode().strip())
+
+
+def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
+ v.status('Converting digest to base32')
+ process = subprocess.run(
+ ['nix', 'to-base32', '--type', 'sha256', digest16], capture_output=True)
+ v.result(process.returncode == 0)
+ return Digest32(process.stdout.decode().strip())
+
+
+def fetch_with_nix_prefetch_url(
+ v: Verification,
+ url: str,
+ digest: Digest16) -> str:
+ v.status('Fetching %s' % url)
+ process = subprocess.run(
+ ['nix-prefetch-url', '--print-path', url, digest], capture_output=True)
+ v.result(process.returncode == 0)
+ prefetch_digest, path, empty = process.stdout.decode().split('\n')
+ assert empty == ''
+ v.check("Verifying nix-prefetch-url's digest",
+ to_Digest16(v, Digest32(prefetch_digest)) == digest)
+ v.status("Verifying file digest")
+ file_digest = digest_file(path)
+ v.result(file_digest == digest)
+ return path
+
+def fetch_resources(v: Verification, info: Info) -> None:
for resource in ['git-revision', 'nixexprs.tar.xz']:
fields = info.table[resource]
- v.status('Fetching resource "%s"' % resource)
url = urllib.parse.urljoin(info.forwarded_url, fields.url)
- request = urllib.request.urlopen(url, timeout=10)
- if fields.size < 4096:
- fields.content = request.read()
- else:
- with tempfile.NamedTemporaryFile(suffix='.nixexprs.tar.xz', delete=False) as tmp_file:
- shutil.copyfileobj(request, tmp_file)
- fields.file = tmp_file.name
- v.result(request.status == 200)
- v.status('Verifying digest for "%s"' % resource)
- if fields.size < 4096:
- actual_hash = hashlib.sha256(fields.content).hexdigest()
- else:
- hasher = hashlib.sha256()
- with open(fields.file, 'rb') as f:
- # pylint: disable=cell-var-from-loop
- for block in iter(lambda: f.read(4096), b''):
- hasher.update(block)
- actual_hash = hasher.hexdigest()
- v.result(actual_hash == fields.digest)
- v.check('Verifying git commit on main page matches git commit in table',
- info.table['git-revision'].content.decode() == info.git_commit)
+ fields.file = fetch_with_nix_prefetch_url(v, url, fields.digest)
+ v.status('Verifying git commit on main page matches git commit in table')
+ v.result(
+ open(
+ info.table['git-revision'].file).read(999) == info.git_commit)
def extract_channel(v: Verification, info: Info) -> None:
with tempfile.TemporaryDirectory() as d:
- v.status('Extracting nixexprs.tar.xz')
+ v.status('Extracting %s' % info.table['nixexprs.tar.xz'].file)
shutil.unpack_archive(info.table['nixexprs.tar.xz'].file, d)
v.ok()
v.status('Removing temporary directory')
def main() -> None:
v = Verification()
info = fetch(v, 'https://channels.nixos.org/nixos-20.03')
- parse(v, info)
+ parse_table(v, info)
fetch_resources(v, info)
extract_channel(v, info)
print(info)