]> git.scottworley.com Git - pinch/blob - pinch.py
33fd9bfc802d2ee5aa1ab66f365ab5ee921939a6
[pinch] / pinch.py
1 # pinch: PIN CHannels - a replacement for `nix-channel --update`
2 #
3 # This program is free software: you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation, version 3.
6
7
8 import argparse
9 import configparser
10 import filecmp
11 import functools
12 import getpass
13 import hashlib
14 import operator
15 import os
16 import os.path
17 import shlex
18 import shutil
19 import subprocess
20 import sys
21 import tarfile
22 import tempfile
23 import types
24 import urllib.parse
25 import urllib.request
26 import xml.dom.minidom
27
28 from typing import (
29 Callable,
30 Dict,
31 Iterable,
32 List,
33 Mapping,
34 NamedTuple,
35 NewType,
36 Optional,
37 Set,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 )
43
44 import git_cache
45
46 # Use xdg module when it's less painful to have as a dependency
47
48
class XDG(NamedTuple):
    """The XDG base directories this program uses."""
    # Cache directory root, e.g. ~/.cache (see xdg assignment below... resolved
    # at import time from $XDG_CACHE_HOME).
    XDG_CACHE_HOME: str
51
52
# Resolved once at import time: respects $XDG_CACHE_HOME, falling back to
# ~/.cache per the XDG Base Directory convention.
xdg = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))
57
58
class VerificationError(Exception):
    """Raised by Verification.result() when a verification step fails."""
    pass
61
62
class Verification:
    """Progress reporter that prints status text to stderr and right-aligns
    a colored OK/FAIL verdict at the end of each line.

    A failing result raises VerificationError, aborting the run.
    """

    def __init__(self) -> None:
        # Approximate width of what has been printed on the current line
        # (one column assumed per character).
        self.line_length = 0

    def status(self, s: str) -> None:
        """Print a progress message without ending the line."""
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        # Wrap s in the ANSI SGR escape for color code c, then reset.
        return f'\033[{c:2d}m{s}\033[00m'

    def result(self, r: bool) -> None:
        """Finish the current line with OK (green) or FAIL (red).

        Raises VerificationError when r is False.
        """
        if r:
            message, color = 'OK ', 92
        else:
            message, color = 'FAIL', 91
        width = shutil.get_terminal_size().columns or 80
        # Pad so the verdict lands at the right edge of the terminal.
        padding = (width - (self.line_length + len(message))) % width
        print(' ' * padding + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        """Print a message and its verdict in one call."""
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        """Shorthand for an unconditional success verdict."""
        self.result(True)
92
93
# sha256 digests in Nix's two textual encodings.
Digest16 = NewType('Digest16', str)  # base-16 (hex)
Digest32 = NewType('Digest32', str)  # Nix's base-32 encoding
96
97
class ChannelTableEntry(types.SimpleNamespace):
    """One row of the channel page's file table.

    url, digest, and size are filled in by parse_channel();
    absolute_url and file are filled in later by fetch_resources().
    """
    absolute_url: str  # url resolved against the post-redirect page URL
    digest: Digest16   # sha256 of the file, base-16
    file: str          # local path after download
    size: int          # size in bytes, as advertised by the table
    url: str           # href as it appears in the table (possibly relative)
104
105
class AliasPin(NamedTuple):
    """An alias has no independent state, so its pin carries no fields."""
    pass
108
109
class SymlinkPin(NamedTuple):
    """Pin for a symlink search path; its archive is always named 'link'."""
    @property
    def release_name(self) -> str:
        return 'link'
114
115
class GitPin(NamedTuple):
    """Pin of a git search path: an exact revision plus its release name."""
    git_revision: str
    release_name: str
119
120
class ChannelPin(NamedTuple):
    """Pin of a channel search path: git revision plus the exact tarball
    (URL and sha256) published for that release."""
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str
126
127
# Any pin: the state written back into the config file for a search path.
Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]
129
130
def copy_to_nix_store(v: Verification, filename: str) -> str:
    """Add *filename* to the Nix store and return its store path."""
    v.status('Putting tarball in Nix store')
    completed = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return completed.stdout.decode().strip()  # type: ignore # (for old mypy)
