]> git.scottworley.com Git - pinch/blob - pinch.py
Allow identically-broken symlinks
[pinch] / pinch.py
1 # pinch: PIN CHannels - a replacement for `nix-channel --update`
2 #
3 # This program is free software: you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation, version 3.
6
7
8 import argparse
9 import configparser
10 import filecmp
11 import functools
12 import getpass
13 import hashlib
14 import operator
15 import os
16 import os.path
17 import shlex
18 import shutil
19 import subprocess
20 import sys
21 import tarfile
22 import tempfile
23 import types
24 import urllib.parse
25 import urllib.request
26 import xml.dom.minidom
27
28 from typing import (
29 Callable,
30 Dict,
31 Iterable,
32 List,
33 Mapping,
34 NamedTuple,
35 NewType,
36 Optional,
37 Set,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 )
43
44 import git_cache
45
46 # Use xdg module when it's less painful to have as a dependency
47
48
class XDG(NamedTuple):
    """The XDG base directories this program uses (just the cache dir)."""
    XDG_CACHE_HOME: str  # root for pinch's tarball cache
51
52
# Resolve the cache directory once at import time, honoring $XDG_CACHE_HOME
# and falling back to the conventional ~/.cache location.
xdg = XDG(
    XDG_CACHE_HOME=os.environ.get(
        'XDG_CACHE_HOME', os.path.expanduser('~/.cache')))
57
58
class VerificationError(Exception):
    """Raised by Verification.result when a verification step fails."""
61
62
class Verification:
    """Reports verification progress to stderr.

    Each step prints a short status fragment; its outcome is then printed
    as a right-aligned, colored OK/FAIL marker on the same line.  A failed
    step raises VerificationError.
    """

    def __init__(self) -> None:
        # Width of the status text emitted so far on the current line.
        self.line_length = 0

    def status(self, s: str) -> None:
        """Print a status fragment and track the line width."""
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        """Wrap *s* in the ANSI escape sequence for color code *c*."""
        return f'\033[{c:2d}m{s}\033[00m'

    def result(self, r: bool) -> None:
        """Print the OK/FAIL marker; raise VerificationError on failure."""
        message, color = ('OK ', 92) if r else ('FAIL', 91)
        width = shutil.get_terminal_size().columns or 80
        # Pad so the marker lands at the right edge of the terminal.
        padding = (width - (self.line_length + len(message))) % width
        print(' ' * padding + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        """Report status *s* and its boolean outcome in one call."""
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        """Report success for the current status line."""
        self.result(True)
92
93
# Distinct string types for the two digest encodings in play:
# Digest16 is a sha256 in hexadecimal ("base16"); Digest32 is the same
# hash in Nix's base-32 encoding (see to_Digest16 / to_Digest32).
Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)
96
97
class ChannelTableEntry(types.SimpleNamespace):
    """One row of a channel page's file table (built by parse_channel).

    Populated in two phases: url/digest/size come from the parsed HTML
    table; absolute_url and file are filled in later by fetch_resources.
    """
    absolute_url: str  # url resolved against the post-redirect channel URL
    digest: Digest16   # expected sha256 (hex) of the file
    file: str          # local path of the fetched file
    size: int          # advertised size from the table (not re-checked here)
    url: str           # href as it appears on the channel page
104
105
class AliasPin(NamedTuple):
    """Pin for an alias entry: aliases carry no state, so it is empty."""
108
109
class SymlinkPin(NamedTuple):
    """Pin for a symlink entry; its release name is always 'link'."""
    @property
    def release_name(self) -> str:
        # Matches the arcname used when building the symlink tarball.
        return 'link'
114
115
class GitPin(NamedTuple):
    """Pin for a git search path: an exact revision plus its display name."""
    git_revision: str  # full commit hash
    release_name: str  # human-readable name derived from the commit
119
120
class ChannelPin(NamedTuple):
    """Pin for a Nix channel: the revision plus the exact tarball to fetch."""
    git_revision: str    # commit the channel was built from
    release_name: str    # channel release name from the page
    tarball_url: str     # absolute URL of nixexprs.tar.xz
    tarball_sha256: str  # expected sha256 (hex) of that tarball
126
127
128 Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]
129
130
def copy_to_nix_store(v: Verification, filename: str) -> str:
    """Add *filename* to the Nix store and return its store path."""
    v.status('Putting tarball in Nix store')
    result = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(result.returncode == 0)
    return result.stdout.decode().strip()  # type: ignore # (for old mypy)
