]> git.scottworley.com Git - pinch/blob - pinch.py
37d37a7c137727bed59251a0b0b1619f19cbeab7
[pinch] / pinch.py
1 import argparse
2 import configparser
3 import filecmp
4 import functools
5 import getpass
6 import hashlib
7 import operator
8 import os
9 import os.path
10 import shlex
11 import shutil
12 import subprocess
13 import sys
14 import tarfile
15 import tempfile
16 import types
17 import urllib.parse
18 import urllib.request
19 import xml.dom.minidom
20
21 from typing import (
22 Callable,
23 Dict,
24 Iterable,
25 List,
26 Mapping,
27 NamedTuple,
28 NewType,
29 Optional,
30 Set,
31 Tuple,
32 Type,
33 TypeVar,
34 Union,
35 )
36
37 # Use xdg module when it's less painful to have as a dependency
38
39
class XDG(NamedTuple):
    """Holds the XDG base-directory settings this script reads."""
    # Directory under which this script keeps its caches.
    XDG_CACHE_HOME: str
42
43
# Resolve the cache directory once, at import time.  An unset *or empty*
# $XDG_CACHE_HOME falls back to ~/.cache, as the XDG Base Directory
# specification requires; an empty value used verbatim would make every
# cache path relative to the current working directory.
xdg = XDG(
    XDG_CACHE_HOME=os.getenv('XDG_CACHE_HOME') or
    os.path.expanduser('~/.cache'))
48
49
class VerificationError(Exception):
    """Signals that a verification step reported failure."""
52
53
class Verification:
    """Reports verification steps on stderr and aborts on the first failure.

    A step is printed as one or more labels (status()) followed by a
    coloured OK/FAIL marker padded out to the terminal width (result()).
    """

    def __init__(self) -> None:
        # Width of the text printed so far on the current stderr line.
        self.line_length = 0

    def status(self, s: str) -> None:
        """Print `s` plus a trailing space on stderr without ending the line."""
        print(s, file=sys.stderr, end=' ', flush=True)
        self.line_length += len(s) + 1  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        """Wrap `s` in the ANSI escape for colour code `c`, then reset."""
        start = '\033[%2dm' % c
        reset = '\033[00m'
        return '%s%s%s' % (start, s, reset)

    def result(self, r: bool) -> None:
        """End the current line with OK or FAIL.

        Raises VerificationError when `r` is False.
        """
        outcomes = {True: ('OK ', 92), False: ('FAIL', 91)}
        message, color = outcomes[r]
        columns = shutil.get_terminal_size().columns or 80
        used = self.line_length + len(message)
        # The modulo keeps the padding non-negative once the text printed so
        # far is wider than the terminal.
        pad = (columns - used) % columns
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        """Print label `s` and immediately report outcome `r`."""
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        """Report success for the step currently being printed."""
        self.result(True)
83
84
# Distinct string types for the two textual digest encodings used in this
# file (base16 and base32), so a type checker can tell them apart.
Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)
87
88
class ChannelTableEntry(types.SimpleNamespace):
    """One entry of the file table parsed from a channel page.

    Attributes are assigned dynamically on the namespace, so an instance
    only has the attributes that have been set on it so far.
    """
    # Declarations only, for type checkers; no values are assigned here.
    absolute_url: str
    digest: Digest16
    file: str
    size: int
    url: str
95
96
class AliasPin(NamedTuple):
    """Pin for an alias entry; it records no fields."""
99
100
class SymlinkPin(NamedTuple):
    """Pin for a symlink entry; it records no fields."""

    @property
    def release_name(self) -> str:
        """Constant release name shared by every symlink pin."""
        name = 'link'
        return name
105
106
class GitPin(NamedTuple):
    """Pin for a git entry: a revision plus the release name derived for it."""
    git_revision: str
    release_name: str
110
111
class ChannelPin(NamedTuple):
    """Pin for a channel entry: git revision, release name and tarball."""
    git_revision: str
    release_name: str
    # Where the channel tarball was published, and its SHA-256 digest.
    tarball_url: str
    tarball_sha256: str
