]> git.scottworley.com Git - pinch/blob - pinch.py
Start on 3.3.0
[pinch] / pinch.py
1 # pinch: PIN CHannels - a replacement for `nix-channel --update`
2 #
3 # This program is free software: you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation, version 3.
6
7
8 import argparse
9 import configparser
10 import filecmp
11 import functools
12 import getpass
13 import hashlib
14 import operator
15 import os
16 import os.path
17 import shlex
18 import shutil
19 import subprocess
20 import sys
21 import tarfile
22 import tempfile
23 import types
24 import urllib.parse
25 import urllib.request
26 import xml.dom.minidom
27
28 from typing import (
29 Callable,
30 Dict,
31 Iterable,
32 List,
33 Mapping,
34 NamedTuple,
35 NewType,
36 Optional,
37 Set,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 )
43
44 import git_cache
45
46 # Use xdg module when it's less painful to have as a dependency
47
48
class XDG(NamedTuple):
    """The subset of XDG base-directory settings this program uses."""
    XDG_CACHE_HOME: str
51
52
# Resolved once at import time; falls back to ~/.cache (the XDG default)
# when XDG_CACHE_HOME is unset.
xdg = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))
57
58
class VerificationError(Exception):
    """Raised by Verification.result when a verification step fails."""
61
62
class Verification:
    """Incremental progress reporter.

    Step labels accumulate on one stderr line via status(); result()
    right-aligns a colored OK/FAIL verdict at the end of that line and
    raises VerificationError on failure.
    """

    def __init__(self) -> None:
        self.line_length = 0

    def status(self, s: str) -> None:
        """Append one step label to the current stderr line."""
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        """Wrap s in the ANSI escape sequence for color code c."""
        return f'\033[{c:2d}m{s}\033[00m'

    def result(self, r: bool) -> None:
        """Finish the current line with a verdict; raise when r is False."""
        verdicts = {True: ('OK ', 92), False: ('FAIL', 91)}
        message, color = verdicts[r]
        cols = shutil.get_terminal_size().columns or 80
        # Pad so the verdict lands at the right edge of the terminal.
        pad = (cols - (self.line_length + len(message))) % cols
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        """Convenience: one label followed immediately by its verdict."""
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        """Finish the current line with a success verdict."""
        self.result(True)
92
93
# Distinct types for the two sha256 encodings, so mypy catches mixing a
# base-16 (hex) digest with a base-32 (nix-style) one.
Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)
96
97
class ChannelTableEntry(types.SimpleNamespace):
    """One row of the channel page's file table.

    url, digest and size come straight from the parsed HTML;
    absolute_url and file are filled in later by fetch_resources().
    """
    absolute_url: str
    digest: Digest16
    file: str
    size: int
    url: str
104
105
class AliasPin(NamedTuple):
    """An alias mirrors another channel, so it needs no pinned state."""
    pass
108
109
class SymlinkPin(NamedTuple):
    """Pin for a symlink search path; its release name is always 'link'."""

    @property
    def release_name(self) -> str:
        """Fixed name matching the single entry in the link tarball."""
        return 'link'
114
115
class GitPin(NamedTuple):
    """Pinned state for a git search path: the resolved revision plus the
    release name derived from it."""
    git_revision: str
    release_name: str
119
120
class ChannelPin(NamedTuple):
    """Pinned state for a channel search path: the git revision, the
    release name, and the exact tarball (URL + sha256) advertised by the
    channel page."""
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str
126
127
128 Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]
129
130
def copy_to_nix_store(v: Verification, filename: str) -> str:
    """Insert `filename` into the Nix store and return its store path."""
    v.status('Putting tarball in Nix store')
    completed = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return completed.stdout.decode().strip()  # type: ignore # (for old mypy)
137
138
def symlink_archive(v: Verification, path: str) -> str:
    """Build a one-entry gzipped tarball containing a symlink to `path`
    and copy it into the Nix store; returns the store path."""
    with tempfile.TemporaryDirectory() as scratch:
        link = os.path.join(scratch, 'link')
        archive = os.path.join(scratch, 'link.tar.gz')
        os.symlink(path, link)
        with tarfile.open(archive, mode='x:gz') as tar:
            tar.add(link, arcname='link')
        return copy_to_nix_store(v, archive)
146
147
class AliasSearchPath(NamedTuple):
    """Search path that simply mirrors another named channel."""
    alias_of: str

    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        """Aliases carry no state of their own, so pinning is trivial."""
        return AliasPin()