137
138
def symlink_archive(v: Verification, path: str) -> str:
    """Wrap a symlink pointing at *path* in a gzipped tarball.

    Returns the Nix store path of the resulting link.tar.gz.
    """
    with tempfile.TemporaryDirectory() as workdir:
        link_path = os.path.join(workdir, 'link')
        tarball_path = os.path.join(workdir, 'link.tar.gz')
        os.symlink(path, link_path)
        with tarfile.open(tarball_path, mode='x:gz') as archive:
            archive.add(link_path, arcname='link')
        return copy_to_nix_store(v, tarball_path)
146
147
class AliasSearchPath(NamedTuple):
    """A search path that reuses another channel's expression."""
    alias_of: str  # name of the channel section this one mirrors

    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        """Aliases have no state of their own, so pinning is trivial."""
        return AliasPin()
153
154
class SymlinkSearchPath(NamedTuple):
    """A search path that materializes as a single symlink."""
    path: str  # target the installed symlink points at

    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        """Nothing to resolve; the path itself is the whole state."""
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        """Build and store the one-symlink tarball."""
        return symlink_archive(v, self.path)
163
164
class GitSearchPath(NamedTuple):
    """A search path fetched directly from a git repository."""
    git_ref: str   # branch/tag to follow when pinning
    git_repo: str  # repository URL

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        """Resolve git_ref to a revision, checking ancestry against old_pin."""
        _, revision = git_cache.fetch(self.git_repo, self.git_ref)
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            # Refuse to move "backwards": the new revision must descend
            # from the previously pinned one.
            verify_git_ancestry(v, self, old_pin.git_revision, revision)
        return GitPin(
            git_revision=revision,
            release_name=git_revision_name(v, self, revision))

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Produce a Nix-store tarball for the pinned revision."""
        assert isinstance(pin, GitPin)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, pin.git_revision)
        return git_get_tarball(v, self, pin)
182
183
class ChannelSearchPath(NamedTuple):
    """A search path that follows a published Nix channel page.

    Attributes:
        channel_url: URL of the channel page.
        git_ref: git branch the channel is built from.
        git_repo: URL of the repository backing the channel.
    """
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        """Fetch the channel page and pin its current release.

        Downloads and verifies the advertised resources, makes sure the
        revision is present in the git cache, checks ancestry against the
        old pin, and cross-checks the tarball against the git tree.
        """
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        # Same revision as before: skip the expensive re-verification.
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, new_gitpin.git_revision)
        if old_pin is not None:
            verify_git_ancestry(
                v, self, old_pin.git_revision, new_gitpin.git_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Fetch the pinned channel tarball, verifying its digest."""
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))
215
216
# All concrete search-path types, and the subset that yields a tarball.
SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
222
223
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Deep-compare directory trees *a* and *b*.

    Returns filecmp.cmpfiles-style (match, mismatch, errors) lists over
    the union of both trees' files, including symlinks to directories and
    excluding anything under .git/.
    """

    def raise_error(error: OSError) -> None:
        # os.walk ignores errors by default; surface them instead.
        raise error

    def join(prefix: str, name: str) -> str:
        return name if prefix == '.' else os.path.join(prefix, name)

    def recursive_files(root: str) -> Iterable[str]:
        found: List[str] = []
        for path, dirs, files in os.walk(root, onerror=raise_error):
            rel = os.path.relpath(path, start=root)
            found.extend(join(rel, name) for name in files)
            # Symlinks to directories appear in `dirs`, but we want to
            # compare them as files, so collect them here too.
            found.extend(
                join(rel, name) for name in dirs
                if os.path.islink(join(path, name)))
        return found

    def without_dot_git(files: Iterable[str]) -> Iterable[str]:
        return (name for name in files if not name.startswith('.git/'))

    files = functools.reduce(
        operator.or_,
        (set(without_dot_git(recursive_files(d))) for d in [a, b]))
    return filecmp.cmpfiles(a, b, files, shallow=False)
250
251
def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    """Download the channel page; return (html, post-redirect URL).

    Channel pages are served behind a redirect to the current release;
    the post-redirect URL is needed to resolve relative resource links.
    """
    v.status(f'Fetching channel from {channel.channel_url}')
    with urllib.request.urlopen(channel.channel_url, timeout=10) as response:
        body = response.read().decode()
        final_url = response.geturl()
        v.result(response.status == 200)
    v.check('Got forwarded', channel.channel_url != final_url)
    return body, final_url
261
262
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    """Extract the file table and current GitPin from a channel page.

    Expects XHTML containing: the release name as the third word of both
    the <title> and the <h1> (they must agree), the git revision in the
    first <tt> element (labeled "Git commit "), and a table of files.
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    v.status('Finding release name (1)')
    title = d.getElementsByTagName('title')[0].firstChild
    v.result(isinstance(title, xml.dom.minidom.CharacterData))
    assert isinstance(title, xml.dom.minidom.CharacterData)
    # The release name is the third whitespace-separated word of the title.
    release_name = title.nodeValue.split()[2]
    v.status('Finding release name (2)')
    h1 = d.getElementsByTagName('h1')[0].firstChild
    v.result(isinstance(h1, xml.dom.minidom.CharacterData))
    assert isinstance(h1, xml.dom.minidom.CharacterData)
    # Cross-check: title and <h1> must name the same release.
    v.status('Verifying release name:')
    v.status(release_name)
    v.result(release_name == h1.nodeValue.split()[2])

    v.status('Finding git commit')
    git_commit_node = d.getElementsByTagName('tt')[0]
    v.result(
        isinstance(
            git_commit_node.firstChild,
            xml.dom.minidom.CharacterData))
    assert isinstance(
        git_commit_node.firstChild,
        xml.dom.minidom.CharacterData)
    v.status('Extracting git commit:')
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    # Guard against having grabbed some unrelated <tt> element.
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    # Skip the header row; each row is (linked name, size, sha256 digest).
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=release_name, git_revision=git_revision)
308
309
def digest_string(s: bytes) -> Digest16:
    # sha256 of the byte string, hex-encoded.
    return Digest16(hashlib.sha256(s).hexdigest())