117
118
# Any of the pin record types; each search path kind produces its own.
Pin = Union[
    AliasPin,
    SymlinkPin,
    GitPin,
    ChannelPin,
]
120
121
def copy_to_nix_store(v: Verification, filename: str) -> str:
    """Run `nix-store --add` on `filename` and return its stripped stdout.

    The step is reported through `v`, which raises VerificationError if the
    command exits with a non-zero status.
    """
    v.status('Putting tarball in Nix store')
    added = subprocess.run(
        ['nix-store', '--add', filename],
        stdout=subprocess.PIPE)
    v.result(added.returncode == 0)
    # Presumably the store path printed by nix-store.
    output = added.stdout.decode()
    return output.strip()
128
129
class AliasSearchPath(NamedTuple):
    """Search path entry that stands for another entry, named by alias_of."""
    alias_of: str

    # pylint: disable=no-self-use
    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        """Return a fresh, empty AliasPin; both arguments are ignored."""
        return AliasPin()
136
137
class SymlinkSearchPath(NamedTuple):
    """Search path entry that points at a local filesystem path."""
    path: str

    # pylint: disable=no-self-use
    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        """Return a fresh, empty SymlinkPin; both arguments are ignored."""
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        """Pack a symlink to self.path into a tarball and add it to the store.

        The archive holds a single member named 'link'.  Returns whatever
        copy_to_nix_store() returns for the archive.
        """
        with tempfile.TemporaryDirectory() as workdir:
            link_path = os.path.join(workdir, 'link')
            archive_path = os.path.join(workdir, 'link.tar.gz')
            os.symlink(self.path, link_path)
            # 'x' mode: create the archive, failing if it already exists.
            with tarfile.open(archive_path, mode='x:gz') as archive:
                archive.add(link_path, arcname='link')
            # Must happen before the temporary directory is removed.
            return copy_to_nix_store(v, archive_path)
152
153
class GitSearchPath(NamedTuple):
    """Search path entry that tracks a ref of a git repository."""
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        """Fetch the ref and return a GitPin for the revision it points at.

        When `old_pin` is given it must be a GitPin; its revision is passed
        to git_fetch() as the previous revision.
        """
        old_revision: Optional[str] = None
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            old_revision = old_pin.git_revision

        new_revision = git_fetch(v, self, None, old_revision)
        release_name = git_revision_name(v, self, new_revision)
        return GitPin(git_revision=new_revision, release_name=release_name)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Make sure the pinned revision is available, then return its tarball."""
        assert isinstance(pin, GitPin)
        ensure_git_rev_available(v, self, pin, None)
        return git_get_tarball(v, self, pin)
171
172
class ChannelSearchPath(NamedTuple):
    """Search path entry that tracks a channel page and its git repository."""
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        """Fetch and verify the channel, returning a ChannelPin for it.

        Steps, in order: fetch the channel page, parse it, download the
        resources it lists, make sure the git revision is available, and
        check the channel contents against git.  When `old_pin` is given it
        must be a ChannelPin; its revision is used as the previous revision.
        """
        old_revision: Optional[str] = None
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)
            old_revision = old_pin.git_revision

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        fetch_resources(v, new_gitpin, forwarded_url, table)
        ensure_git_rev_available(v, self, new_gitpin, old_revision)
        check_channel_contents(v, self, table, new_gitpin)

        nixexprs = table['nixexprs.tar.xz']
        return ChannelPin(
            git_revision=new_gitpin.git_revision,
            release_name=new_gitpin.release_name,
            tarball_url=nixexprs.absolute_url,
            tarball_sha256=nixexprs.digest)

    # pylint: disable=no-self-use
    def fetch(self, v: Verification, pin: Pin) -> str:
        """Download the pinned tarball and return the path it was stored at."""
        assert isinstance(pin, ChannelPin)

        expected_digest = Digest16(pin.tarball_sha256)
        return fetch_with_nix_prefetch_url(v, pin.tarball_url, expected_digest)
200
201
# Every kind of search path entry a config section can describe.
SearchPath = Union[
    AliasSearchPath,
    SymlinkSearchPath,
    GitSearchPath,
    ChannelSearchPath,
]
# The kinds that carry git_repo/git_ref and can be used with the git helpers.
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
207
208
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Compare the files found under directories `a` and `b` by content.

    Every file (and every symlink to a directory) found under either tree is
    considered, except entries below a top-level '.git/'.  Returns the
    (match, mismatch, errors) lists produced by filecmp.cmpfiles().
    """

    def reraise(error: OSError) -> None:
        # os.walk() swallows listing errors unless onerror raises them.
        raise error

    def relative_entries(root: str) -> Set[str]:
        found: Set[str] = set()
        for current, subdirs, filenames in os.walk(root, onerror=reraise):
            rel = os.path.relpath(current, start=root)
            prefix = '' if rel == '.' else rel
            # os.walk() does not descend into symlinked directories, so
            # record the links themselves as entries.
            links = [name for name in subdirs
                     if os.path.islink(os.path.join(current, name))]
            for name in list(filenames) + links:
                found.add(os.path.join(prefix, name))
        return found

    candidates: Set[str] = set()
    for root in (a, b):
        candidates.update(
            name for name in relative_entries(root)
            if not name.startswith('.git/'))
    return filecmp.cmpfiles(a, b, candidates, shallow=False)
