"""Pin and update Nix channels, verifying them against their git history.

Two sub-commands are provided (see ``main``):

* ``pin`` reads a channels config file, fetches and verifies each channel,
  and writes the resulting release name / revision / tarball digest back
  into the config file.
* ``update`` reads one or more pinned config files and installs the
  channels with ``nix-env``.
"""

import argparse
import configparser
import filecmp
import functools
import getpass
import hashlib
import operator
import os
import os.path
import shlex
import shutil
import subprocess
import sys
import tempfile
import types
import urllib.parse
import urllib.request
import xml.dom.minidom

from typing import (
    Dict,
    Iterable,
    List,
    NewType,
    Tuple,
)

import xdg

# Hex (base16) and base32 renderings of a sha256 digest; the two are
# converted into each other with the `nix` CLI (see to_Digest16/to_Digest32).
Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)


class ChannelTableEntry(types.SimpleNamespace):
    """One row of the file table on a channel's HTML page."""
    # `url` joined against the channel's forwarded URL (fetch_resources).
    absolute_url: str
    # Digest string taken from the third cell of the table row.
    digest: Digest16
    # Path printed by nix-prefetch-url for the downloaded file.
    file: str
    # Integer taken from the second cell of the table row.
    size: int
    # The href of the link in the first cell of the table row.
    url: str


class Channel(types.SimpleNamespace):
    """State for one channel.

    Instances are created from a config-file section (every key of the
    section becomes an attribute) and are then filled in progressively by
    the fetch / parse / git functions below.
    """
    alias_of: str
    # Raw body downloaded from `channel_url`.
    channel_html: bytes
    channel_url: str
    # Final URL reported by the HTTP response for `channel_url`.
    forwarded_url: str
    git_ref: str
    git_repo: str
    git_revision: str
    # The `git_revision` found in the config file before re-pinning.
    old_git_revision: str
    release_name: str
    # File table parsed from the channel page, keyed by file name.
    table: Dict[str, ChannelTableEntry]


class VerificationError(Exception):
    """Raised by Verification.result() when a check fails."""


class Verification:
    """Progress reporter that prints steps and right-aligned OK/FAIL marks.

    All output goes to stderr.  A failed result raises VerificationError.
    """

    def __init__(self) -> None:
        # Number of characters printed on the current line so far.
        self.line_length = 0

    def status(self, s: str) -> None:
        """Print `s` followed by a space, without ending the line."""
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        """Wrap `s` in the ANSI SGR escape sequence for color code `c`."""
        return '\033[%2dm%s\033[00m' % (c, s)

    def result(self, r: bool) -> None:
        """Finish the current line with a colored OK/FAIL mark.

        Raises:
            VerificationError: if `r` is false.
        """
        message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
        length = len(message)
        cols = shutil.get_terminal_size().columns
        # Pad so the mark ends at the right edge of the terminal; the modulo
        # keeps the padding non-negative when the status text has wrapped.
        pad = (cols - (self.line_length + length)) % cols
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        """Print status `s` and immediately report result `r`."""
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        """Report success for the current status line."""
        self.result(True)


def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Compare the file trees rooted at `a` and `b`.

    The set of relative paths compared is the union of the files found
    under either directory, excluding paths that start with '.git/'.

    Returns:
        The (match, mismatch, errors) triple of filecmp.cmpfiles(), using
        full content comparison (shallow=False).

    Raises:
        OSError: if walking either directory tree fails.
    """

    def throw(error: OSError) -> None:
        # os.walk() ignores errors by default; make them fatal instead.
        raise error

    def join(x: str, y: str) -> str:
        # Avoid a leading './' on paths directly under the root.
        return y if x == '.' else os.path.join(x, y)

    def recursive_files(d: str) -> Iterable[str]:
        all_files: List[str] = []
        for path, dirs, files in os.walk(d, onerror=throw):
            rel = os.path.relpath(path, start=d)
            all_files.extend(join(rel, f) for f in files)
            # Symlinks to directories show up in `dirs`; list them as files
            # so that they take part in the comparison.
            for dir_or_link in dirs:
                if os.path.islink(join(path, dir_or_link)):
                    all_files.append(join(rel, dir_or_link))
        return all_files

    def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
        return (f for f in files if not f.startswith('.git/'))

    files = functools.reduce(
        operator.or_, (set(
            exclude_dot_git(
                recursive_files(x))) for x in [a, b]))
    return filecmp.cmpfiles(a, b, files, shallow=False)


def fetch(v: Verification, channel: Channel) -> None:
    """Download `channel.channel_url`.

    Sets `channel.channel_html` and `channel.forwarded_url`, and verifies
    that the response status was 200 and that the final URL differs from
    the requested one.
    """
    v.status('Fetching channel')
    # The context manager closes the HTTP response once it has been read.
    with urllib.request.urlopen(channel.channel_url, timeout=10) as request:
        channel.channel_html = request.read()
        channel.forwarded_url = request.geturl()
        status = request.status
    v.result(status == 200)
    v.check('Got forwarded', channel.channel_url != channel.forwarded_url)


def parse_channel(v: Verification, channel: Channel) -> None:
    """Parse `channel.channel_html` as XML.

    Sets `channel.release_name` (third whitespace-separated word of the
    <title>, which must equal the third word of the first <h1>),
    `channel.git_revision` (text of the first <tt>) and `channel.table`
    (one ChannelTableEntry per <tr> after the first).
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel.channel_html)
    v.ok()

    v.status('Extracting release name:')
    title_name = d.getElementsByTagName(
        'title')[0].firstChild.nodeValue.split()[2]
    h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
    v.status(title_name)
    v.result(title_name == h1_name)
    channel.release_name = title_name

    v.status('Extracting git commit:')
    git_commit_node = d.getElementsByTagName('tt')[0]
    channel.git_revision = git_commit_node.firstChild.nodeValue
    v.status(channel.git_revision)
    v.ok()
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    channel.table = {}
    # The first row is skipped (presumably the table header).
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        channel.table[name] = ChannelTableEntry(
            url=url, digest=digest, size=size)
    v.ok()


