+ digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
+ table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
+ v.ok()
+ return table, GitPin(release_name=title_name, git_revision=git_revision)
+
+
def digest_string(s: bytes) -> Digest16:
    """Return the hex (base16) SHA-256 digest of the byte string *s*."""
    hasher = hashlib.sha256(s)
    return Digest16(hasher.hexdigest())
+
+
def digest_file(filename: str) -> Digest16:
    """Return the hex (base16) SHA-256 digest of the file at *filename*.

    Reads in 4 KiB chunks so arbitrarily large files are hashed without
    loading them fully into memory.
    """
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(4096)
            if not chunk:
                break
            hasher.update(chunk)
    return Digest16(hasher.hexdigest())
+
+
@functools.lru_cache
def _experimental_flag_needed(v: Verification) -> bool:
    """Probe whether this `nix` binary wants --experimental-features.

    Cached so the `nix --help` subprocess runs at most once per process
    (per distinct *v*).
    """
    v.status('Checking Nix version')
    help_run = subprocess.run(['nix', '--help'], stdout=subprocess.PIPE)
    v.result(help_run.returncode == 0)
    return b'--experimental-features' in help_run.stdout
+
+
def _nix_command(v: Verification) -> List[str]:
    """Return the argv prefix for `nix`, enabling nix-command if needed."""
    if _experimental_flag_needed(v):
        return ['nix', '--experimental-features', 'nix-command']
    return ['nix']
+
+
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base32 sha256 digest to base16 using `nix to-base16`."""
    v.status('Converting digest to base16')
    command = _nix_command(v) + ['to-base16', '--type', 'sha256', digest32]
    conversion = subprocess.run(command, stdout=subprocess.PIPE)
    v.result(conversion.returncode == 0)
    return Digest16(conversion.stdout.decode().strip())
+
+
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a base16 sha256 digest to base32 using `nix to-base32`."""
    v.status('Converting digest to base32')
    command = _nix_command(v) + ['to-base32', '--type', 'sha256', digest16]
    conversion = subprocess.run(command, stdout=subprocess.PIPE)
    v.result(conversion.returncode == 0)
    return Digest32(conversion.stdout.decode().strip())
+
+
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Fetch *url* into the nix store via nix-prefetch-url.

    Returns the resulting store path, after double-checking both the
    digest reported by nix-prefetch-url and an independent hash of the
    file on disk against *digest*.
    """
    v.status(f'Fetching {url}')
    prefetch = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest],
        stdout=subprocess.PIPE)
    v.result(prefetch.returncode == 0)
    # stdout is exactly "<base32-digest>\n<store-path>\n".
    reported_digest, path, trailer = prefetch.stdout.decode().split('\n')
    assert trailer == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(reported_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    v.result(digest_file(path) == digest)
    return path  # type: ignore # (for old mypy)
+
+
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the channel's git-revision and nixexprs tarball.

    Resolves each entry's URL relative to *forwarded_url*, fetches it into
    the nix store, and checks the fetched git revision against the pin.
    """
    for name in ('git-revision', 'nixexprs.tar.xz'):
        entry = table[name]
        entry.absolute_url = urllib.parse.urljoin(forwarded_url, entry.url)
        entry.file = fetch_with_nix_prefetch_url(
            v, entry.absolute_url, entry.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)
+
+
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Path under XDG_CACHE_HOME where this pinned tarball is cached."""
    repo_digest = digest_string(channel.git_repo.encode())
    basename = f'{repo_digest}-{pin.git_revision}-{pin.release_name}'
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git-tarball', basename)
+
+
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    """Verify *old_revision* is an ancestor of *new_revision*.

    Runs `git merge-base --is-ancestor` in the cached clone of the
    channel's repository; fails verification on a non-zero exit.
    """
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    ancestry_check = subprocess.run(
        ['git', '-C', cachedir,
         'merge-base', '--is-ancestor', old_revision, new_revision])
    v.result(ancestry_check.returncode == 0)
+
+
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Compare the extracted channel tarball against the git checkout.

    Verifies that at least one file matches, no files differ, and that
    the only incomparable files are the ones the channel build process
    is known to add or remove.
    """
    v.status('Comparing channel tarball with git checkout')
    match, mismatch, errors = compare(os.path.join(
        channel_contents, pin.release_name), git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    # Build artifacts that must always be incomparable.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    # Files that may be incomparable (e.g. deliberately-invalid symlink
    # fixtures in nixpkgs) but are not required to be present.
    permitted_errors = [
        'pkgs/test/nixpkgs-check-by-name/tests/symlink-invalid/pkgs/by-name/fo/foo/foo.nix',
    ]
    benign_expected_errors = []
    benign_permitted_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    for pe in permitted_errors:
        if pe in errors:
            errors.remove(pe)
            # Bug fix: the original appended `ee` (the stale loop variable
            # from the expected-errors loop above) instead of `pe`, so the
            # wrong filename was recorded as a permitted incomparable file.
            benign_permitted_errors.append(pe)
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))
    v.check(
        f'({len(benign_permitted_errors)} of {len(permitted_errors)} permitted incomparable files)',
        len(benign_permitted_errors) <= len(permitted_errors))
+
+
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the previously fetched nixexprs tarball into *dest*."""
    tarball = table['nixexprs.tar.xz'].file
    v.status(f"Extracting tarball {tarball}")
    shutil.unpack_archive(tarball, dest)