137
138
def symlink_archive(v: Verification, path: str) -> str:
    """Wrap a symlink pointing at *path* in a gzipped tarball and add the
    tarball to the Nix store; returns the store path."""
    with tempfile.TemporaryDirectory() as workdir:
        link_path = os.path.join(workdir, 'link')
        archive_path = os.path.join(workdir, 'link.tar.gz')
        os.symlink(path, link_path)
        with tarfile.open(archive_path, mode='x:gz') as archive:
            archive.add(link_path, arcname='link')
        return copy_to_nix_store(v, archive_path)
146
147
class AliasSearchPath(NamedTuple):
    """A search path that is just another config section under a new name."""
    # Name of the config section this alias resolves to.
    alias_of: str

    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        """Aliases have no state of their own, so pinning is a no-op."""
        return AliasPin()
153
154
class SymlinkSearchPath(NamedTuple):
    """A search path that resolves to a local filesystem path via symlink."""
    path: str

    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        """Symlinks have no revision to pin; the pin is stateless."""
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        """Produce a store-path tarball containing just the symlink."""
        return symlink_archive(v, self.path)
163
164
class GitSearchPath(NamedTuple):
    """A search path backed directly by a ref in a git repository."""
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        """Resolve git_ref to a concrete revision.

        When re-pinning, the new revision must descend from the old one
        (guards against upstream history rewrites).
        """
        _, new_revision = git_cache.fetch(self.git_repo, self.git_ref)
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            verify_git_ancestry(v, self, old_pin.git_revision, new_revision)
        release = git_revision_name(v, self, new_revision)
        return GitPin(release_name=release, git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Produce a store-path tarball for the pinned revision."""
        assert isinstance(pin, GitPin)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, pin.git_revision)
        return git_get_tarball(v, self, pin)
182
183
class ChannelSearchPath(NamedTuple):
    """A search path following an upstream Nix channel (an HTML release page
    plus its published tarball), cross-checked against a git repository."""
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        """Fetch the channel page and pin its current release.

        If the advertised git revision is unchanged, the old pin is
        returned as-is and the expensive download/compare work is skipped.
        Otherwise the resources are fetched, ancestry against the old pin
        is verified, and the tarball contents are checked against git.
        """
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        # Unchanged upstream revision: keep the existing pin.
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, new_gitpin.git_revision)
        if old_pin is not None:
            verify_git_ancestry(
                v, self, old_pin.git_revision, new_gitpin.git_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Download the pinned tarball (digest-checked) into the store."""
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))
215
216
# All supported search-path flavors, selected by a section's `type` field.
SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
# Search paths whose contents are materialized from a git repository.
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
222
223
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Deep-compare two directory trees.

    Returns filecmp.cmpfiles' (match, mismatch, errors) triple over the
    union of both trees' files, excluding anything under .git/.
    Raises OSError if either tree cannot be walked.
    """

    def raise_error(error: OSError) -> None:
        # os.walk swallows errors by default; surface them instead.
        raise error

    def relative_join(prefix: str, name: str) -> str:
        return name if prefix == '.' else os.path.join(prefix, name)

    def walk_files(root: str) -> Set[str]:
        found: Set[str] = set()
        for path, dirs, filenames in os.walk(root, onerror=raise_error):
            rel = os.path.relpath(path, start=root)
            found.update(relative_join(rel, name) for name in filenames)
            # os.walk does not descend into symlinked directories; record
            # the link itself so cmpfiles compares it as a file.
            found.update(
                relative_join(rel, d) for d in dirs
                if os.path.islink(relative_join(path, d)))
        return found

    candidates = {
        f for root in (a, b) for f in walk_files(root)
        if not f.startswith('.git/')}
    return filecmp.cmpfiles(a, b, candidates, shallow=False)
250
251
def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    """Download the channel's HTML description page.

    Returns (page HTML, URL after redirects). Fails verification unless
    the server redirected us: channels are expected to forward to a
    release-specific URL.
    """
    v.status(f'Fetching channel from {channel.channel_url}')
    with urllib.request.urlopen(channel.channel_url, timeout=10) as response:
        body = response.read().decode()
        final_url = response.geturl()
        v.result(response.status == 200)
    v.check('Got forwarded', channel.channel_url != final_url)
    return body, final_url
261
262
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    """Extract the file table and git revision from a channel HTML page.

    The page layout is assumed to be: release name in both <title> and
    <h1>, the git commit in the first <tt> (labeled 'Git commit '), and
    a table whose rows (after the header) are name / size / sha256.
    Returns the table keyed by file name, and a GitPin for the release.
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    # The release name is the third word of the title, e.g.
    # "Release nixos <name>" — verified against the <h1> text.
    v.status('Extracting release name:')
    title_name = d.getElementsByTagName(
        'title')[0].firstChild.nodeValue.split()[2]
    h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
    v.status(title_name)
    v.result(title_name == h1_name)

    v.status('Extracting git commit:')
    git_commit_node = d.getElementsByTagName('tt')[0]
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    # Make sure the <tt> we grabbed is actually the commit hash, not some
    # other monospace text on the page.
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=title_name, git_revision=git_revision)
294
295
def digest_string(s: bytes) -> Digest16:
    """Return the sha256 of *s* as a base-16 (hex) digest."""
    hex_digest = hashlib.sha256(s).hexdigest()
    return Digest16(hex_digest)
