]> git.scottworley.com Git - pinch/blame_incremental - pinch.py
Specify license
[pinch] / pinch.py
... / ...
CommitLineData
1# pinch: PIN CHannels - a replacement for `nix-channel --update`
2#
3# This program is free software: you can redistribute it and/or modify it
4# under the terms of the GNU General Public License as published by the
5# Free Software Foundation, version 3.
6
7
8import argparse
9import configparser
10import filecmp
11import functools
12import getpass
13import hashlib
14import operator
15import os
16import os.path
17import shlex
18import shutil
19import subprocess
20import sys
21import tarfile
22import tempfile
23import types
24import urllib.parse
25import urllib.request
26import xml.dom.minidom
27
28from typing import (
29 Callable,
30 Dict,
31 Iterable,
32 List,
33 Mapping,
34 NamedTuple,
35 NewType,
36 Optional,
37 Set,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42)
43
44import git_cache
45
46# Use xdg module when it's less painful to have as a dependency
47
48
49class XDG(NamedTuple):
50 XDG_CACHE_HOME: str
51
52
53xdg = XDG(
54 XDG_CACHE_HOME=os.getenv(
55 'XDG_CACHE_HOME',
56 os.path.expanduser('~/.cache')))
57
58
59class VerificationError(Exception):
60 pass
61
62
63class Verification:
64
65 def __init__(self) -> None:
66 self.line_length = 0
67
68 def status(self, s: str) -> None:
69 print(s, end=' ', file=sys.stderr, flush=True)
70 self.line_length += 1 + len(s) # Unicode??
71
72 @staticmethod
73 def _color(s: str, c: int) -> str:
74 return f'\033[{c:2d}m{s}\033[00m'
75
76 def result(self, r: bool) -> None:
77 message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
78 length = len(message)
79 cols = shutil.get_terminal_size().columns or 80
80 pad = (cols - (self.line_length + length)) % cols
81 print(' ' * pad + self._color(message, color), file=sys.stderr)
82 self.line_length = 0
83 if not r:
84 raise VerificationError()
85
86 def check(self, s: str, r: bool) -> None:
87 self.status(s)
88 self.result(r)
89
90 def ok(self) -> None:
91 self.result(True)
92
93
94Digest16 = NewType('Digest16', str)
95Digest32 = NewType('Digest32', str)
96
97
98class ChannelTableEntry(types.SimpleNamespace):
99 absolute_url: str
100 digest: Digest16
101 file: str
102 size: int
103 url: str
104
105
106class AliasPin(NamedTuple):
107 pass
108
109
110class SymlinkPin(NamedTuple):
111 @property
112 def release_name(self) -> str:
113 return 'link'
114
115
116class GitPin(NamedTuple):
117 git_revision: str
118 release_name: str
119
120
121class ChannelPin(NamedTuple):
122 git_revision: str
123 release_name: str
124 tarball_url: str
125 tarball_sha256: str
126
127
128Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]
129
130
131def copy_to_nix_store(v: Verification, filename: str) -> str:
132 v.status('Putting tarball in Nix store')
133 process = subprocess.run(
134 ['nix-store', '--add', filename], stdout=subprocess.PIPE)
135 v.result(process.returncode == 0)
136 return process.stdout.decode().strip() # type: ignore # (for old mypy)
137
138
139def symlink_archive(v: Verification, path: str) -> str:
140 with tempfile.TemporaryDirectory() as td:
141 archive_filename = os.path.join(td, 'link.tar.gz')
142 os.symlink(path, os.path.join(td, 'link'))
143 with tarfile.open(archive_filename, mode='x:gz') as t:
144 t.add(os.path.join(td, 'link'), arcname='link')
145 return copy_to_nix_store(v, archive_filename)
146
147
148class AliasSearchPath(NamedTuple):
149 alias_of: str
150
151 # pylint: disable=no-self-use
152 def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
153 return AliasPin()
154
155
156class SymlinkSearchPath(NamedTuple):
157 path: str
158
159 # pylint: disable=no-self-use
160 def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
161 return SymlinkPin()
162
163 def fetch(self, v: Verification, _: Pin) -> str:
164 return symlink_archive(v, self.path)
165
166
167class GitSearchPath(NamedTuple):
168 git_ref: str
169 git_repo: str
170
171 def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
172 _, new_revision = git_cache.fetch(self.git_repo, self.git_ref)
173 if old_pin is not None:
174 assert isinstance(old_pin, GitPin)
175 verify_git_ancestry(v, self, old_pin.git_revision, new_revision)
176 return GitPin(release_name=git_revision_name(v, self, new_revision),
177 git_revision=new_revision)
178
179 def fetch(self, v: Verification, pin: Pin) -> str:
180 assert isinstance(pin, GitPin)
181 git_cache.ensure_rev_available(
182 self.git_repo, self.git_ref, pin.git_revision)
183 return git_get_tarball(v, self, pin)
184
185
186class ChannelSearchPath(NamedTuple):
187 channel_url: str
188 git_ref: str
189 git_repo: str
190
191 def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
192 if old_pin is not None:
193 assert isinstance(old_pin, ChannelPin)
194
195 channel_html, forwarded_url = fetch_channel(v, self)
196 table, new_gitpin = parse_channel(v, channel_html)
197 if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
198 return old_pin
199 fetch_resources(v, new_gitpin, forwarded_url, table)
200 git_cache.ensure_rev_available(
201 self.git_repo, self.git_ref, new_gitpin.git_revision)
202 if old_pin is not None:
203 verify_git_ancestry(
204 v, self, old_pin.git_revision, new_gitpin.git_revision)
205 check_channel_contents(v, self, table, new_gitpin)
206 return ChannelPin(
207 release_name=new_gitpin.release_name,
208 tarball_url=table['nixexprs.tar.xz'].absolute_url,
209 tarball_sha256=table['nixexprs.tar.xz'].digest,
210 git_revision=new_gitpin.git_revision)
211
212 # pylint: disable=no-self-use
213 def fetch(self, v: Verification, pin: Pin) -> str:
214 assert isinstance(pin, ChannelPin)
215
216 return fetch_with_nix_prefetch_url(
217 v, pin.tarball_url, Digest16(pin.tarball_sha256))
218
219
220SearchPath = Union[AliasSearchPath,
221 SymlinkSearchPath,
222 GitSearchPath,
223 ChannelSearchPath]
224TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
225
226
227def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
228
229 def throw(error: OSError) -> None:
230 raise error
231
232 def join(x: str, y: str) -> str:
233 return y if x == '.' else os.path.join(x, y)
234
235 def recursive_files(d: str) -> Iterable[str]:
236 all_files: List[str] = []
237 for path, dirs, files in os.walk(d, onerror=throw):
238 rel = os.path.relpath(path, start=d)
239 all_files.extend(join(rel, f) for f in files)
240 for dir_or_link in dirs:
241 if os.path.islink(join(path, dir_or_link)):
242 all_files.append(join(rel, dir_or_link))
243 return all_files
244
245 def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
246 return (f for f in files if not f.startswith('.git/'))
247
248 files = functools.reduce(
249 operator.or_, (set(
250 exclude_dot_git(
251 recursive_files(x))) for x in [a, b]))
252 return filecmp.cmpfiles(a, b, files, shallow=False)
253
254
255def fetch_channel(
256 v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
257 v.status(f'Fetching channel from {channel.channel_url}')
258 with urllib.request.urlopen(channel.channel_url, timeout=10) as request:
259 channel_html = request.read().decode()
260 forwarded_url = request.geturl()
261 v.result(request.status == 200)
262 v.check('Got forwarded', channel.channel_url != forwarded_url)
263 return channel_html, forwarded_url
264
265
266def parse_channel(v: Verification, channel_html: str) \
267 -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
268 v.status('Parsing channel description as XML')
269 d = xml.dom.minidom.parseString(channel_html)
270 v.ok()
271
272 v.status('Extracting release name:')
273 title_name = d.getElementsByTagName(
274 'title')[0].firstChild.nodeValue.split()[2]
275 h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
276 v.status(title_name)
277 v.result(title_name == h1_name)
278
279 v.status('Extracting git commit:')
280 git_commit_node = d.getElementsByTagName('tt')[0]
281 git_revision = git_commit_node.firstChild.nodeValue
282 v.status(git_revision)
283 v.ok()
284 v.status('Verifying git commit label')
285 v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')
286
287 v.status('Parsing table')
288 table: Dict[str, ChannelTableEntry] = {}
289 for row in d.getElementsByTagName('tr')[1:]:
290 name = row.childNodes[0].firstChild.firstChild.nodeValue
291 url = row.childNodes[0].firstChild.getAttribute('href')
292 size = int(row.childNodes[1].firstChild.nodeValue)
293 digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
294 table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
295 v.ok()
296 return table, GitPin(release_name=title_name, git_revision=git_revision)
297
298
299def digest_string(s: bytes) -> Digest16:
300 return Digest16(hashlib.sha256(s).hexdigest())
301
302
303def digest_file(filename: str) -> Digest16:
304 hasher = hashlib.sha256()
305 with open(filename, 'rb') as f:
306 # pylint: disable=cell-var-from-loop
307 for block in iter(lambda: f.read(4096), b''):
308 hasher.update(block)
309 return Digest16(hasher.hexdigest())
310
311
312@functools.lru_cache
313def _experimental_flag_needed(v: Verification) -> bool:
314 v.status('Checking Nix version')
315 process = subprocess.run(['nix', '--help'], stdout=subprocess.PIPE)
316 v.result(process.returncode == 0)
317 return b'--experimental-features' in process.stdout
318
319
320def _nix_command(v: Verification) -> List[str]:
321 return ['nix', '--experimental-features',
322 'nix-command'] if _experimental_flag_needed(v) else ['nix']
323
324
325def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
326 v.status('Converting digest to base16')
327 process = subprocess.run(_nix_command(v) + [
328 'to-base16',
329 '--type',
330 'sha256',
331 digest32],
332 stdout=subprocess.PIPE)
333 v.result(process.returncode == 0)
334 return Digest16(process.stdout.decode().strip())
335
336
337def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
338 v.status('Converting digest to base32')
339 process = subprocess.run(_nix_command(v) + [
340 'to-base32',
341 '--type',
342 'sha256',
343 digest16],
344 stdout=subprocess.PIPE)
345 v.result(process.returncode == 0)
346 return Digest32(process.stdout.decode().strip())
347
348
349def fetch_with_nix_prefetch_url(
350 v: Verification,
351 url: str,
352 digest: Digest16) -> str:
353 v.status(f'Fetching {url}')
354 process = subprocess.run(
355 ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
356 v.result(process.returncode == 0)
357 prefetch_digest, path, empty = process.stdout.decode().split('\n')
358 assert empty == ''
359 v.check("Verifying nix-prefetch-url's digest",
360 to_Digest16(v, Digest32(prefetch_digest)) == digest)
361 v.status(f"Verifying digest of {path}")
362 file_digest = digest_file(path)
363 v.result(file_digest == digest)
364 return path # type: ignore # (for old mypy)
365
366
367def fetch_resources(
368 v: Verification,
369 pin: GitPin,
370 forwarded_url: str,
371 table: Dict[str, ChannelTableEntry]) -> None:
372 for resource in ['git-revision', 'nixexprs.tar.xz']:
373 fields = table[resource]
374 fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
375 fields.file = fetch_with_nix_prefetch_url(
376 v, fields.absolute_url, fields.digest)
377 v.status('Verifying git commit on main page matches git commit in table')
378 with open(table['git-revision'].file, encoding='utf-8') as rev_file:
379 v.result(rev_file.read(999) == pin.git_revision)
380
381
382def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
383 return os.path.join(
384 xdg.XDG_CACHE_HOME,
385 'pinch/git-tarball',
386 f'{digest_string(channel.git_repo.encode())}-{pin.git_revision}-{pin.release_name}')
387
388
389def verify_git_ancestry(
390 v: Verification,
391 channel: TarrableSearchPath,
392 old_revision: str,
393 new_revision: str) -> None:
394 cachedir = git_cache.git_cachedir(channel.git_repo)
395 v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
396 process = subprocess.run(['git',
397 '-C',
398 cachedir,
399 'merge-base',
400 '--is-ancestor',
401 old_revision,
402 new_revision])
403 v.result(process.returncode == 0)
404
405
406def compare_tarball_and_git(
407 v: Verification,
408 pin: GitPin,
409 channel_contents: str,
410 git_contents: str) -> None:
411 v.status('Comparing channel tarball with git checkout')
412 match, mismatch, errors = compare(os.path.join(
413 channel_contents, pin.release_name), git_contents)
414 v.ok()
415 v.check(f'{len(match)} files match', len(match) > 0)
416 v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
417 expected_errors = [
418 '.git-revision',
419 '.version-suffix',
420 'nixpkgs',
421 'programs.sqlite',
422 'svn-revision']
423 benign_errors = []
424 for ee in expected_errors:
425 if ee in errors:
426 errors.remove(ee)
427 benign_errors.append(ee)
428 v.check(f'{len(errors)} unexpected incomparable files', len(errors) == 0)
429 v.check(
430 f'({len(benign_errors)} of {len(expected_errors)} expected incomparable files)',
431 len(benign_errors) == len(expected_errors))
432
433
434def extract_tarball(
435 v: Verification,
436 table: Dict[str, ChannelTableEntry],
437 dest: str) -> None:
438 v.status(f"Extracting tarball {table['nixexprs.tar.xz'].file}")
439 shutil.unpack_archive(table['nixexprs.tar.xz'].file, dest)
440 v.ok()
441
442
443def git_checkout(
444 v: Verification,
445 channel: TarrableSearchPath,
446 pin: GitPin,
447 dest: str) -> None:
448 v.status('Checking out corresponding git revision')
449 with subprocess.Popen(
450 ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
451 stdout=subprocess.PIPE) as git:
452 with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
453 if git.stdout:
454 git.stdout.close()
455 tar.wait()
456 git.wait()
457 v.result(git.returncode == 0 and tar.returncode == 0)
458
459
460def git_get_tarball(
461 v: Verification,
462 channel: TarrableSearchPath,
463 pin: GitPin) -> str:
464 cache_file = tarball_cache_file(channel, pin)
465 if os.path.exists(cache_file):
466 with open(cache_file, encoding='utf-8') as f:
467 cached_tarball = f.read(9999)
468 if os.path.exists(cached_tarball):
469 return cached_tarball
470
471 with tempfile.TemporaryDirectory() as output_dir:
472 output_filename = os.path.join(
473 output_dir, pin.release_name + '.tar.xz')
474 with open(output_filename, 'w', encoding='utf-8') as output_file:
475 v.status(f'Generating tarball for git revision {pin.git_revision}')
476 with subprocess.Popen(
477 ['git', '-C', git_cache.git_cachedir(channel.git_repo),
478 'archive', f'--prefix={pin.release_name}/', pin.git_revision],
479 stdout=subprocess.PIPE) as git:
480 with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
481 xz.wait()
482 git.wait()
483 v.result(git.returncode == 0 and xz.returncode == 0)
484
485 store_tarball = copy_to_nix_store(v, output_filename)
486
487 os.makedirs(os.path.dirname(cache_file), exist_ok=True)
488 with open(cache_file, 'w', encoding='utf-8') as f:
489 f.write(store_tarball)
490 return store_tarball # type: ignore # (for old mypy)
491
492
493def check_channel_metadata(
494 v: Verification,
495 pin: GitPin,
496 channel_contents: str) -> None:
497 v.status('Verifying git commit in channel tarball')
498 with open(os.path.join(channel_contents, pin.release_name, '.git-revision'),
499 encoding='utf-8') as f:
500 v.result(f.read(999) == pin.git_revision)
501
502 v.status(
503 f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
504 with open(os.path.join(channel_contents, pin.release_name, '.version-suffix'),
505 encoding='utf-8') as f:
506 version_suffix = f.read(999)
507 v.status(version_suffix)
508 v.result(pin.release_name.endswith(version_suffix))
509
510
511def check_channel_contents(
512 v: Verification,
513 channel: TarrableSearchPath,
514 table: Dict[str, ChannelTableEntry],
515 pin: GitPin) -> None:
516 with tempfile.TemporaryDirectory() as channel_contents, \
517 tempfile.TemporaryDirectory() as git_contents:
518
519 extract_tarball(v, table, channel_contents)
520 check_channel_metadata(v, pin, channel_contents)
521
522 git_checkout(v, channel, pin, git_contents)
523
524 compare_tarball_and_git(v, pin, channel_contents, git_contents)
525
526 v.status('Removing temporary directories')
527 v.ok()
528
529
530def git_revision_name(
531 v: Verification,
532 channel: TarrableSearchPath,
533 git_revision: str) -> str:
534 v.status('Getting commit date')
535 process = subprocess.run(['git',
536 '-C',
537 git_cache.git_cachedir(channel.git_repo),
538 'log',
539 '-n1',
540 '--format=%ct-%h',
541 '--abbrev=11',
542 '--no-show-signature',
543 git_revision],
544 stdout=subprocess.PIPE)
545 v.result(process.returncode == 0 and process.stdout != b'')
546 return f'{os.path.basename(channel.git_repo)}-{process.stdout.decode().strip()}'
547
548
549K = TypeVar('K')
550V = TypeVar('V')
551
552
553def partition_dict(pred: Callable[[K, V], bool],
554 d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
555 selected: Dict[K, V] = {}
556 remaining: Dict[K, V] = {}
557 for k, v in d.items():
558 if pred(k, v):
559 selected[k] = v
560 else:
561 remaining[k] = v
562 return selected, remaining
563
564
565def filter_dict(d: Dict[K, V], fields: Set[K]
566 ) -> Tuple[Dict[K, V], Dict[K, V]]:
567 return partition_dict(lambda k, v: k in fields, d)
568
569
570def read_config_section(
571 conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
572 mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
573 'alias': (AliasSearchPath, AliasPin),
574 'channel': (ChannelSearchPath, ChannelPin),
575 'git': (GitSearchPath, GitPin),
576 'symlink': (SymlinkSearchPath, SymlinkPin),
577 }
578 SP, P = mapping[conf['type']]
579 _, all_fields = filter_dict(dict(conf.items()), set(['type']))
580 pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
581 # Error suppression works around https://github.com/python/mypy/issues/9007
582 pin_present = pin_fields or P._fields == ()
583 pin = P(**pin_fields) if pin_present else None # type: ignore
584 return SP(**remaining_fields), pin
585
586
587def read_pinned_config_section(
588 section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
589 sp, pin = read_config_section(conf)
590 if pin is None:
591 raise Exception(
592 f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
593 return sp, pin
594
595
596def read_config(filename: str) -> configparser.ConfigParser:
597 config = configparser.ConfigParser()
598 with open(filename, encoding='utf-8') as f:
599 config.read_file(f, filename)
600 return config
601
602
603def read_config_files(
604 filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
605 merged_config: Dict[str, configparser.SectionProxy] = {}
606 for file in filenames:
607 config = read_config(file)
608 for section in config.sections():
609 if section in merged_config:
610 raise Exception('Duplicate channel "{section}"')
611 merged_config[section] = config[section]
612 return merged_config
613
614
615def pinCommand(args: argparse.Namespace) -> None:
616 v = Verification()
617 config = read_config(args.channels_file)
618 for section in config.sections():
619 if args.channels and section not in args.channels:
620 continue
621
622 sp, old_pin = read_config_section(config[section])
623
624 config[section].update(sp.pin(v, old_pin)._asdict())
625
626 with open(args.channels_file, 'w', encoding='utf-8') as configfile:
627 config.write(configfile)
628
629
630def updateCommand(args: argparse.Namespace) -> None:
631 v = Verification()
632 exprs: Dict[str, str] = {}
633 profile_manifest = os.path.join(args.profile, "manifest.nix")
634 search_paths: List[str] = [
635 "-I", "pinch_profile=" + args.profile,
636 "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
637 ] if os.path.exists(profile_manifest) else []
638 config = {
639 section: read_pinned_config_section(section, conf) for section,
640 conf in read_config_files(
641 args.channels_file).items()}
642 alias, nonalias = partition_dict(
643 lambda k, v: isinstance(v[0], AliasSearchPath), config)
644
645 for section, (sp, pin) in sorted(nonalias.items()):
646 assert not isinstance(sp, AliasSearchPath) # mypy can't see through
647 assert not isinstance(pin, AliasPin) # partition_dict()
648 tarball = sp.fetch(v, pin)
649 search_paths.extend(
650 ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
651 exprs[section] = (
652 f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
653 f'src = builtins.storePath "{tarball}"; }}')
654
655 for section, (sp, pin) in alias.items():
656 assert isinstance(sp, AliasSearchPath) # For mypy
657 exprs[section] = exprs[sp.alias_of]
658
659 command = [
660 'nix-env',
661 '--profile',
662 args.profile,
663 '--show-trace',
664 '--file',
665 '<nix/unpack-channel.nix>',
666 '--install',
667 '--remove-all',
668 ] + search_paths + ['--from-expression'] + [
669 exprs[name] % name for name in sorted(exprs.keys())]
670 if args.dry_run:
671 print(' '.join(map(shlex.quote, command)))
672 else:
673 v.status('Installing channels with nix-env')
674 process = subprocess.run(command)
675 v.result(process.returncode == 0)
676
677
678def main() -> None:
679 parser = argparse.ArgumentParser(prog='pinch')
680 subparsers = parser.add_subparsers(dest='mode', required=True)
681 parser_pin = subparsers.add_parser('pin')
682 parser_pin.add_argument('channels_file', type=str)
683 parser_pin.add_argument('channels', type=str, nargs='*')
684 parser_pin.set_defaults(func=pinCommand)
685 parser_update = subparsers.add_parser('update')
686 parser_update.add_argument('--dry-run', action='store_true')
687 parser_update.add_argument('--profile', default=(
688 f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
689 parser_update.add_argument('channels_file', type=str, nargs='+')
690 parser_update.set_defaults(func=updateCommand)
691 args = parser.parse_args()
692 args.func(args)
693
694
695if __name__ == '__main__':
696 main()