def digest_string(s: bytes) -> Digest16:
    """Return the hex sha256 digest of `s`."""
    return Digest16(hashlib.sha256(s).hexdigest())


def digest_file(filename: str) -> Digest16:
    """Return the hex sha256 digest of the contents of `filename`."""
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # pylint: disable=cell-var-from-loop
        for block in iter(lambda: f.read(4096), b''):
            hasher.update(block)
    return Digest16(hasher.hexdigest())


def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base32 sha256 digest to base16 by running `nix to-base16`."""
    v.status('Converting digest to base16')
    process = subprocess.run(
        ['nix', 'to-base16', '--type', 'sha256', digest32],
        capture_output=True)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())


def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a base16 sha256 digest to base32 by running `nix to-base32`."""
    v.status('Converting digest to base32')
    process = subprocess.run(
        ['nix', 'to-base32', '--type', 'sha256', digest16],
        capture_output=True)
    v.result(process.returncode == 0)
    return Digest32(process.stdout.decode().strip())


def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Fetch `url` with nix-prefetch-url and verify it against `digest`.

    Both the digest printed by nix-prefetch-url and a sha256 computed here
    over the resulting file must equal `digest`.

    Returns:
        The path printed by `nix-prefetch-url --print-path`.
    """
    v.status('Fetching %s' % url)
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], capture_output=True)
    v.result(process.returncode == 0)
    # Expected output: digest line, path line, trailing newline.
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status("Verifying file digest")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path


def fetch_resources(v: Verification, channel: Channel) -> None:
    """Fetch the 'git-revision' and 'nixexprs.tar.xz' entries of the table.

    Fills in `absolute_url` and `file` on both table entries, then verifies
    that the fetched git-revision file matches `channel.git_revision`.
    """
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = channel.table[resource]
        fields.absolute_url = urllib.parse.urljoin(
            channel.forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(channel.table['git-revision'].file) as revision_file:
        table_revision = revision_file.read(999)
    v.result(table_revision == channel.git_revision)


def git_cachedir(git_repo: str) -> str:
    """Return the cache directory of the bare clone used for `git_repo`."""
    return os.path.join(
        xdg.XDG_CACHE_HOME,
        'pinch/git',
        digest_string(git_repo.encode()))


def tarball_cache_file(channel: Channel) -> str:
    """Return the cache file that records the store path of the tarball
    generated for this channel's repo, revision and release name."""
    return os.path.join(
        xdg.XDG_CACHE_HOME,
        'pinch/git-tarball',
        '%s-%s-%s' %
        (digest_string(channel.git_repo.encode()),
         channel.git_revision,
         channel.release_name))