298
299
def digest_file(filename: str) -> Digest16:
    """Return the sha256 of the file's contents as a base-16 digest.

    Reads in 4 KiB chunks so arbitrarily large files are handled without
    loading them into memory.
    """
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        while chunk := f.read(4096):
            hasher.update(chunk)
    return Digest16(hasher.hexdigest())
307
308
@functools.lru_cache
def _experimental_flag_needed(v: Verification) -> bool:
    """Whether this Nix needs --experimental-features to run `nix` commands.

    Cached for the lifetime of the process: the answer cannot change
    during one run.
    """
    v.status('Checking Nix version')
    completed = subprocess.run(['nix', '--help'], stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return b'--experimental-features' in completed.stdout
315
316
def _nix_command(v: Verification) -> List[str]:
    """Base argv for invoking `nix`, including the experimental-features
    flag when this Nix version requires it."""
    if _experimental_flag_needed(v):
        return ['nix', '--experimental-features', 'nix-command']
    return ['nix']
320
321
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base-32 sha256 digest to base-16 via `nix to-base16`."""
    v.status('Converting digest to base16')
    argv = _nix_command(v) + ['to-base16', '--type', 'sha256', digest32]
    completed = subprocess.run(argv, stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return Digest16(completed.stdout.decode().strip())
332
333
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a base-16 sha256 digest to base-32 via `nix to-base32`."""
    v.status('Converting digest to base32')
    argv = _nix_command(v) + ['to-base32', '--type', 'sha256', digest16]
    completed = subprocess.run(argv, stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    return Digest32(completed.stdout.decode().strip())
344
345
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Download *url* into the Nix store via nix-prefetch-url.

    The expected sha256 is handed to nix-prefetch-url, its reported
    digest is cross-checked, and the stored file is re-hashed locally as
    a belt-and-braces check. Returns the store path.
    """
    v.status(f'Fetching {url}')
    completed = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest],
        stdout=subprocess.PIPE)
    v.result(completed.returncode == 0)
    reported_digest, path, trailer = completed.stdout.decode().split('\n')
    assert trailer == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(reported_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    local_digest = digest_file(path)
    v.result(local_digest == digest)
    return path  # type: ignore # (for old mypy)
362
363
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the channel's git-revision file and nixexprs tarball.

    Mutates the corresponding table entries, filling in absolute_url and
    the local file path. Fails verification if the downloaded
    git-revision file disagrees with the revision on the channel page.
    """
    for name in ('git-revision', 'nixexprs.tar.xz'):
        entry = table[name]
        entry.absolute_url = urllib.parse.urljoin(forwarded_url, entry.url)
        entry.file = fetch_with_nix_prefetch_url(
            v, entry.absolute_url, entry.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)
377
378
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Path of the cache file that records where a previously generated
    tarball for this (repo, revision, release) lives in the Nix store."""
    basename = (f'{digest_string(channel.git_repo.encode())}'
                f'-{pin.git_revision}-{pin.release_name}')
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git-tarball', basename)
384
385
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    """Fail verification unless old_revision is an ancestor of
    new_revision (i.e. upstream history was not rewritten)."""
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    completed = subprocess.run(
        ['git', '-C', git_cache.git_cachedir(channel.git_repo),
         'merge-base', '--is-ancestor', old_revision, new_revision])
    v.result(completed.returncode == 0)
401
402
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Check that the channel tarball's contents match the git checkout.

    Files comparable in both trees must be identical. Files present in
    only one tree land in `errors`: all of `expected_errors` (channel
    build products) must be there, `permitted_errors` may be, and
    anything else fails verification.
    """
    v.status('Comparing channel tarball with git checkout')
    match, mismatch, errors = compare(os.path.join(
        channel_contents, pin.release_name), git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    # Files the channel build process adds or removes relative to git.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    permitted_errors = [
        'pkgs/test/nixpkgs-check-by-name/tests/symlink-invalid/pkgs/by-name/fo/foo/foo.nix',
    ]
    benign_expected_errors = []
    benign_permitted_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    for pe in permitted_errors:
        if pe in errors:
            errors.remove(pe)
            # Bug fix: this previously appended `ee` (leftover loop
            # variable from the loop above), recording the wrong file.
            benign_permitted_errors.append(pe)
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))
    v.check(
        f'({len(benign_permitted_errors)} of {len(permitted_errors)} permitted incomparable files)',
        len(benign_permitted_errors) <= len(permitted_errors))