312
313
def digest_file(filename: str) -> Digest16:
    """Return the sha256 (hex) of *filename*, read in 4 KiB chunks."""
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(4096)
            if not chunk:
                break
            hasher.update(chunk)
    return Digest16(hasher.hexdigest())
321
322
@functools.lru_cache
def _experimental_flag_needed(v: Verification) -> bool:
    """Whether `nix` must be passed --experimental-features for nix-command.

    Detected by checking whether `nix --help` mentions the flag.  Cached
    (keyed on the Verification instance) so the probe runs only once.
    """
    v.status('Checking Nix version')
    process = subprocess.run(['nix', '--help'], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return b'--experimental-features' in process.stdout
329
330
def _nix_command(v: Verification) -> List[str]:
    """Base argv for invoking `nix`, adding the experimental flag if needed."""
    if _experimental_flag_needed(v):
        return ['nix', '--experimental-features', 'nix-command']
    return ['nix']
334
335
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base-32 sha256 digest to hex via `nix to-base16`."""
    v.status('Converting digest to base16')
    argv = _nix_command(v) + ['to-base16', '--type', 'sha256', digest32]
    process = subprocess.run(argv, stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())
346
347
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a hex sha256 digest to base-32 via `nix to-base32`."""
    v.status('Converting digest to base32')
    argv = _nix_command(v) + ['to-base32', '--type', 'sha256', digest16]
    process = subprocess.run(argv, stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest32(process.stdout.decode().strip())
358
359
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Fetch *url* via nix-prefetch-url, verifying *digest* twice.

    The digest is checked both as reported by nix-prefetch-url (base-32,
    converted back to hex) and by re-hashing the fetched file ourselves.
    Returns the store path of the fetched file.
    """
    v.status(f'Fetching {url}')
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    # Output is exactly two lines: the base-32 digest, then the store path.
    reported_digest, path, trailer = process.stdout.decode().split('\n')
    assert trailer == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(reported_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore # (for old mypy)
376
377
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the channel's git-revision and nixexprs files.

    Fills in each table entry's absolute_url and (local) file path, then
    checks that the fetched revision file matches the pin's revision.
    """
    for name in ('git-revision', 'nixexprs.tar.xz'):
        entry = table[name]
        entry.absolute_url = urllib.parse.urljoin(forwarded_url, entry.url)
        entry.file = fetch_with_nix_prefetch_url(
            v, entry.absolute_url, entry.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)
391
392
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Path of the cache file recording where this pin's tarball lives."""
    basename = (
        f'{digest_string(channel.git_repo.encode())}'
        f'-{pin.git_revision}-{pin.release_name}')
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git-tarball', basename)
398
399
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    """Fail unless old_revision is an ancestor of new_revision."""
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    # `git merge-base --is-ancestor` exits 0 iff the ancestry holds.
    process = subprocess.run(
        ['git', '-C', cachedir, 'merge-base', '--is-ancestor',
         old_revision, new_revision])
    v.result(process.returncode == 0)
415
416
def broken_symlinks_are_identical(root1: str, root2: str, path: str) -> bool:
    """True iff *path* is a dangling symlink with the same target in both roots."""
    link1 = os.path.join(root1, path)
    link2 = os.path.join(root2, path)
    if not (os.path.islink(link1) and os.path.islink(link2)):
        return False
    # os.path.exists follows the link, so islink-and-not-exists == dangling.
    if os.path.exists(link1) or os.path.exists(link2):
        return False
    return os.readlink(link1) == os.readlink(link2)
425
426
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Check that the channel tarball's contents match the git checkout.

    A fixed list of channel-build artifacts is expected to be incomparable
    ("expected errors"), and identically-broken symlinks on both sides are
    tolerated; any other mismatch or incomparable file is a failure.
    """
    v.status('Comparing channel tarball with git checkout')
    tarball_contents = os.path.join(channel_contents, pin.release_name)
    match, mismatch, errors = compare(tarball_contents, git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    # Files produced by the channel build that are absent from git.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_expected_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    # Dangling symlinks cannot be content-compared; accept them when both
    # sides dangle with the same target.
    errors = [
        e for e in errors
        if not broken_symlinks_are_identical(tarball_contents, git_contents, e)
    ]
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    # Also require that every expected artifact actually appeared.
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))
459
460
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the fetched nixexprs tarball into *dest*."""
    tarball = table['nixexprs.tar.xz'].file
    v.status(f"Extracting tarball {tarball}")
    shutil.unpack_archive(tarball, dest)
    v.ok()
468
469
def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    """Extract the pinned git revision into *dest* via `git archive | tar x`."""
    v.status('Checking out corresponding git revision')
    with subprocess.Popen(
            ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
            stdout=subprocess.PIPE) as git:
        with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
            if git.stdout:
                # Drop our copy of the pipe's read end so tar sees EOF
                # as soon as git finishes writing.
                git.stdout.close()
            tar.wait()
            git.wait()
            v.result(git.returncode == 0 and tar.returncode == 0)
485
486
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    """Produce (or reuse) a Nix-store tarball of the pinned git revision.

    The resulting store path is remembered in a small cache file so later
    runs can skip regenerating the tarball.  Returns the store path.
    """
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file, encoding='utf-8') as f:
            cached_tarball = f.read(9999)
        # The cached store path may have been garbage-collected since.
        if os.path.exists(cached_tarball):
            return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w', encoding='utf-8') as output_file:
            v.status(f'Generating tarball for git revision {pin.git_revision}')
            # Pipeline: git archive | xz > output_filename
            with subprocess.Popen(
                    ['git', '-C', git_cache.git_cachedir(channel.git_repo),
                     'archive', f'--prefix={pin.release_name}/', pin.git_revision],
                    stdout=subprocess.PIPE) as git:
                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
                    xz.wait()
                    git.wait()
                    v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

    os.makedirs(os.path.dirname(cache_file), exist_ok=True)
    with open(cache_file, 'w', encoding='utf-8') as f:
        f.write(store_tarball)
    return store_tarball  # type: ignore # (for old mypy)
518
519
def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    """Verify the tarball's .git-revision and .version-suffix files."""
    release_dir = os.path.join(channel_contents, pin.release_name)

    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(release_dir, '.git-revision'),
              encoding='utf-8') as f:
        v.result(f.read(999) == pin.git_revision)

    v.status(
        f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
    with open(os.path.join(release_dir, '.version-suffix'),
              encoding='utf-8') as f:
        version_suffix = f.read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))