def verify_git_ancestry(v: Verification, channel: Channel) -> None:
    """Verify `channel.git_revision` against the ref and the previous pin.

    Checks that the revision is an ancestor of `channel.git_ref` and, when
    `channel.old_git_revision` is set, that the old revision is an ancestor
    of the new one.
    """
    cachedir = git_cachedir(channel.git_repo)
    v.status('Verifying rev is an ancestor of ref')
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'merge-base',
                              '--is-ancestor',
                              channel.git_revision,
                              channel.git_ref])
    v.result(process.returncode == 0)

    if hasattr(channel, 'old_git_revision'):
        # Note the argument order: this tests that the *previous* revision
        # is an ancestor of the current one.
        v.status(
            'Verifying rev is an ancestor of previous rev %s' %
            channel.old_git_revision)
        process = subprocess.run(['git',
                                  '-C',
                                  cachedir,
                                  'merge-base',
                                  '--is-ancestor',
                                  channel.old_git_revision,
                                  channel.git_revision])
        v.result(process.returncode == 0)


def git_fetch(v: Verification, channel: Channel) -> None:
    """Fetch `channel.git_ref` from `channel.git_repo` into the cache.

    Initializes the bare cache repository if needed.  When the channel has
    no `git_revision` yet, it is read from the fetched ref.  Finishes with
    verify_git_ancestry().
    """
    # It would be nice if we could share the nix git cache, but as of the time
    # of writing it is transitioning from gitv2 (deprecated) to gitv3 (not ready
    # yet), and trying to straddle them both is too far into nix implementation
    # details for my comfort.  So we re-implement here half of nix.fetchGit.
    # :(

    cachedir = git_cachedir(channel.git_repo)
    if not os.path.exists(cachedir):
        v.status("Initializing git repo")
        process = subprocess.run(
            ['git', 'init', '--bare', cachedir])
        v.result(process.returncode == 0)

    v.status('Fetching ref "%s" from %s' % (channel.git_ref, channel.git_repo))
    # We don't use --force here because we want to abort and freak out if forced
    # updates are happening.
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'fetch',
                              channel.git_repo,
                              '%s:%s' % (channel.git_ref, channel.git_ref)])
    v.result(process.returncode == 0)

    if hasattr(channel, 'git_revision'):
        v.status('Verifying that fetch retrieved this rev')
        process = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', channel.git_revision])
        v.result(process.returncode == 0)
    else:
        ref_path = os.path.join(cachedir, 'refs', 'heads', channel.git_ref)
        with open(ref_path) as ref_file:
            channel.git_revision = ref_file.read(999).strip()

    verify_git_ancestry(v, channel)


def ensure_git_rev_available(v: Verification, channel: Channel) -> None:
    """Make sure `channel.git_revision` is present in the cached repo.

    If the cache already contains the revision, only its ancestry is
    verified; otherwise git_fetch() is run.
    """
    cachedir = git_cachedir(channel.git_repo)
    if os.path.exists(cachedir):
        v.status('Checking if we already have this rev:')
        process = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', channel.git_revision])
        if process.returncode == 0:
            v.status('yes')
        if process.returncode == 1:
            v.status('no')
        # Any exit status other than 0 or 1 is treated as a failure.
        v.result(process.returncode in (0, 1))
        if process.returncode == 0:
            verify_git_ancestry(v, channel)
            return
    git_fetch(v, channel)


def compare_tarball_and_git(
        v: Verification,
        channel: Channel,
        channel_contents: str,
        git_contents: str) -> None:
    """Verify the extracted channel tarball against the git checkout.

    Requires at least one matching file, no differing files, and that the
    incomparable files are exactly the expected list below.
    """
    v.status('Comparing channel tarball with git checkout')
    match, mismatch, errors = compare(os.path.join(
        channel_contents, channel.release_name), git_contents)
    v.ok()
    v.check('%d files match' % len(match), len(match) > 0)
    v.check('%d files differ' % len(mismatch), len(mismatch) == 0)
    # Files expected to be incomparable between the tarball and the checkout.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_errors.append(ee)
    v.check(
        '%d unexpected incomparable files' %
        len(errors),
        len(errors) == 0)
    v.check(
        '(%d of %d expected incomparable files)' %
        (len(benign_errors),
         len(expected_errors)),
        len(benign_errors) == len(expected_errors))


def extract_tarball(v: Verification, channel: Channel, dest: str) -> None:
    """Unpack the fetched nixexprs.tar.xz into directory `dest`."""
    v.status('Extracting tarball %s' %
             channel.table['nixexprs.tar.xz'].file)
    shutil.unpack_archive(
        channel.table['nixexprs.tar.xz'].file,
        dest)
    v.ok()


