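# pinch: pin "channel"-style Nix search paths to exact revisions and install
# them into a nix-env channels profile ("pin" resolves revisions and records
# them in the config file; "update" fetches and installs what was pinned).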
import argparse
import configparser
import filecmp
import functools
import getpass
import hashlib
import operator
import os
import os.path
import shlex
import shutil
import subprocess
import sys
import tarfile
import tempfile
import types
import urllib.parse
import urllib.request
import xml.dom.minidom

from typing import (
    Callable,
    Dict,
    Iterable,
    List,
    Mapping,
    NamedTuple,
    NewType,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
)

# Use xdg module when it's less painful to have as a dependency


class XDG(NamedTuple):
    XDG_CACHE_HOME: str


xdg = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))


class VerificationError(Exception):
    pass


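# Verification prints progress to stderr as a sequence of "status ... OK/FAIL"
# lines and raises VerificationError as soon as any check fails.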
class Verification:

    def __init__(self) -> None:
        self.line_length = 0

    def status(self, s: str) -> None:
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        return '\033[%2dm%s\033[00m' % (c, s)

    def result(self, r: bool) -> None:
        message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
        length = len(message)
        cols = shutil.get_terminal_size().columns or 80
        pad = (cols - (self.line_length + length)) % cols
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        self.result(True)


Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)


class ChannelTableEntry(types.SimpleNamespace):
    absolute_url: str
    digest: Digest16
    file: str
    size: int
    url: str


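# A Pin captures everything needed to reproduce a search path's contents:
# nothing for aliases and symlinks, a git revision for git paths, and a
# revision plus tarball URL/digest for channels.  Pin fields are written into
# (and read back from) the config file by the "pin" command.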
class AliasPin(NamedTuple):
    pass


class SymlinkPin(NamedTuple):
    @property
    def release_name(self) -> str:
        return 'link'


class GitPin(NamedTuple):
    git_revision: str
    release_name: str


class ChannelPin(NamedTuple):
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str


Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]


def copy_to_nix_store(v: Verification, filename: str) -> str:
    v.status('Putting tarball in Nix store')
    process = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return process.stdout.decode().strip()  # type: ignore  # (for old mypy)


def symlink_archive(v: Verification, path: str) -> str:
    with tempfile.TemporaryDirectory() as td:
        archive_filename = os.path.join(td, 'link.tar.gz')
        os.symlink(path, os.path.join(td, 'link'))
        with tarfile.open(archive_filename, mode='x:gz') as t:
            t.add(os.path.join(td, 'link'), arcname='link')
        return copy_to_nix_store(v, archive_filename)


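# One *SearchPath class per config section type.  pin() resolves a section to
# a Pin; fetch() (absent for aliases) turns a Pin into a Nix store path.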
class AliasSearchPath(NamedTuple):
    alias_of: str

    # pylint: disable=no-self-use
    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        return AliasPin()


class SymlinkSearchPath(NamedTuple):
    path: str

    # pylint: disable=no-self-use
    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        return symlink_archive(v, self.path)


class GitSearchPath(NamedTuple):
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
        old_revision = old_pin.git_revision if old_pin is not None else None

        new_revision = git_fetch(v, self, None, old_revision)
        return GitPin(release_name=git_revision_name(v, self, new_revision),
                      git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        assert isinstance(pin, GitPin)
        ensure_git_rev_available(v, self, pin, None)
        return git_get_tarball(v, self, pin)


class ChannelSearchPath(NamedTuple):
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)
        old_revision = old_pin.git_revision if old_pin is not None else None

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        ensure_git_rev_available(v, self, new_gitpin, old_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    # pylint: disable=no-self-use
    def fetch(self, v: Verification, pin: Pin) -> str:
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))


SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]


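# Recursively compare two directory trees, ignoring .git/, via
# filecmp.cmpfiles; returns (match, mismatch, errors) lists of relative paths.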
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:

    def throw(error: OSError) -> None:
        raise error

    def join(x: str, y: str) -> str:
        return y if x == '.' else os.path.join(x, y)

    def recursive_files(d: str) -> Iterable[str]:
        all_files: List[str] = []
        for path, dirs, files in os.walk(d, onerror=throw):
            rel = os.path.relpath(path, start=d)
            all_files.extend(join(rel, f) for f in files)
            for dir_or_link in dirs:
                if os.path.islink(join(path, dir_or_link)):
                    all_files.append(join(rel, dir_or_link))
        return all_files

    def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
        return (f for f in files if not f.startswith('.git/'))

    files = functools.reduce(
        operator.or_, (set(
            exclude_dot_git(
                recursive_files(x))) for x in [a, b]))
    return filecmp.cmpfiles(a, b, files, shallow=False)


def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    v.status('Fetching channel')
    request = urllib.request.urlopen(channel.channel_url, timeout=10)
    channel_html = request.read().decode()
    forwarded_url = request.geturl()
    v.result(request.status == 200)  # type: ignore  # (for old mypy)
    v.check('Got forwarded', channel.channel_url != forwarded_url)
    return channel_html, forwarded_url


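# Scrape the channel status page: release name from <title>/<h1>, git commit
# from the <tt> element, and the artifact table (file name, size, sha256).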
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    v.status('Extracting release name:')
    title_name = d.getElementsByTagName(
        'title')[0].firstChild.nodeValue.split()[2]
    h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
    v.status(title_name)
    v.result(title_name == h1_name)

    v.status('Extracting git commit:')
    git_commit_node = d.getElementsByTagName('tt')[0]
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=title_name, git_revision=git_revision)


def digest_string(s: bytes) -> Digest16:
    return Digest16(hashlib.sha256(s).hexdigest())


def digest_file(filename: str) -> Digest16:
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # pylint: disable=cell-var-from-loop
        for block in iter(lambda: f.read(4096), b''):
            hasher.update(block)
    return Digest16(hasher.hexdigest())


def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    v.status('Converting digest to base16')
    process = subprocess.run(
        ['nix', 'to-base16', '--type', 'sha256', digest32], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())


def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    v.status('Converting digest to base32')
    process = subprocess.run(
        ['nix', 'to-base32', '--type', 'sha256', digest16], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest32(process.stdout.decode().strip())


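# Download a URL into the Nix store via nix-prefetch-url, then verify the
# expected digest twice: once as reported by nix-prefetch-url and once by
# re-hashing the resulting store path.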
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    v.status('Fetching %s' % url)
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status("Verifying file digest")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore  # (for old mypy)


def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    v.result(open(table['git-revision'].file).read(999) == pin.git_revision)


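# Local caches live under $XDG_CACHE_HOME/pinch/: bare git repos in git/
# (keyed by a hash of the repo URL) and generated tarball store paths in
# git-tarball/.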
def git_cachedir(git_repo: str) -> str:
    return os.path.join(
        xdg.XDG_CACHE_HOME,
        'pinch/git',
        digest_string(git_repo.encode()))


def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    return os.path.join(
        xdg.XDG_CACHE_HOME,
        'pinch/git-tarball',
        '%s-%s-%s' %
        (digest_string(channel.git_repo.encode()),
         pin.git_revision,
         pin.release_name))


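# Require that the new revision is reachable from the configured ref and that
# any previously pinned revision is an ancestor of the new one, so a pin can
# only move forward along the ref's history.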
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        new_revision: str,
        old_revision: Optional[str]) -> None:
    cachedir = git_cachedir(channel.git_repo)
    v.status('Verifying rev is an ancestor of ref')
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'merge-base',
                              '--is-ancestor',
                              new_revision,
                              channel.git_ref])
    v.result(process.returncode == 0)

    if old_revision is not None:
        v.status(
            'Verifying rev is an ancestor of previous rev %s' %
            old_revision)
        process = subprocess.run(['git',
                                  '-C',
                                  cachedir,
                                  'merge-base',
                                  '--is-ancestor',
                                  old_revision,
                                  new_revision])
        v.result(process.returncode == 0)


def git_fetch(
        v: Verification,
        channel: TarrableSearchPath,
        desired_revision: Optional[str],
        old_revision: Optional[str]) -> str:
    # It would be nice if we could share the nix git cache, but as of the time
    # of writing it is transitioning from gitv2 (deprecated) to gitv3 (not ready
    # yet), and trying to straddle them both is too far into nix implementation
    # details for my comfort.  So we re-implement here half of nix.fetchGit.
    # :(

    cachedir = git_cachedir(channel.git_repo)
    if not os.path.exists(cachedir):
        v.status("Initializing git repo")
        process = subprocess.run(
            ['git', 'init', '--bare', cachedir])
        v.result(process.returncode == 0)

    v.status('Fetching ref "%s" from %s' % (channel.git_ref, channel.git_repo))
    # We don't use --force here because we want to abort and freak out if forced
    # updates are happening.
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'fetch',
                              channel.git_repo,
                              '%s:%s' % (channel.git_ref,
                                         channel.git_ref)])
    v.result(process.returncode == 0)

    if desired_revision is not None:
        v.status('Verifying that fetch retrieved this rev')
        process = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', desired_revision])
        v.result(process.returncode == 0)

    new_revision = open(
        os.path.join(
            cachedir,
            'refs',
            'heads',
            channel.git_ref)).read(999).strip()

    verify_git_ancestry(v, channel, new_revision, old_revision)

    return new_revision


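# Fetch from the remote only if the pinned revision is not already present in
# the local git cache.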
def ensure_git_rev_available(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        old_revision: Optional[str]) -> None:
    cachedir = git_cachedir(channel.git_repo)
    if os.path.exists(cachedir):
        v.status('Checking if we already have this rev:')
        process = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', pin.git_revision])
        if process.returncode == 0:
            v.status('yes')
        if process.returncode == 1:
            v.status('no')
        v.result(process.returncode == 0 or process.returncode == 1)
        if process.returncode == 0:
            verify_git_ancestry(v, channel, pin.git_revision, old_revision)
            return
    git_fetch(v, channel, pin.git_revision, old_revision)


def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    v.status('Comparing channel tarball with git checkout')
    match, mismatch, errors = compare(os.path.join(
        channel_contents, pin.release_name), git_contents)
    v.ok()
    v.check('%d files match' % len(match), len(match) > 0)
    v.check('%d files differ' % len(mismatch), len(mismatch) == 0)
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_errors.append(ee)
    v.check(
        '%d unexpected incomparable files' %
        len(errors),
        len(errors) == 0)
    v.check(
        '(%d of %d expected incomparable files)' %
        (len(benign_errors),
         len(expected_errors)),
        len(benign_errors) == len(expected_errors))


def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    v.status('Extracting tarball %s' % table['nixexprs.tar.xz'].file)
    shutil.unpack_archive(table['nixexprs.tar.xz'].file, dest)
    v.ok()


def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    v.status('Checking out corresponding git revision')
    git = subprocess.Popen(['git',
                            '-C',
                            git_cachedir(channel.git_repo),
                            'archive',
                            pin.git_revision],
                           stdout=subprocess.PIPE)
    tar = subprocess.Popen(
        ['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout)
    if git.stdout:
        git.stdout.close()
    tar.wait()
    git.wait()
    v.result(git.returncode == 0 and tar.returncode == 0)


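# Produce a channel-style tarball ("git archive | xz") for the pinned
# revision, add it to the Nix store, and cache the resulting store path.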
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        cached_tarball = open(cache_file).read(9999)
        if os.path.exists(cached_tarball):
            return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w') as output_file:
            v.status(
                'Generating tarball for git revision %s' %
                pin.git_revision)
            git = subprocess.Popen(['git',
                                    '-C',
                                    git_cachedir(channel.git_repo),
                                    'archive',
                                    '--prefix=%s/' % pin.release_name,
                                    pin.git_revision],
                                   stdout=subprocess.PIPE)
            xz = subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file)
            xz.wait()
            git.wait()
            v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
        open(cache_file, 'w').write(store_tarball)
        return store_tarball  # type: ignore  # (for old mypy)


def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    v.status('Verifying git commit in channel tarball')
    v.result(
        open(
            os.path.join(
                channel_contents,
                pin.release_name,
                '.git-revision')).read(999) == pin.git_revision)

    v.status(
        'Verifying version-suffix is a suffix of release name %s:' %
        pin.release_name)
    version_suffix = open(
        os.path.join(
            channel_contents,
            pin.release_name,
            '.version-suffix')).read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))


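# Cross-check the channel tarball against a fresh checkout of the pinned git
# revision: the embedded metadata must match the pin and the file trees must
# be identical apart from a few expected incomparable files.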
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:

        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)

        git_checkout(v, channel, pin, git_contents)

        compare_tarball_and_git(v, pin, channel_contents, git_contents)

        v.status('Removing temporary directories')
    v.ok()


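# Release names for git pins look like
# "<repo-basename>-<commit-timestamp>-<short-hash>".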
def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    v.status('Getting commit date')
    process = subprocess.run(['git',
                              '-C',
                              git_cachedir(channel.git_repo),
                              'log',
                              '-n1',
                              '--format=%ct-%h',
                              '--abbrev=11',
                              '--no-show-signature',
                              git_revision],
                             stdout=subprocess.PIPE)
    v.result(process.returncode == 0 and process.stdout != b'')
    return '%s-%s' % (os.path.basename(channel.git_repo),
                      process.stdout.decode().strip())


K = TypeVar('K')
V = TypeVar('V')


def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for k, v in d.items():
        if pred(k, v):
            selected[k] = v
        else:
            remaining[k] = v
    return selected, remaining


def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    return partition_dict(lambda k, v: k in fields, d)


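# A section's "type" field selects the SearchPath/Pin pair; fields matching
# the Pin's fields are the stored pin (if present) and the remainder
# configure the SearchPath itself.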
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields != {} or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin


def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    sp, pin = read_config_section(conf)
    if pin is None:
        raise Exception(
            'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
            section)
    return sp, pin


def read_config(filename: str) -> configparser.ConfigParser:
    config = configparser.ConfigParser()
    config.read_file(open(filename), filename)
    return config


def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                raise Exception('Duplicate channel "%s"' % section)
            merged_config[section] = config[section]
    return merged_config


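# "pinch pin": re-resolve each requested section to a fresh Pin and write the
# pin's fields back into the channels file.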
def pinCommand(args: argparse.Namespace) -> None:
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_config_section(config[section])

        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w') as configfile:
        config.write(configfile)


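# "pinch update": fetch a store path for every pinned section and install
# them all into a nix-env profile via <nix/unpack-channel.nix>.  Aliases
# reuse the expression of the section they point at.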
def updateCommand(args: argparse.Namespace) -> None:
    v = Verification()
    exprs: Dict[str, str] = {}
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in nonalias.items():
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)  # partition_dict()
        tarball = sp.fetch(v, pin)
        exprs[section] = (
            'f: f { name = "%s"; channelName = "%%s"; src = builtins.storePath "%s"; }' %
            (pin.release_name, tarball))

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    command = [
        'nix-env',
        '--profile',
        args.profile,
        '--show-trace',
        '--file',
        '<nix/unpack-channel.nix>',
        '--install',
        '--from-expression'] + [exprs[name] % name for name in sorted(exprs.keys())]
    if args.dry_run:
        print(' '.join(map(shlex.quote, command)))
    else:
        v.status('Installing channels with nix-env')
        process = subprocess.run(command)
        v.result(process.returncode == 0)


def main() -> None:
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)
    parser_pin = subparsers.add_parser('pin')
    parser_pin.add_argument('channels_file', type=str)
    parser_pin.add_argument('channels', type=str, nargs='*')
    parser_pin.set_defaults(func=pinCommand)
    parser_update = subparsers.add_parser('update')
    parser_update.add_argument('--dry-run', action='store_true')
    parser_update.add_argument('--profile', default=(
        '/nix/var/nix/profiles/per-user/%s/channels' % getpass.getuser()))
    parser_update.add_argument('channels_file', type=str, nargs='+')
    parser_update.set_defaults(func=updateCommand)
    args = parser.parse_args()
    args.func(args)


if __name__ == '__main__':
    main()