536
537
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    """Cross-check the channel tarball against the corresponding git tree."""
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:
        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)
        git_checkout(v, channel, pin, git_contents)
        compare_tarball_and_git(v, pin, channel_contents, git_contents)
        v.status('Removing temporary directories')
        v.ok()
555
556
def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    """Build a release name: "<repo-basename>-<commit-time>-<short-hash>"."""
    v.status('Getting commit date')
    process = subprocess.run(
        ['git', '-C', git_cache.git_cachedir(channel.git_repo),
         'log', '-n1', '--format=%ct-%h', '--abbrev=11',
         '--no-show-signature', git_revision],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0 and process.stdout != b'')
    return f'{os.path.basename(channel.git_repo)}-{process.stdout.decode().strip()}'
574
575
# Type variables for the generic dict helpers below.
K = TypeVar('K')
V = TypeVar('V')
578
579
def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into (entries matching pred, the rest), preserving order."""
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for key, value in d.items():
        bucket = selected if pred(key, value) else remaining
        bucket[key] = value
    return selected, remaining
590
591
def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into (entries whose key is in *fields*, the rest)."""
    return partition_dict(lambda key, _: key in fields, d)
595
596
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Build the search path (and pin, if present) for one config section.

    The section's `type` key selects the SearchPath/Pin classes; the
    remaining keys are split between the pin's fields and the search
    path's fields.  Returns pin=None when no pin data is recorded yet.
    """
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # A pin type with no fields at all (e.g. AliasPin) counts as present.
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin
612
613
def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    """Like read_config_section, but requires the section to be pinned.

    Raises:
        RuntimeError: if the section has no pin recorded yet.
    """
    sp, pin = read_config_section(conf)
    if pin is not None:
        return sp, pin
    raise RuntimeError(
        f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
621
622
def read_config(filename: str) -> configparser.ConfigParser:
    """Parse *filename* as an INI-style pinch channels file."""
    parser = configparser.ConfigParser()
    with open(filename, encoding='utf-8') as f:
        parser.read_file(f, filename)
    return parser
628
629
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Read several channel files, merging their sections into one dict.

    Raises:
        RuntimeError: if the same section name appears in more than one file.
    """
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                # Bug fix: this message was missing its f-prefix, so it
                # reported the literal text {section} instead of the name.
                raise RuntimeError(f'Duplicate channel "{section}"')
            merged_config[section] = config[section]
    return merged_config