def git_checkout(v: Verification, channel: Channel, dest: str) -> None:
    """Export `channel.git_revision` from the cached repo into `dest`.

    Pipes `git archive` into `tar x`.
    """
    v.status('Checking out corresponding git revision')
    git = subprocess.Popen(['git',
                            '-C',
                            git_cachedir(channel.git_repo),
                            'archive',
                            channel.git_revision],
                           stdout=subprocess.PIPE)
    tar = subprocess.Popen(
        ['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout)
    # Drop our copy of the pipe so that tar holds the only read end.
    if git.stdout:
        git.stdout.close()
    tar.wait()
    git.wait()
    v.result(git.returncode == 0 and tar.returncode == 0)


def git_get_tarball(v: Verification, channel: Channel) -> str:
    """Return a Nix store path of a tarball of `channel.git_revision`.

    A previously recorded store path is reused when it still exists;
    otherwise the tarball is generated with `git archive | xz`, added to
    the store with `nix-store --add`, and its path recorded in the cache.
    """
    cache_file = tarball_cache_file(channel)
    if os.path.exists(cache_file):
        with open(cache_file) as f:
            cached_tarball = f.read(9999)
        if os.path.exists(cached_tarball):
            return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, channel.release_name + '.tar.xz')
        with open(output_filename, 'w') as output_file:
            v.status(
                'Generating tarball for git revision %s' %
                channel.git_revision)
            git = subprocess.Popen(['git',
                                    '-C',
                                    git_cachedir(channel.git_repo),
                                    'archive',
                                    '--prefix=%s/' % channel.release_name,
                                    channel.git_revision],
                                   stdout=subprocess.PIPE)
            xz = subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file)
            # Drop our copy of the pipe so that xz holds the only read end
            # (same handling as in git_checkout).
            if git.stdout:
                git.stdout.close()
            xz.wait()
            git.wait()
            v.result(git.returncode == 0 and xz.returncode == 0)

        v.status('Putting tarball in Nix store')
        process = subprocess.run(
            ['nix-store', '--add', output_filename], capture_output=True)
        v.result(process.returncode == 0)
        store_tarball = process.stdout.decode().strip()

        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
        with open(cache_file, 'w') as f:
            f.write(store_tarball)
        return store_tarball


def check_channel_metadata(
        v: Verification,
        channel: Channel,
        channel_contents: str) -> None:
    """Verify the .git-revision and .version-suffix files in the tarball.

    .git-revision must equal `channel.git_revision`, and
    `channel.release_name` must end with the contents of .version-suffix.
    """
    release_dir = os.path.join(channel_contents, channel.release_name)

    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(release_dir, '.git-revision')) as f:
        tarball_revision = f.read(999)
    v.result(tarball_revision == channel.git_revision)

    v.status(
        'Verifying version-suffix is a suffix of release name %s:' %
        channel.release_name)
    with open(os.path.join(release_dir, '.version-suffix')) as f:
        version_suffix = f.read(999)
    v.status(version_suffix)
    v.result(channel.release_name.endswith(version_suffix))


def check_channel_contents(v: Verification, channel: Channel) -> None:
    """Extract the channel tarball and the git revision into temporary
    directories and verify that they agree."""
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:

        extract_tarball(v, channel, channel_contents)
        check_channel_metadata(v, channel, channel_contents)

        git_checkout(v, channel, git_contents)

        compare_tarball_and_git(v, channel, channel_contents, git_contents)

        v.status('Removing temporary directories')
    # Reported only after the `with` block has removed the directories.
    v.ok()


def pin_channel(v: Verification, channel: Channel) -> None:
    """Run the full fetch-and-verify pipeline for a channel with a URL."""
    fetch(v, channel)
    parse_channel(v, channel)
    fetch_resources(v, channel)
    ensure_git_rev_available(v, channel)
    check_channel_contents(v, channel)


def git_revision_name(v: Verification, channel: Channel) -> str:
    """Build a release name for a git-only channel.

    Returns:
        '<basename of git_repo>-<commit timestamp>-<abbreviated hash>',
        taken from `git log --format=%ct-%h --abbrev=11`.
    """
    v.status('Getting commit date')
    process = subprocess.run(['git',
                              '-C',
                              git_cachedir(channel.git_repo),
                              'log',
                              '-n1',
                              '--format=%ct-%h',
                              '--abbrev=11',
                              channel.git_revision],
                             capture_output=True)
    v.result(process.returncode == 0 and process.stdout != b'')
    return '%s-%s' % (os.path.basename(channel.git_repo),
                      process.stdout.decode().strip())


def read_config(filename: str) -> configparser.ConfigParser:
    """Parse the channels config file `filename`."""
    config = configparser.ConfigParser()
    with open(filename) as config_file:
        config.read_file(config_file, filename)
    return config