153
154
class SymlinkSearchPath(NamedTuple):
    """Search path pointing at a fixed filesystem location via symlink."""
    path: str

    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        """The link target is fixed, so there is nothing to resolve."""
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        """Package the symlink as a tarball in the Nix store."""
        return symlink_archive(v, self.path)
163
164
class GitSearchPath(NamedTuple):
    """Search path tracking a ref in a git repository."""
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        """Resolve git_ref to a revision, verifying (when a previous pin
        exists) that the new revision descends from the old one."""
        _, new_revision = git_cache.fetch(self.git_repo, self.git_ref)
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            verify_git_ancestry(v, self, old_pin.git_revision, new_revision)
        return GitPin(release_name=git_revision_name(v, self, new_revision),
                      git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Materialize the pinned revision as a tarball in the Nix store."""
        assert isinstance(pin, GitPin)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, pin.git_revision)
        return git_get_tarball(v, self, pin)
182
183
class ChannelSearchPath(NamedTuple):
    """Search path tracking a Nix channel URL backed by a git repository."""
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        """Resolve the channel page to a ChannelPin.

        Fetches and parses the channel page; short-circuits to the old
        pin when the advertised git revision is unchanged.  Otherwise
        downloads the channel's resources, checks git ancestry against
        the old pin, and cross-checks the tarball against the git tree.
        """
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        # Unchanged revision: keep the old pin and skip the expensive
        # resource fetch and content comparison.
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, new_gitpin.git_revision)
        if old_pin is not None:
            verify_git_ancestry(
                v, self, old_pin.git_revision, new_gitpin.git_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Download the pinned tarball (digest-checked) into the store."""
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))
215
216
# Any configured search-path kind.
SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
# The kinds backed by a git repository (and hence tarball-producing).
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
222
223
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Deep-compare the directory trees `a` and `b`.

    Returns filecmp.cmpfiles' (match, mismatch, errors) triple computed
    over the union of both trees' files — symlinked directories counted
    as files, anything under `.git/` excluded — compared by content.
    """

    def raise_error(error: OSError) -> None:
        # os.walk swallows errors by default; surface them instead.
        raise error

    def relative_join(parent: str, child: str) -> str:
        return child if parent == '.' else os.path.join(parent, child)

    def tree_files(root: str) -> Set[str]:
        found: Set[str] = set()
        for path, dirs, files in os.walk(root, onerror=raise_error):
            rel = os.path.relpath(path, start=root)
            for f in files:
                found.add(relative_join(rel, f))
            # Symlinks to directories appear in `dirs`; record them as
            # files so the links themselves get compared.
            found.update(
                relative_join(rel, d) for d in dirs
                if os.path.islink(relative_join(path, d)))
        return found

    everything = tree_files(a) | tree_files(b)
    comparable = {f for f in everything if not f.startswith('.git/')}
    return filecmp.cmpfiles(a, b, comparable, shallow=False)
250
251
def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    """Download the channel's HTML page.

    Returns (page_html, final_url).  The channel URL is expected to
    redirect to a release-specific URL; we verify a redirect happened.
    """
    v.status(f'Fetching channel from {channel.channel_url}')
    with urllib.request.urlopen(channel.channel_url, timeout=10) as response:
        page_html = response.read().decode()
        final_url = response.geturl()
        v.result(response.status == 200)
    v.check('Got forwarded', channel.channel_url != final_url)
    return page_html, final_url
261
262
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    """Extract the release name, git revision, and file table from a
    channel page.

    The page's <title> and <h1> must agree on the release name (taken as
    the third whitespace-separated word), the first <tt> holds the git
    commit, and each <tr> after the header row describes one file.
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    # The release name appears in both <title> and <h1>; read it from
    # the former and cross-check against the latter.
    v.status('Finding release name (1)')
    title = d.getElementsByTagName('title')[0].firstChild
    v.result(isinstance(title, xml.dom.minidom.CharacterData))
    assert isinstance(title, xml.dom.minidom.CharacterData)
    release_name = title.nodeValue.split()[2]
    v.status('Finding release name (2)')
    h1 = d.getElementsByTagName('h1')[0].firstChild
    v.result(isinstance(h1, xml.dom.minidom.CharacterData))
    assert isinstance(h1, xml.dom.minidom.CharacterData)
    v.status('Verifying release name:')
    v.status(release_name)
    v.result(release_name == h1.nodeValue.split()[2])

    v.status('Finding git commit')
    git_commit_node = d.getElementsByTagName('tt')[0]
    v.result(
        isinstance(
            git_commit_node.firstChild,
            xml.dom.minidom.CharacterData))
    assert isinstance(
        git_commit_node.firstChild,
        xml.dom.minidom.CharacterData)
    v.status('Extracting git commit:')
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    # Only trust the <tt> contents if it is labeled as the git commit.
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    # Table rows: cell [0] is the file link, [1] the size, [2] the
    # sha256 digest.
    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=release_name, git_revision=git_revision)
