+ digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
+ table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
+ v.ok()
+ return table, GitPin(release_name=release_name, git_revision=git_revision)
+
+
def digest_string(s: bytes) -> Digest16:
    """Return the hex (base-16) SHA-256 digest of the byte string *s*."""
    hasher = hashlib.sha256(s)
    return Digest16(hasher.hexdigest())
+
+
def digest_file(filename: str) -> Digest16:
    """Return the hex SHA-256 digest of the file at *filename*.

    Reads in 4 KiB chunks so arbitrarily large files never have to fit
    in memory at once.
    """
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        while chunk := f.read(4096):
            hasher.update(chunk)
    return Digest16(hasher.hexdigest())
+
+
@functools.lru_cache
def _experimental_flag_needed(v: Verification) -> bool:
    """Whether this `nix` binary wants --experimental-features for nix-command.

    Probes `nix --help` once and caches the answer (keyed on *v*).
    """
    v.status('Checking Nix version')
    completed = subprocess.run(['nix', '--help'], stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return completed.stdout.find(b'--experimental-features') != -1
+
+
def _nix_command(v: Verification) -> List[str]:
    """Base argv for invoking `nix`, with the experimental flag when required."""
    if _experimental_flag_needed(v):
        return ['nix', '--experimental-features', 'nix-command']
    return ['nix']
+
+
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base-32 sha256 digest to base-16 via `nix to-base16`."""
    v.status('Converting digest to base16')
    argv = _nix_command(v) + ['to-base16', '--type', 'sha256', digest32]
    completed = subprocess.run(argv, stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return Digest16(completed.stdout.decode().strip())
+
+
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a base-16 sha256 digest to base-32 via `nix to-base32`."""
    v.status('Converting digest to base32')
    argv = _nix_command(v) + ['to-base32', '--type', 'sha256', digest16]
    completed = subprocess.run(argv, stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return Digest32(completed.stdout.decode().strip())
+
+
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Download *url* into the nix store and verify it matches *digest*.

    Returns the resulting store path.  The digest is checked twice: once
    against nix-prefetch-url's own report, once by hashing the file.
    """
    v.status(f'Fetching {url}')
    completed = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    prefetch_digest, path, trailer = completed.stdout.decode().split('\n')
    assert trailer == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    v.result(digest_file(path) == digest)
    return path  # type: ignore # (for old mypy)
+
+
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the channel's git-revision file and nixexprs tarball.

    Fills in each entry's absolute_url and local file path, then checks
    that the downloaded git-revision agrees with the pinned one.
    """
    for name in ('git-revision', 'nixexprs.tar.xz'):
        entry = table[name]
        entry.absolute_url = urllib.parse.urljoin(forwarded_url, entry.url)
        entry.file = fetch_with_nix_prefetch_url(
            v, entry.absolute_url, entry.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)
+
+
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Path under the XDG cache where this channel/revision's tarball lives."""
    repo_digest = digest_string(channel.git_repo.encode())
    basename = f'{repo_digest}-{pin.git_revision}-{pin.release_name}'
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git-tarball', basename)
+
+
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    """Check that *new_revision* descends from *old_revision* in the cached repo."""
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    completed = subprocess.run(
        ['git', '-C', cachedir,
         'merge-base', '--is-ancestor', old_revision, new_revision])
    v.result(completed.returncode == 0)
+
+
def broken_symlinks_are_identical(root1: str, root2: str, path: str) -> bool:
    """Whether *path* is a dangling symlink with the same target under both roots."""
    link_a = os.path.join(root1, path)
    link_b = os.path.join(root2, path)
    for link in (link_a, link_b):
        # Must be a symlink whose target does not resolve.
        if not os.path.islink(link) or os.path.exists(link):
            return False
    return os.readlink(link_a) == os.readlink(link_b)
+
+
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Diff the unpacked channel tarball against the git checkout.

    Every file must match; the handful of files generated at channel
    packaging time are tolerated as incomparable, as are dangling
    symlinks that point at the same target on both sides.
    """
    v.status('Comparing channel tarball with git checkout')
    tarball_contents = os.path.join(channel_contents, pin.release_name)
    match, mismatch, errors = compare(tarball_contents, git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    # Files the channel build generates that git does not carry.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_expected_errors = [ee for ee in expected_errors if ee in errors]
    for ee in benign_expected_errors:
        errors.remove(ee)
    # Dangling symlinks cannot be content-compared; identical ones are fine.
    errors = [
        e for e in errors
        if not broken_symlinks_are_identical(tarball_contents, git_contents, e)]
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))
+
+
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the channel's nixexprs tarball into *dest*."""
    tarball = table['nixexprs.tar.xz'].file
    v.status(f"Extracting tarball {tarball}")
    shutil.unpack_archive(tarball, dest)