442
443
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the downloaded nixexprs tarball into *dest*."""
    tarball = table['nixexprs.tar.xz'].file
    v.status(f"Extracting tarball {tarball}")
    shutil.unpack_archive(tarball, dest)
    v.ok()
451
452
def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    """Materialize the pinned git revision's tree into *dest*.

    Pipes `git archive` into `tar x` instead of doing a worktree
    checkout, so no .git metadata ends up in dest.
    """
    v.status('Checking out corresponding git revision')
    with subprocess.Popen(
            ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
            stdout=subprocess.PIPE) as git:
        with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
            if git.stdout:
                # Drop our copy of the pipe's read end so tar sees EOF
                # when git exits.
                git.stdout.close()
            tar.wait()
            git.wait()
    v.result(git.returncode == 0 and tar.returncode == 0)
468
469
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    """Produce (or reuse) a Nix-store tarball of the pinned git revision.

    A small cache file maps (repo, revision, release name) to the store
    path of a previously generated tarball; the cached path is reused
    only if it still exists in the store. Returns the store path.
    """
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file, encoding='utf-8') as f:
            cached_tarball = f.read(9999)
            if os.path.exists(cached_tarball):
                return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w', encoding='utf-8') as output_file:
            v.status(f'Generating tarball for git revision {pin.git_revision}')
            # git archive | xz > output_file
            with subprocess.Popen(
                    ['git', '-C', git_cache.git_cachedir(channel.git_repo),
                     'archive', f'--prefix={pin.release_name}/', pin.git_revision],
                    stdout=subprocess.PIPE) as git:
                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
                    xz.wait()
                    git.wait()
            v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

    # Remember where the tarball lives for next time.
    os.makedirs(os.path.dirname(cache_file), exist_ok=True)
    with open(cache_file, 'w', encoding='utf-8') as f:
        f.write(store_tarball)
    return store_tarball  # type: ignore # (for old mypy)
501
502
def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    """Verify the unpacked tarball's metadata files agree with *pin*:
    .git-revision must equal the pinned revision, and .version-suffix
    must be a suffix of the release name."""
    tarball_root = os.path.join(channel_contents, pin.release_name)

    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(tarball_root, '.git-revision'),
              encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)

    v.status(
        f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
    with open(os.path.join(tarball_root, '.version-suffix'),
              encoding='utf-8') as suffix_file:
        version_suffix = suffix_file.read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))
519
520
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    """Unpack the channel tarball and the git revision side by side and
    verify metadata and file-for-file agreement."""
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:
        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)
        git_checkout(v, channel, pin, git_contents)
        compare_tarball_and_git(v, pin, channel_contents, git_contents)
        v.status('Removing temporary directories')
        v.ok()
538
539
def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    """Build a channel-style release name for a git revision:
    '<repo basename>-<commit timestamp>-<11-char abbreviated hash>'."""
    v.status('Getting commit date')
    completed = subprocess.run(
        ['git', '-C', git_cache.git_cachedir(channel.git_repo),
         'log', '-n1', '--format=%ct-%h', '--abbrev=11',
         '--no-show-signature', git_revision],
        stdout=subprocess.PIPE)
    v.result(completed.returncode == 0 and completed.stdout != b'')
    return f'{os.path.basename(channel.git_repo)}-{completed.stdout.decode().strip()}'
557
558
K = TypeVar('K')
V = TypeVar('V')


def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into (entries satisfying pred, the rest).

    pred is called exactly once per entry.
    """
    selected: Dict[K, V] = {
        k: v for k, v in d.items() if pred(k, v)}
    remaining: Dict[K, V] = {
        k: v for k, v in d.items() if k not in selected}
    return selected, remaining