640
641
def pinCommand(args: argparse.Namespace) -> None:
    """Handle `pinch pin`: refresh pins and rewrite the channels file."""
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        # An explicit channel list on the command line limits which
        # sections get re-pinned.
        if args.channels and section not in args.channels:
            continue
        sp, old_pin = read_config_section(config[section])
        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w', encoding='utf-8') as configfile:
        config.write(configfile)
655
656
def updateCommand(args: argparse.Namespace) -> None:
    """Handle `pinch update`: install the pinned channels with nix-env.

    Builds one unpack-channel expression per channel (aliases reuse their
    target's expression) and installs them all into the target profile.
    """
    v = Verification()
    exprs: Dict[str, str] = {}
    profile_manifest = os.path.join(args.profile, "manifest.nix")
    # Expose the current profile (and its manifest symlink target) to the
    # Nix expressions via -I search-path entries.
    search_paths: List[str] = [
        "-I", "pinch_profile=" + args.profile,
        "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
    ] if os.path.exists(profile_manifest) else []
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in sorted(nonalias.items()):
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)  # partition_dict()
        tarball = sp.fetch(v, pin)
        search_paths.extend(
            ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
        # channelName is deliberately left as %s so that aliases can
        # substitute their own section name below (`exprs[name] % name`).
        exprs[section] = (
            f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
            f'src = builtins.storePath "{tarball}"; }}')

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    command = [
        'nix-env',
        '--profile',
        args.profile,
        '--show-trace',
        '--file',
        '<nix/unpack-channel.nix>',
        '--install',
        '--remove-all',
    ] + search_paths + ['--from-expression'] + [
        exprs[name] % name for name in sorted(exprs.keys())]
    if args.dry_run:
        # Print the exact command instead of running it.
        print(' '.join(map(shlex.quote, command)))
    else:
        v.status('Installing channels with nix-env')
        process = subprocess.run(command)
        v.result(process.returncode == 0)
703
704
705 def main() -> None:
706 parser = argparse.ArgumentParser(prog='pinch')
707 subparsers = parser.add_subparsers(dest='mode', required=True)
708 parser_pin = subparsers.add_parser('pin')
709 parser_pin.add_argument('channels_file', type=str)
710 parser_pin.add_argument('channels', type=str, nargs='*')
711 parser_pin.set_defaults(func=pinCommand)
712 parser_update = subparsers.add_parser('update')
713 parser_update.add_argument('--dry-run', action='store_true')
714 parser_update.add_argument('--profile', default=(
715 f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
716 parser_update.add_argument('channels_file', type=str, nargs='+')
717 parser_update.set_defaults(func=updateCommand)
718 args = parser.parse_args()
719 args.func(args)
720
721
# Script entry point.
if __name__ == '__main__':
    main()