308
309
def digest_string(s: bytes) -> Digest16:
    """Hex-encoded sha256 digest of `s`."""
    hasher = hashlib.sha256(s)
    return Digest16(hasher.hexdigest())
312
313
def digest_file(filename: str) -> Digest16:
    """Hex-encoded sha256 of the contents of `filename`, read in 4 KiB
    chunks so large files need not fit in memory."""
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(4096)
            if not chunk:
                break
            hasher.update(chunk)
    return Digest16(hasher.hexdigest())
321
322
323 _NIX_COMMAND = ['nix', '--experimental-features', 'nix-command']
324
325
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Re-encode a base-32 sha256 digest as base-16 via `nix to-base16`."""
    v.status('Converting digest to base16')
    completed = subprocess.run(
        _NIX_COMMAND + ['to-base16', '--type', 'sha256', digest32],
        stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return Digest16(completed.stdout.decode().strip())
336
337
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Re-encode a base-16 sha256 digest as base-32 via `nix to-base32`."""
    v.status('Converting digest to base32')
    completed = subprocess.run(
        _NIX_COMMAND + ['to-base32', '--type', 'sha256', digest16],
        stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return Digest32(completed.stdout.decode().strip())
348
349
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Download `url` via nix-prefetch-url, requiring both nix's reported
    digest and an independent local hash to match `digest`.

    Returns the Nix store path of the fetched file.
    """
    v.status(f'Fetching {url}')
    completed = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest],
        stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    reported_digest, path, trailer = completed.stdout.decode().split('\n')
    assert trailer == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(reported_digest)) == digest)
    # Belt and braces: hash the fetched file ourselves too.
    v.status(f"Verifying digest of {path}")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore # (for old mypy)
366
367
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the channel's git-revision and nixexprs files, filling in
    each table entry's absolute_url and local store path, then confirm
    the downloaded git-revision file matches the pinned revision."""
    for name in ('git-revision', 'nixexprs.tar.xz'):
        entry = table[name]
        entry.absolute_url = urllib.parse.urljoin(forwarded_url, entry.url)
        entry.file = fetch_with_nix_prefetch_url(
            v, entry.absolute_url, entry.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)
381
382
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Path under the XDG cache remembering the store path of a tarball
    generated for this (repo, revision, release-name) combination."""
    key = f'{digest_string(channel.git_repo.encode())}-{pin.git_revision}-{pin.release_name}'
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git-tarball', key)
388
389
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    """Fail verification unless old_revision is an ancestor of
    new_revision in the cached clone of the channel's repository, i.e.
    the channel never moved backwards or sideways."""
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    completed = subprocess.run(
        ['git', '-C', cachedir,
         'merge-base', '--is-ancestor', old_revision, new_revision])
    v.result(completed.returncode == 0)
405
406
def broken_symlinks_are_identical(root1: str, root2: str, path: str) -> bool:
    """True iff `path` is a dangling symlink in both trees and both links
    point at the same target."""
    link1 = os.path.join(root1, path)
    link2 = os.path.join(root2, path)
    if not (os.path.islink(link1) and os.path.islink(link2)):
        return False
    if os.path.exists(link1) or os.path.exists(link2):
        # At least one target resolves, so the links are not "broken".
        return False
    return os.readlink(link1) == os.readlink(link2)
415
416
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Verify the unpacked channel tarball matches the git checkout.

    A fixed list of expected channel-build artifacts is tolerated as
    incomparable, as are symlinks that dangle identically on both sides;
    any other mismatch or incomparable file fails verification.
    """
    v.status('Comparing channel tarball with git checkout')
    tarball_contents = os.path.join(channel_contents, pin.release_name)
    match, mismatch, errors = compare(tarball_contents, git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    # Files expected to be incomparable between tarball and git
    # (presumably added or altered by the channel build — confirm
    # against the channel builder if this list needs changing).
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_expected_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    # Dangling symlinks cannot be content-compared; accept them when
    # both sides dangle with the same target.
    errors = [
        e for e in errors
        if not broken_symlinks_are_identical(tarball_contents, git_contents, e)
    ]
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))
449
450
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the channel's nixexprs tarball into `dest`."""
    archive = table['nixexprs.tar.xz'].file
    v.status(f"Extracting tarball {archive}")
    shutil.unpack_archive(archive, dest)
    v.ok()