def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into (entries whose key is in fields, the rest)."""
    return partition_dict(lambda k, _: k in fields, d)
578
579
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Turn one config section into (search path, pin-if-present).

    The section's `type` field selects the SearchPath/Pin classes.
    Fields named after the pin type's fields feed the Pin constructor;
    everything else (except `type`) feeds the SearchPath constructor.
    Returns pin=None when the section has not been pinned yet.
    """
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # A pin type with no fields at all (AliasPin, SymlinkPin) counts as
    # always pinned.
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin
595
596
def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    """Like read_config_section(), but require that the section is pinned.

    Raises RuntimeError if no pin is present.
    """
    search_path, pin = read_config_section(conf)
    if pin is None:
        raise RuntimeError(
            f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
    return search_path, pin
604
605
def read_config(filename: str) -> configparser.ConfigParser:
    """Parse a single channels config file and return the parser."""
    parser = configparser.ConfigParser()
    with open(filename, encoding='utf-8') as f:
        parser.read_file(f, filename)
    return parser
611
612
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Read and merge several config files into one section mapping.

    Raises RuntimeError if the same section name appears in more than
    one file.
    """
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                # Bug fix: the f-prefix was missing, so the message used
                # to print the literal text "{section}".
                raise RuntimeError(f'Duplicate channel "{section}"')
            merged_config[section] = config[section]
    return merged_config
623
624
def pinCommand(args: argparse.Namespace) -> None:
    """Entry point for `pinch pin`: refresh pins and rewrite the config
    file in place."""
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        # A channel list on the command line restricts which sections
        # get re-pinned.
        if args.channels and section not in args.channels:
            continue
        search_path, old_pin = read_config_section(config[section])
        config[section].update(search_path.pin(v, old_pin)._asdict())
    with open(args.channels_file, 'w', encoding='utf-8') as configfile:
        config.write(configfile)
638
639
def updateCommand(args: argparse.Namespace) -> None:
    """Entry point for `pinch update`: fetch every pinned channel and
    install them all into a profile with one nix-env invocation.

    All sections must already be pinned (read_pinned_config_section
    raises otherwise). Aliases reuse the expression of the section they
    point to.
    """
    v = Verification()
    exprs: Dict[str, str] = {}
    profile_manifest = os.path.join(args.profile, "manifest.nix")
    # Expose the current profile and its manifest to the Nix expressions
    # via -I search paths, when the profile already exists.
    search_paths: List[str] = [
        "-I", "pinch_profile=" + args.profile,
        "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
    ] if os.path.exists(profile_manifest) else []
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in sorted(nonalias.items()):
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)        # partition_dict()
        tarball = sp.fetch(v, pin)
        search_paths.extend(
            ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
        # channelName is left as %s and substituted per-section below, so
        # aliases can reuse the same expression under their own name.
        exprs[section] = (
            f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
            f'src = builtins.storePath "{tarball}"; }}')

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    command = [
        'nix-env',
        '--profile',
        args.profile,
        '--show-trace',
        '--file',
        '<nix/unpack-channel.nix>',
        '--install',
        '--remove-all',
    ] + search_paths + ['--from-expression'] + [
        exprs[name] % name for name in sorted(exprs.keys())]
    if args.dry_run:
        print(' '.join(map(shlex.quote, command)))
    else:
        v.status('Installing channels with nix-env')
        process = subprocess.run(command)
        v.result(process.returncode == 0)
686
687
def main() -> None:
    """Parse command-line arguments and dispatch to the chosen subcommand."""
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)

    pin_parser = subparsers.add_parser('pin')
    pin_parser.add_argument('channels_file', type=str)
    pin_parser.add_argument('channels', type=str, nargs='*')
    pin_parser.set_defaults(func=pinCommand)

    update_parser = subparsers.add_parser('update')
    update_parser.add_argument('--dry-run', action='store_true')
    update_parser.add_argument('--profile', default=(
        f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
    update_parser.add_argument('channels_file', type=str, nargs='+')
    update_parser.set_defaults(func=updateCommand)

    args = parser.parse_args()
    args.func(args)
703
704
if __name__ == '__main__':  # script entry point
    main()