def pin(args: argparse.Namespace) -> None:
    """Implement the `pin` sub-command.

    Pins every section of `args.channels_file` (or only those named in
    `args.channels`, when given) and rewrites the file with the results.
    Sections with an `alias_of` key are skipped.
    """
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        if args.channels and section not in args.channels:
            continue

        channel = Channel(**dict(config[section].items()))

        if hasattr(channel, 'alias_of'):
            assert not hasattr(channel, 'git_repo')
            continue

        # Remember the previous pin so that the new revision can be checked
        # to descend from it (see verify_git_ancestry).
        if hasattr(channel, 'git_revision'):
            channel.old_git_revision = channel.git_revision
            del channel.git_revision

        if 'channel_url' in config[section]:
            pin_channel(v, channel)
            config[section]['release_name'] = channel.release_name
            config[section]['tarball_url'] = \
                channel.table['nixexprs.tar.xz'].absolute_url
            config[section]['tarball_sha256'] = \
                channel.table['nixexprs.tar.xz'].digest
        else:
            git_fetch(v, channel)
            config[section]['release_name'] = git_revision_name(v, channel)
        config[section]['git_revision'] = channel.git_revision

    with open(args.channels_file, 'w') as configfile:
        config.write(configfile)


def fetch_channel(
        v: Verification,
        section: str,
        conf: configparser.SectionProxy) -> str:
    """Obtain the tarball for one pinned channel.

    Returns:
        The path of the tarball: the nix-prefetch-url path when the section
        has a `channel_url`, otherwise a tarball generated from git.

    Raises:
        Exception: if the section lacks `git_repo` or `release_name`.
    """
    if 'git_repo' not in conf or 'release_name' not in conf:
        raise Exception(
            'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
            section)

    if 'channel_url' in conf:
        return fetch_with_nix_prefetch_url(
            v, conf['tarball_url'], Digest16(
                conf['tarball_sha256']))

    channel = Channel(**dict(conf.items()))
    ensure_git_rev_available(v, channel)
    return git_get_tarball(v, channel)


def update(args: argparse.Namespace) -> None:
    """Implement the `update` sub-command.

    Fetches every non-alias channel of every file in `args.channels_file`,
    resolves aliases, then installs all channels with a single `nix-env`
    invocation (or prints the command when `args.dry_run` is set).

    Raises:
        Exception: if a channel name occurs more than once.
    """
    v = Verification()
    exprs: Dict[str, str] = {}
    configs = [read_config(filename) for filename in args.channels_file]
    for config in configs:
        for section in config.sections():
            if 'alias_of' in config[section]:
                assert 'git_repo' not in config[section]
                continue
            tarball = fetch_channel(v, section, config[section])
            if section in exprs:
                raise Exception('Duplicate channel "%s"' % section)
            # '%%s' survives this formatting as '%s'; the channel name is
            # substituted for it when the command is built below.
            exprs[section] = (
                'f: f { name = "%s"; channelName = "%%s"; '
                'src = builtins.storePath "%s"; }' %
                (config[section]['release_name'], tarball))

    # Aliases are resolved in a second pass so that an alias may refer to a
    # channel defined in any of the config files.
    for config in configs:
        for section in config.sections():
            if 'alias_of' in config[section]:
                if section in exprs:
                    raise Exception('Duplicate channel "%s"' % section)
                exprs[section] = exprs[str(config[section]['alias_of'])]

    command = [
        'nix-env',
        '--profile',
        '/nix/var/nix/profiles/per-user/%s/channels' %
        getpass.getuser(),
        '--show-trace',
        '--file',
        # NOTE(review): this argument was an empty string, which gives
        # nix-env no expression file to apply the `f: f {...}` functions
        # to.  The attribute set built above has the shape of Nix's
        # unpack-channel.nix arguments -- confirm against the Nix version
        # in use.
        '<nix/unpack-channel.nix>',
        '--install',
        '--from-expression'] + [exprs[name] % name for name in sorted(exprs)]
    if args.dry_run:
        print(' '.join(map(shlex.quote, command)))
    else:
        v.status('Installing channels with nix-env')
        process = subprocess.run(command)
        v.result(process.returncode == 0)


def main() -> None:
    """Parse the command line and dispatch to `pin` or `update`."""
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)
    parser_pin = subparsers.add_parser('pin')
    parser_pin.add_argument('channels_file', type=str)
    parser_pin.add_argument('channels', type=str, nargs='*')
    parser_pin.set_defaults(func=pin)
    parser_update = subparsers.add_parser('update')
    parser_update.add_argument('--dry-run', action='store_true')
    parser_update.add_argument('channels_file', type=str, nargs='+')
    parser_update.set_defaults(func=update)
    args = parser.parse_args()
    args.func(args)


# Only run the CLI when executed as a script, not when imported.
if __name__ == '__main__':
    main()