458
459
def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    """Export the pinned revision's tree from the cached git repository
    into `dest` by piping `git archive` into `tar x`."""
    v.status('Checking out corresponding git revision')
    with subprocess.Popen(
            ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
            stdout=subprocess.PIPE) as git:
        with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
            if git.stdout:
                # Drop our reference to the pipe's read end so tar sees
                # EOF when git exits.
                git.stdout.close()
            tar.wait()
            git.wait()
    v.result(git.returncode == 0 and tar.returncode == 0)
475
476
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    """Produce a Nix-store tarball for the pinned git revision.

    A small cache file maps (repo, revision, release) to a previously
    generated store path; when that path still exists it is reused.
    Otherwise `git archive | xz` builds the tarball, it is added to the
    store, and the cache file is (re)written.
    """
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file, encoding='utf-8') as f:
            cached_tarball = f.read(9999)
            # The store path may have been garbage-collected since.
            if os.path.exists(cached_tarball):
                return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w', encoding='utf-8') as output_file:
            v.status(f'Generating tarball for git revision {pin.git_revision}')
            # xz writes directly to output_file's descriptor; the text
            # mode of the Python-level handle is irrelevant to it.
            with subprocess.Popen(
                    ['git', '-C', git_cache.git_cachedir(channel.git_repo),
                     'archive', f'--prefix={pin.release_name}/', pin.git_revision],
                    stdout=subprocess.PIPE) as git:
                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
                    xz.wait()
                    git.wait()
                    v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
        with open(cache_file, 'w', encoding='utf-8') as f:
            f.write(store_tarball)
        return store_tarball  # type: ignore # (for old mypy)
508
509
def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    """Check that the unpacked tarball's .git-revision and
    .version-suffix files agree with the pin."""
    release_dir = os.path.join(channel_contents, pin.release_name)

    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(release_dir, '.git-revision'),
              encoding='utf-8') as f:
        v.result(f.read(999) == pin.git_revision)

    v.status(
        f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
    with open(os.path.join(release_dir, '.version-suffix'),
              encoding='utf-8') as f:
        version_suffix = f.read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))
526
527
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    """Cross-check the channel tarball against git: unpack both, then
    verify the tarball's metadata and file contents match the pinned
    revision's tree."""
    with tempfile.TemporaryDirectory() as channel_root, \
            tempfile.TemporaryDirectory() as git_root:

        extract_tarball(v, table, channel_root)
        check_channel_metadata(v, pin, channel_root)

        git_checkout(v, channel, pin, git_root)

        compare_tarball_and_git(v, pin, channel_root, git_root)

        v.status('Removing temporary directories')
    v.ok()
545
546
def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    """Build a release name of the form
    '<repo-basename>-<commit-timestamp>-<abbreviated-hash>' for a
    revision in the channel's cached git repository.

    Fails verification (raising VerificationError) when git errors out
    or the revision is unknown (empty output).
    """
    v.status('Getting commit date')
    process = subprocess.run(['git',
                              '-C',
                              git_cache.git_cachedir(channel.git_repo),
                              'log',
                              '-n1',
                              '--format=%ct-%h',
                              '--abbrev=11',
                              '--no-show-signature',
                              git_revision],
                             stdout=subprocess.PIPE)
    v.result(process.returncode == 0 and process.stdout != b'')
    # Single-line f-string: expressions spanning multiple lines inside an
    # f-string require Python 3.12 (PEP 701); this form works everywhere.
    return f'{os.path.basename(channel.git_repo)}-{process.stdout.decode().strip()}'
566
567
# Generic key/value type parameters for the dict helpers below.
K = TypeVar('K')
V = TypeVar('V')
570
571
def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split `d` into (entries satisfying pred, the rest).

    Insertion order is preserved within each half.
    """
    chosen: Dict[K, V] = {}
    rejected: Dict[K, V] = {}
    for key, value in d.items():
        bucket = chosen if pred(key, value) else rejected
        bucket[key] = value
    return chosen, rejected
582
583
def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split `d` into (entries whose key is in `fields`, the rest)."""
    return partition_dict(lambda k, _: k in fields, d)