235
236
def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    """Download the channel page and return (page body, final URL).

    Verifies through `v` that the response status is 200 and that the
    request was redirected (the final URL differs from channel.channel_url).

    NOTE(review): response.read() yields bytes, so the first element is
    bytes at runtime despite the `str` annotation, which is left unchanged
    to keep the signature stable.
    """
    v.status('Fetching channel')
    # The context manager closes the HTTP response even when a check fails.
    with urllib.request.urlopen(channel.channel_url, timeout=10) as response:
        channel_html = response.read()
        forwarded_url = response.geturl()
        v.result(response.status == 200)  # type: ignore # (for old mypy)
    v.check('Got forwarded', channel.channel_url != forwarded_url)
    return channel_html, forwarded_url
246
247
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    """Parse a channel page into its file table and a GitPin.

    The release name is the third whitespace-separated word of the first
    <title> and must equal the same word of the first <h1>.  The git
    revision is the text of the first <tt>, which must directly follow the
    text 'Git commit '.  Every <tr> after the first becomes a table entry
    keyed by the text of the link in its first cell.
    """
    v.status('Parsing channel description as XML')
    doc = xml.dom.minidom.parseString(channel_html)
    v.ok()

    v.status('Extracting release name:')
    title_text = doc.getElementsByTagName('title')[0].firstChild.nodeValue
    title_name = title_text.split()[2]
    h1_text = doc.getElementsByTagName('h1')[0].firstChild.nodeValue
    h1_name = h1_text.split()[2]
    v.status(title_name)
    v.result(title_name == h1_name)

    v.status('Extracting git commit:')
    commit_node = doc.getElementsByTagName('tt')[0]
    git_revision = commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    v.status('Verifying git commit label')
    label = commit_node.previousSibling.nodeValue
    v.result(label == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    rows = doc.getElementsByTagName('tr')
    for row in rows[1:]:  # the first row is skipped
        cells = row.childNodes
        link = cells[0].firstChild
        name = link.firstChild.nodeValue
        url = link.getAttribute('href')
        size = int(cells[1].firstChild.nodeValue)
        digest = Digest16(cells[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(git_revision=git_revision, release_name=title_name)
279
280
def digest_string(s: bytes) -> Digest16:
    """Return the hexadecimal SHA-256 digest of the bytes `s`."""
    hasher = hashlib.sha256()
    hasher.update(s)
    return Digest16(hasher.hexdigest())
283
284
def digest_file(filename: str) -> Digest16:
    """Return the hexadecimal SHA-256 digest of the file's contents.

    The file is read in 4096-byte blocks, so it never has to fit in memory.
    """
    sha = hashlib.sha256()
    with open(filename, 'rb') as stream:
        while True:
            chunk = stream.read(4096)
            if chunk == b'':
                break
            sha.update(chunk)
    return Digest16(sha.hexdigest())
292
293
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base32 SHA-256 digest to base16 with `nix to-base16`.

    Returns the command's stripped stdout; a non-zero exit status is
    reported as a failure through `v`.
    """
    v.status('Converting digest to base16')
    command = ['nix', 'to-base16', '--type', 'sha256', digest32]
    converted = subprocess.run(command, stdout=subprocess.PIPE)
    v.result(converted.returncode == 0)
    text = converted.stdout.decode()
    return Digest16(text.strip())
300
301
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a base16 SHA-256 digest to base32 with `nix to-base32`.

    Returns the command's stripped stdout; a non-zero exit status is
    reported as a failure through `v`.
    """
    v.status('Converting digest to base32')
    command = ['nix', 'to-base32', '--type', 'sha256', digest16]
    converted = subprocess.run(command, stdout=subprocess.PIPE)
    v.result(converted.returncode == 0)
    text = converted.stdout.decode()
    return Digest32(text.strip())
308
309
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Fetch `url` with nix-prefetch-url and return the path it printed.

    The expected `digest` is checked twice: against the digest reported by
    nix-prefetch-url (after converting it to base16) and against a SHA-256
    computed here over the file at the returned path.
    """
    v.status('Fetching %s' % url)
    command = ['nix-prefetch-url', '--print-path', url, digest]
    process = subprocess.run(command, stdout=subprocess.PIPE)
    v.result(process.returncode == 0)

    # Exactly two newline-terminated lines are expected: digest, then path.
    output_lines = process.stdout.decode().split('\n')
    prefetch_digest, path, empty = output_lines
    assert empty == ''

    # Convert first so the conversion is reported before this check's label.
    reported = to_Digest16(v, Digest32(prefetch_digest))
    v.check("Verifying nix-prefetch-url's digest", reported == digest)

    v.status("Verifying file digest")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore # (for old mypy)
326
327
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the 'git-revision' and 'nixexprs.tar.xz' table entries.

    Each entry gains `absolute_url` (its url resolved against
    `forwarded_url`) and `file` (the path of the downloaded file).  The
    downloaded git-revision file must then match pin.git_revision.
    """
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    # Read through a context manager so the file handle is closed promptly.
    with open(table['git-revision'].file) as revision_file:
        revision_in_table = revision_file.read(999)
    v.result(revision_in_table == pin.git_revision)
340
341
def git_cachedir(git_repo: str) -> str:
    """Return the cache directory used for `git_repo`.

    The directory sits under <XDG_CACHE_HOME>/pinch/git and is named after
    the SHA-256 digest of the repository string.
    """
    repo_key = digest_string(git_repo.encode())
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git', repo_key)
347
348
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Return the path of the cache file for this repo/revision/release.

    The file sits under <XDG_CACHE_HOME>/pinch/git-tarball and its name
    joins the repository digest, the git revision and the release name.
    """
    repo_key = digest_string(channel.git_repo.encode())
    filename = '%s-%s-%s' % (repo_key, pin.git_revision, pin.release_name)
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git-tarball', filename)
357
358
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        new_revision: str,
        old_revision: Optional[str]) -> None:
    """Check that `new_revision` sits where it should in the git history.

    Two checks are run in the repository's cache directory:
      * `new_revision` must be an ancestor of channel.git_ref;
      * if `old_revision` is given, it must be an ancestor of
        `new_revision`, i.e. the pin only ever moves forward.
    Either failure raises VerificationError through `v`.
    """
    cachedir = git_cachedir(channel.git_repo)

    def is_ancestor(ancestor: str, descendant: str) -> bool:
        # `git merge-base --is-ancestor A B` exits 0 iff A is an ancestor
        # of B.
        process = subprocess.run(['git',
                                  '-C',
                                  cachedir,
                                  'merge-base',
                                  '--is-ancestor',
                                  ancestor,
                                  descendant])
        return process.returncode == 0

    v.status('Verifying rev is an ancestor of ref')
    v.result(is_ancestor(new_revision, channel.git_ref))

    if old_revision is not None:
        # The message used to say "ancestor of previous rev", which is the
        # opposite of what the command below verifies.
        v.status(
            'Verifying rev is a descendant of previous rev %s' %
            old_revision)
        v.result(is_ancestor(old_revision, new_revision))
387
388
def git_fetch(
        v: Verification,
        channel: TarrableSearchPath,
        desired_revision: Optional[str],
        old_revision: Optional[str]) -> str:
    """Fetch channel.git_ref into the cache repo and return the ref's revision.

    The bare cache repository is created on first use.  If
    `desired_revision` is given, the fetch must have made that object
    available.  The fetched revision is finally checked with
    verify_git_ancestry() against `old_revision`.
    """
    # It would be nice if we could share the nix git cache, but as of the time
    # of writing it is transitioning from gitv2 (deprecated) to gitv3 (not ready
    # yet), and trying to straddle them both is too far into nix implementation
    # details for my comfort.  So we re-implement here half of nix.fetchGit.
    # :(

    cachedir = git_cachedir(channel.git_repo)
    if not os.path.exists(cachedir):
        v.status("Initializing git repo")
        process = subprocess.run(
            ['git', 'init', '--bare', cachedir])
        v.result(process.returncode == 0)

    v.status('Fetching ref "%s" from %s' % (channel.git_ref, channel.git_repo))
    # We don't use --force here because we want to abort and freak out if forced
    # updates are happening.
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'fetch',
                              channel.git_repo,
                              '%s:%s' % (channel.git_ref,
                                         channel.git_ref)])
    v.result(process.returncode == 0)

    if desired_revision is not None:
        v.status('Verifying that fetch retrieved this rev')
        process = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', desired_revision])
        v.result(process.returncode == 0)

    # Read the ref file that the fetch above just updated; the context
    # manager closes the handle instead of leaving that to the garbage
    # collector.
    ref_path = os.path.join(cachedir, 'refs', 'heads', channel.git_ref)
    with open(ref_path) as ref_file:
        new_revision = ref_file.read(999).strip()

    verify_git_ancestry(v, channel, new_revision, old_revision)

    return new_revision
435
436
def ensure_git_rev_available(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        old_revision: Optional[str]) -> None:
    """Make sure pin.git_revision exists in the cache repo, fetching if not.

    If the cache already holds the revision, only its ancestry is verified;
    otherwise (or when no cache exists yet) git_fetch() is run.
    """
    cachedir = git_cachedir(channel.git_repo)
    if os.path.exists(cachedir):
        v.status('Checking if we already have this rev:')
        probe = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', pin.git_revision])
        code = probe.returncode
        # Any exit status other than 0 or 1 is treated as a failure.
        answers = {0: 'yes', 1: 'no'}
        if code in answers:
            v.status(answers[code])
        v.result(code in (0, 1))
        if code == 0:
            verify_git_ancestry(v, channel, pin.git_revision, old_revision)
            return
    git_fetch(v, channel, pin.git_revision, old_revision)
456
457
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Verify that the unpacked channel tarball matches the git checkout.

    At least one file must match, none may differ, and the files that could
    not be compared must be exactly the fixed list of expected ones.
    """
    v.status('Comparing channel tarball with git checkout')
    tarball_root = os.path.join(channel_contents, pin.release_name)
    match, mismatch, errors = compare(tarball_root, git_contents)
    v.ok()
    v.check('%d files match' % len(match), len(match) > 0)
    v.check('%d files differ' % len(mismatch), len(mismatch) == 0)

    # Files that are expected to be incomparable between the two trees.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_errors = [name for name in expected_errors if name in errors]
    unexpected = list(errors)
    for name in benign_errors:
        unexpected.remove(name)

    v.check(
        '%d unexpected incomparable files' % len(unexpected),
        len(unexpected) == 0)
    v.check(
        '(%d of %d expected incomparable files)' %
        (len(benign_errors), len(expected_errors)),
        len(benign_errors) == len(expected_errors))
489
490
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the downloaded 'nixexprs.tar.xz' table entry into `dest`."""
    tarball = table['nixexprs.tar.xz'].file
    v.status('Extracting tarball %s' % tarball)
    shutil.unpack_archive(tarball, dest)
    v.ok()
498
499
def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    """Extract the tree of pin.git_revision into directory `dest`.

    Pipes `git archive` from the cache repository into `tar x`; both
    processes must exit with status 0.
    """
    v.status('Checking out corresponding git revision')
    archive_command = [
        'git', '-C', git_cachedir(channel.git_repo),
        'archive', pin.git_revision]
    extract_command = ['tar', 'x', '-C', dest, '-f', '-']

    git = subprocess.Popen(archive_command, stdout=subprocess.PIPE)
    tar = subprocess.Popen(extract_command, stdin=git.stdout)
    # Drop our copy of the pipe so that tar holds the only read end.
    if git.stdout:
        git.stdout.close()
    tar.wait()
    git.wait()
    succeeded = git.returncode == 0 and tar.returncode == 0
    v.result(succeeded)
519
520
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    """Return a Nix store tarball of pin.git_revision, building it if needed.

    A small cache file remembers the store path of a previously built
    tarball; it is reused only while that path still exists.  Otherwise
    `git archive | xz` writes a fresh tarball, which is added to the Nix
    store and recorded in the cache file.
    """
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file) as cached:
            cached_tarball = cached.read(9999)
        if os.path.exists(cached_tarball):
            return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        # Binary mode: the handle only receives xz's raw output.
        with open(output_filename, 'wb') as output_file:
            v.status(
                'Generating tarball for git revision %s' %
                pin.git_revision)
            git = subprocess.Popen(['git',
                                    '-C',
                                    git_cachedir(channel.git_repo),
                                    'archive',
                                    '--prefix=%s/' % pin.release_name,
                                    pin.git_revision],
                                   stdout=subprocess.PIPE)
            xz = subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file)
            # Drop our copy of the pipe (as git_checkout does) so git gets
            # SIGPIPE instead of blocking forever if xz exits early.
            if git.stdout:
                git.stdout.close()
            xz.wait()
            git.wait()
            v.result(git.returncode == 0 and xz.returncode == 0)

        # The tarball is closed here but its directory still exists.
        store_tarball = copy_to_nix_store(v, output_filename)

    os.makedirs(os.path.dirname(cache_file), exist_ok=True)
    # Close (and therefore flush) the cache file deterministically.
    with open(cache_file, 'w') as cache:
        cache.write(store_tarball)
    return store_tarball  # type: ignore # (for old mypy)
555
556
def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    """Check the metadata files inside the unpacked channel tarball.

    Under <channel_contents>/<release_name>, '.git-revision' must equal
    pin.git_revision and '.version-suffix' must be a suffix of
    pin.release_name.
    """
    release_dir = os.path.join(channel_contents, pin.release_name)

    v.status('Verifying git commit in channel tarball')
    # Context managers close the metadata files as soon as they are read.
    with open(os.path.join(release_dir, '.git-revision')) as revision_file:
        tarball_revision = revision_file.read(999)
    v.result(tarball_revision == pin.git_revision)

    v.status(
        'Verifying version-suffix is a suffix of release name %s:' %
        pin.release_name)
    with open(os.path.join(release_dir, '.version-suffix')) as suffix_file:
        version_suffix = suffix_file.read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))
579
580
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    """Unpack the channel tarball and verify it against the git revision.

    The tarball and a git checkout are extracted into two temporary
    directories, the tarball's metadata is checked, and the trees are
    compared.
    """
    with tempfile.TemporaryDirectory() as tarball_dir, \
            tempfile.TemporaryDirectory() as checkout_dir:

        extract_tarball(v, table, tarball_dir)
        check_channel_metadata(v, pin, tarball_dir)

        git_checkout(v, channel, pin, checkout_dir)

        compare_tarball_and_git(v, pin, tarball_dir, checkout_dir)

        # NOTE(review): nesting inferred -- the label is printed before the
        # directories are removed on leaving the block, OK afterwards.
        v.status('Removing temporary directories')
    v.ok()
598
599
def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    """Build a release name for `git_revision`.

    The name is '<basename of git_repo>-<commit timestamp>-<abbreviated
    hash>', with the last two parts taken from `git log` run in the cache
    repository.
    """
    v.status('Getting commit date')
    command = [
        'git', '-C', git_cachedir(channel.git_repo),
        'log', '-n1',
        '--format=%ct-%h',
        '--abbrev=11',
        '--no-show-signature',
        git_revision]
    log = subprocess.run(command, stdout=subprocess.PIPE)
    v.result(log.returncode == 0 and log.stdout != b'')
    repo_name = os.path.basename(channel.git_repo)
    stamp = log.stdout.decode().strip()
    return '%s-%s' % (repo_name, stamp)
618
619
# Key and value type parameters for the generic dict helpers.
K = TypeVar('K')
V = TypeVar('V')


def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split `d` into (items where pred(key, value) holds, all the rest).

    Both result dicts keep the iteration order of `d`.
    """
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for key, value in d.items():
        target = selected if pred(key, value) else remaining
        target[key] = value
    return selected, remaining
634
635
def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split `d` into (items whose key is in `fields`, all the rest)."""

    def key_is_listed(key: K, _: V) -> bool:
        return key in fields

    return partition_dict(key_is_listed, d)
639
640
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Turn one config section into a search path and, if pinned, its pin.

    The section's 'type' key selects the search path and pin classes.  Keys
    named after the pin class's fields build the pin; every other key is
    passed to the search path constructor.  The pin is None when the section
    has no pin keys, unless the pin class has no fields at all.
    """
    kinds: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = kinds[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), {'type'})
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))

    pin: Optional[Pin] = None
    if pin_fields or not P._fields:
        # Error suppression works around
        # https://github.com/python/mypy/issues/9007
        pin = P(**pin_fields)  # type:ignore[call-arg]
    return SP(**remaining_fields), pin
656
657
def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    """Like read_config_section(), but raise if the section has no pin.

    `section` is only used to name the channel in the error message.
    """
    sp, pin = read_config_section(conf)
    if pin is not None:
        return sp, pin
    raise Exception(
        'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
        section)
666
667
def read_config(filename: str) -> configparser.ConfigParser:
    """Parse the INI-style file `filename` and return the parser.

    Raises whatever open() raises if the file cannot be opened, and
    configparser errors if it cannot be parsed.
    """
    config = configparser.ConfigParser()
    # Close the file as soon as it is parsed instead of leaking the handle.
    with open(filename) as config_file:
        config.read_file(config_file, filename)
    return config
672
673
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Read several config files and merge their sections into one dict.

    Raises Exception if the same section name appears more than once.
    """
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for filename in filenames:
        config = read_config(filename)
        for name in config.sections():
            if name in merged_config:
                raise Exception('Duplicate channel "%s"' % name)
            merged_config[name] = config[name]
    return merged_config
684
685
def pinCommand(args: argparse.Namespace) -> None:
    """Implement the 'pin' subcommand.

    Computes a fresh pin for each section of args.channels_file (or only
    for those named in args.channels, when any are given), stores the pin's
    fields in the section, and writes the file back.
    """
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        selected = not args.channels or section in args.channels
        if selected:
            sp, old_pin = read_config_section(config[section])
            new_pin = sp.pin(v, old_pin)
            config[section].update(new_pin._asdict())

    with open(args.channels_file, 'w') as configfile:
        config.write(configfile)
699
700
def updateCommand(args: argparse.Namespace) -> None:
    """Implement the 'update' subcommand.

    Fetches the tarball of every pinned, non-alias section, builds one Nix
    expression per section (aliases reuse the expression of their target),
    and installs them all into the user's channels profile with nix-env.
    With args.dry_run the nix-env command is printed instead of run.
    """
    v = Verification()
    exprs: Dict[str, str] = {}

    config: Dict[str, Tuple[SearchPath, Pin]] = {}
    for section, conf in read_config_files(args.channels_file).items():
        config[section] = read_pinned_config_section(section, conf)

    def is_alias(_: str, entry: Tuple[SearchPath, Pin]) -> bool:
        return isinstance(entry[0], AliasSearchPath)

    alias, nonalias = partition_dict(is_alias, config)

    # '%%s' leaves a '%s' placeholder for the channel name, filled in below.
    template = (
        'f: f { name = "%s"; channelName = "%%s"; src = builtins.storePath "%s"; }')
    for section, (sp, pin) in nonalias.items():
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)        # partition_dict()
        tarball = sp.fetch(v, pin)
        exprs[section] = template % (pin.release_name, tarball)

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    profile = '/nix/var/nix/profiles/per-user/%s/channels' % getpass.getuser()
    command = [
        'nix-env',
        '--profile',
        profile,
        '--show-trace',
        '--file',
        '<nix/unpack-channel.nix>',
        '--install',
        '--from-expression']
    command.extend(exprs[name] % name for name in sorted(exprs))

    if args.dry_run:
        print(' '.join(shlex.quote(word) for word in command))
    else:
        v.status('Installing channels with nix-env')
        process = subprocess.run(command)
        v.result(process.returncode == 0)
739
740
def main() -> None:
    """Parse the command line and run the selected subcommand.

    Subcommands:
      pin CHANNELS_FILE [CHANNEL ...]
      update [--dry-run] CHANNELS_FILE [CHANNELS_FILE ...]
    """
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)

    pin_parser = subparsers.add_parser('pin')
    pin_parser.add_argument('channels_file', type=str)
    pin_parser.add_argument('channels', type=str, nargs='*')
    pin_parser.set_defaults(func=pinCommand)

    update_parser = subparsers.add_parser('update')
    update_parser.add_argument('--dry-run', action='store_true')
    update_parser.add_argument('channels_file', type=str, nargs='+')
    update_parser.set_defaults(func=updateCommand)

    args = parser.parse_args()
    # Each subparser stored its handler in args.func.
    args.func(args)


if __name__ == '__main__':
    main()