587
588
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Instantiate a search path (and its pin, when present) from one
    config-file section.

    The section's 'type' key selects the classes; the remaining keys are
    split between the pin's fields and the search path's fields.
    """
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # Error suppression works around https://github.com/python/mypy/issues/9007
    # A field-less pin type (AliasPin, SymlinkPin) counts as present even
    # though no pin keys can appear in the section.
    pin_present = pin_fields or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin
604
605
def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    """Like read_config_section, but require the section to already
    carry a pin; raises RuntimeError otherwise."""
    sp, pin = read_config_section(conf)
    if pin is not None:
        return sp, pin
    raise RuntimeError(
        f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
613
614
def read_config(filename: str) -> configparser.ConfigParser:
    """Parse one channels file.

    Uses read_file (not ConfigParser.read) so a missing or unreadable
    file raises instead of being silently skipped.
    """
    parser = configparser.ConfigParser()
    with open(filename, encoding='utf-8') as source:
        parser.read_file(source, filename)
    return parser
620
621
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Read and merge several channels files into one section dict.

    Raises RuntimeError when the same channel name appears in more than
    one file.
    """
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                # Fixed: the message was missing its f-prefix, so it
                # printed the literal text "{section}".
                raise RuntimeError(f'Duplicate channel "{section}"')
            merged_config[section] = config[section]
    return merged_config
632
633
def pinCommand(args: argparse.Namespace) -> None:
    """`pinch pin`: resolve each requested channel to a concrete pin and
    write the pin fields back into the channels file."""
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        # An explicit channel list on the command line restricts which
        # sections get (re)pinned.
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_config_section(config[section])

        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w', encoding='utf-8') as configfile:
        config.write(configfile)
647
648
def updateCommand(args: argparse.Namespace) -> None:
    """`pinch update`: fetch every pinned channel and install the set
    into a nix-env profile (or just print the nix-env invocation when
    --dry-run is given)."""
    v = Verification()
    exprs: Dict[str, str] = {}
    # Expose the existing profile and its manifest to the Nix
    # expressions via -I search-path entries, when a profile exists.
    profile_manifest = os.path.join(args.profile, "manifest.nix")
    search_paths: List[str] = [
        "-I", "pinch_profile=" + args.profile,
        "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
    ] if os.path.exists(profile_manifest) else []
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    # NOTE: the lambda's `v` is the (sp, pin) tuple, shadowing the
    # Verification `v` above.
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in sorted(nonalias.items()):
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)  # partition_dict()
        tarball = sp.fetch(v, pin)
        search_paths.extend(
            ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
        # The %s placeholder is substituted with the section name below,
        # letting aliases share an expression with their own channelName.
        exprs[section] = (
            f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
            f'src = builtins.storePath "{tarball}"; }}')

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    with tempfile.NamedTemporaryFile() as unpack_channel_nix:
        unpack_channel_nix.write(b'''
{ name, channelName, src, }:
derivation {
  inherit name channelName src;
  builder = "builtin:unpack-channel";
  system = "builtin";
  preferLocalBuild = true;
}
''')
        unpack_channel_nix.flush()

        command = [
            'nix-env',
            '--profile',
            args.profile,
            '--show-trace',
            '--file',
            unpack_channel_nix.name,
            '--install',
            '--remove-all',
        ] + search_paths + ['--from-expression'] + [
            exprs[name] % name for name in sorted(exprs.keys())]
        if args.dry_run:
            print(' '.join(map(shlex.quote, command)))
        else:
            v.status('Installing channels with nix-env')
            process = subprocess.run(command)
            v.result(process.returncode == 0)
707
708
def main() -> None:
    """Entry point: parse arguments and dispatch to pin or update."""
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)

    pin_parser = subparsers.add_parser('pin')
    pin_parser.add_argument('channels_file', type=str)
    pin_parser.add_argument('channels', type=str, nargs='*')
    pin_parser.set_defaults(func=pinCommand)

    update_parser = subparsers.add_parser('update')
    update_parser.add_argument('--dry-run', action='store_true')
    update_parser.add_argument('--profile', default=(
        f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
    update_parser.add_argument('channels_file', type=str, nargs='+')
    update_parser.set_defaults(func=updateCommand)

    args = parser.parse_args()
    args.func(args)
724
725
# Support `python pinch.py ...` invocation as well as being imported.
if __name__ == '__main__':
    main()