# pinch: PIN CHannels - a replacement for `nix-channel --update`
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, version 3.


import argparse
import configparser
import filecmp
import functools
import getpass
import hashlib
import operator
import os
import os.path
import shlex
import shutil
import subprocess
import sys
import tarfile
import tempfile
import types
import urllib.parse
import urllib.request
import xml.dom.minidom

from typing import (
    Callable,
    Dict,
    Iterable,
    List,
    Mapping,
    NamedTuple,
    NewType,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
)

import git_cache

# Use xdg module when it's less painful to have as a dependency


class XDG(NamedTuple):
    XDG_CACHE_HOME: str


xdg = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))


class VerificationError(Exception):
    pass


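# Verification prints progress messages to stderr and raises
# VerificationError as soon as any check fails.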
class Verification:

    def __init__(self) -> None:
        self.line_length = 0

    def status(self, s: str) -> None:
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        return f'\033[{c:2d}m{s}\033[00m'

    def result(self, r: bool) -> None:
        message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
        length = len(message)
        cols = shutil.get_terminal_size().columns or 80
        pad = (cols - (self.line_length + length)) % cols
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        self.result(True)


Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)


class ChannelTableEntry(types.SimpleNamespace):
    absolute_url: str
    digest: Digest16
    file: str
    size: int
    url: str


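# Pins record the exact state captured by `pinch pin`; their fields are
# written into the channels file and read back by `pinch update`.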
class AliasPin(NamedTuple):
    pass


class SymlinkPin(NamedTuple):
    @property
    def release_name(self) -> str:
        return 'link'


class GitPin(NamedTuple):
    git_revision: str
    release_name: str


class ChannelPin(NamedTuple):
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str


Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]


def copy_to_nix_store(v: Verification, filename: str) -> str:
    v.status('Putting tarball in Nix store')
    process = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return process.stdout.decode().strip()  # type: ignore # (for old mypy)


def symlink_archive(v: Verification, path: str) -> str:
    with tempfile.TemporaryDirectory() as td:
        archive_filename = os.path.join(td, 'link.tar.gz')
        os.symlink(path, os.path.join(td, 'link'))
        with tarfile.open(archive_filename, mode='x:gz') as t:
            t.add(os.path.join(td, 'link'), arcname='link')
        return copy_to_nix_store(v, archive_filename)


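# Search-path types: each knows how to pin itself (capture its current
# state) and, except for aliases, how to fetch a tarball for its pin.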
class AliasSearchPath(NamedTuple):
    alias_of: str

    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        return AliasPin()


class SymlinkSearchPath(NamedTuple):
    path: str

    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        return symlink_archive(v, self.path)


class GitSearchPath(NamedTuple):
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        _, new_revision = git_cache.fetch(self.git_repo, self.git_ref)
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            verify_git_ancestry(v, self, old_pin.git_revision, new_revision)
        return GitPin(release_name=git_revision_name(v, self, new_revision),
                      git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        assert isinstance(pin, GitPin)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, pin.git_revision)
        return git_get_tarball(v, self, pin)


class ChannelSearchPath(NamedTuple):
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, new_gitpin.git_revision)
        if old_pin is not None:
            verify_git_ancestry(
                v, self, old_pin.git_revision, new_gitpin.git_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))


SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]


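# Recursively compare two directory trees, returning filecmp-style
# (match, mismatch, errors) lists of relative paths.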
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:

    def throw(error: OSError) -> None:
        raise error

    def join(x: str, y: str) -> str:
        return y if x == '.' else os.path.join(x, y)

    def recursive_files(d: str) -> Iterable[str]:
        all_files: List[str] = []
        for path, dirs, files in os.walk(d, onerror=throw):
            rel = os.path.relpath(path, start=d)
            all_files.extend(join(rel, f) for f in files)
            for dir_or_link in dirs:
                if os.path.islink(join(path, dir_or_link)):
                    all_files.append(join(rel, dir_or_link))
        return all_files

    def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
        return (f for f in files if not f.startswith('.git/'))

    files = functools.reduce(
        operator.or_, (set(
            exclude_dot_git(
                recursive_files(x))) for x in [a, b]))
    return filecmp.cmpfiles(a, b, files, shallow=False)


def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    v.status(f'Fetching channel from {channel.channel_url}')
    with urllib.request.urlopen(channel.channel_url, timeout=10) as request:
        channel_html = request.read().decode()
        forwarded_url = request.geturl()
        v.result(request.status == 200)
        v.check('Got forwarded', channel.channel_url != forwarded_url)
    return channel_html, forwarded_url


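# Scrape the channel's HTML landing page for the release name, the git
# commit, and the table of downloadable resources.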
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    v.status('Finding release name (1)')
    title = d.getElementsByTagName('title')[0].firstChild
    v.result(isinstance(title, xml.dom.minidom.CharacterData))
    assert isinstance(title, xml.dom.minidom.CharacterData)
    release_name = title.nodeValue.split()[2]
    v.status('Finding release name (2)')
    h1 = d.getElementsByTagName('h1')[0].firstChild
    v.result(isinstance(h1, xml.dom.minidom.CharacterData))
    assert isinstance(h1, xml.dom.minidom.CharacterData)
    v.status('Verifying release name:')
    v.status(release_name)
    v.result(release_name == h1.nodeValue.split()[2])

    v.status('Finding git commit')
    git_commit_node = d.getElementsByTagName('tt')[0]
    v.result(
        isinstance(
            git_commit_node.firstChild,
            xml.dom.minidom.CharacterData))
    assert isinstance(
        git_commit_node.firstChild,
        xml.dom.minidom.CharacterData)
    v.status('Extracting git commit:')
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=release_name, git_revision=git_revision)


def digest_string(s: bytes) -> Digest16:
    return Digest16(hashlib.sha256(s).hexdigest())


def digest_file(filename: str) -> Digest16:
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # pylint: disable=cell-var-from-loop
        for block in iter(lambda: f.read(4096), b''):
            hasher.update(block)
    return Digest16(hasher.hexdigest())


_NIX_COMMAND = ['nix', '--experimental-features', 'nix-command']


def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    v.status('Converting digest to base16')
    process = subprocess.run(_NIX_COMMAND + [
        'to-base16',
        '--type',
        'sha256',
        digest32],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())


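# Fetch a URL into the Nix store via nix-prefetch-url, verifying the digest
# both as reported by nix-prefetch-url and by re-hashing the fetched file.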
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    v.status(f'Fetching {url}')
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore # (for old mypy)


def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)


def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    return os.path.join(
        xdg.XDG_CACHE_HOME,
        'pinch/git-tarball',
        f'{digest_string(channel.git_repo.encode())}-{pin.git_revision}-{pin.release_name}')


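# History must only move forward: the previously pinned revision has to be
# an ancestor of the newly pinned one.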
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'merge-base',
                              '--is-ancestor',
                              old_revision,
                              new_revision])
    v.result(process.returncode == 0)


def broken_symlinks_are_identical(root1: str, root2: str, path: str) -> bool:
    a = os.path.join(root1, path)
    b = os.path.join(root2, path)
    return (os.path.islink(a)
            and os.path.islink(b)
            and not os.path.exists(a)
            and not os.path.exists(b)
            and os.readlink(a) == os.readlink(b))


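# The channel tarball should match the git checkout except for a few
# generated files (.git-revision, .version-suffix, programs.sqlite, etc.).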
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    v.status('Comparing channel tarball with git checkout')
    tarball_contents = os.path.join(channel_contents, pin.release_name)
    match, mismatch, errors = compare(tarball_contents, git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_expected_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    errors = [
        e for e in errors
        if not broken_symlinks_are_identical(tarball_contents, git_contents, e)
    ]
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))


def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    v.status(f"Extracting tarball {table['nixexprs.tar.xz'].file}")
    shutil.unpack_archive(table['nixexprs.tar.xz'].file, dest)
    v.ok()


def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    v.status('Checking out corresponding git revision')
    with subprocess.Popen(
            ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
            stdout=subprocess.PIPE) as git:
        with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
            if git.stdout:
                git.stdout.close()
            tar.wait()
            git.wait()
            v.result(git.returncode == 0 and tar.returncode == 0)


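# Build (or reuse a cached) release-style tarball from the git checkout and
# put it in the Nix store; the store path is cached under XDG_CACHE_HOME.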
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file, encoding='utf-8') as f:
            cached_tarball = f.read(9999)
            if os.path.exists(cached_tarball):
                return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w', encoding='utf-8') as output_file:
            v.status(f'Generating tarball for git revision {pin.git_revision}')
            with subprocess.Popen(
                    ['git', '-C', git_cache.git_cachedir(channel.git_repo),
                     'archive', f'--prefix={pin.release_name}/', pin.git_revision],
                    stdout=subprocess.PIPE) as git:
                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
                    xz.wait()
                    git.wait()
                    v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
        with open(cache_file, 'w', encoding='utf-8') as f:
            f.write(store_tarball)
        return store_tarball  # type: ignore # (for old mypy)


def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(channel_contents, pin.release_name, '.git-revision'),
              encoding='utf-8') as f:
        v.result(f.read(999) == pin.git_revision)

    v.status(
        f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
    with open(os.path.join(channel_contents, pin.release_name, '.version-suffix'),
              encoding='utf-8') as f:
        version_suffix = f.read(999)
        v.status(version_suffix)
        v.result(pin.release_name.endswith(version_suffix))


def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:

        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)

        git_checkout(v, channel, pin, git_contents)

        compare_tarball_and_git(v, pin, channel_contents, git_contents)

        v.status('Removing temporary directories')
    v.ok()


def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    v.status('Getting commit date')
    process = subprocess.run(['git',
                              '-C',
                              git_cache.git_cachedir(channel.git_repo),
                              'log',
                              '-n1',
                              '--format=%ct-%h',
                              '--abbrev=11',
                              '--no-show-signature',
                              git_revision],
                             stdout=subprocess.PIPE)
    v.result(process.returncode == 0 and process.stdout != b'')
    return (f'{os.path.basename(channel.git_repo)}-'
            f'{process.stdout.decode().strip()}')


K = TypeVar('K')
V = TypeVar('V')


def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for k, v in d.items():
        if pred(k, v):
            selected[k] = v
        else:
            remaining[k] = v
    return selected, remaining


def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    return partition_dict(lambda k, v: k in fields, d)


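# Map a config section to its SearchPath and (if present) Pin, based on the
# section's "type" field.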
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin


def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    sp, pin = read_config_section(conf)
    if pin is None:
        raise RuntimeError(
            f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
    return sp, pin


def read_config(filename: str) -> configparser.ConfigParser:
    config = configparser.ConfigParser()
    with open(filename, encoding='utf-8') as f:
        config.read_file(f, filename)
    return config


def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                raise RuntimeError(f'Duplicate channel "{section}"')
            merged_config[section] = config[section]
    return merged_config


def pinCommand(args: argparse.Namespace) -> None:
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_config_section(config[section])

        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w', encoding='utf-8') as configfile:
        config.write(configfile)


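# Fetch the tarball for every pinned channel and install them all into the
# profile with a single nix-env invocation.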
def updateCommand(args: argparse.Namespace) -> None:
    v = Verification()
    exprs: Dict[str, str] = {}
    profile_manifest = os.path.join(args.profile, "manifest.nix")
    search_paths: List[str] = [
        "-I", "pinch_profile=" + args.profile,
        "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
    ] if os.path.exists(profile_manifest) else []
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in sorted(nonalias.items()):
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)  # partition_dict()
        tarball = sp.fetch(v, pin)
        search_paths.extend(
            ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
        exprs[section] = (
            f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
            f'src = builtins.storePath "{tarball}"; }}')

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    with tempfile.NamedTemporaryFile() as unpack_channel_nix:
        unpack_channel_nix.write(b'''
{ name, channelName, src, }:
derivation {
  inherit name channelName src;
  builder = "builtin:unpack-channel";
  system = "builtin";
  preferLocalBuild = true;
}
''')
        unpack_channel_nix.flush()

        command = [
            'nix-env',
            '--profile',
            args.profile,
            '--show-trace',
            '--file',
            unpack_channel_nix.name,
            '--install',
            '--remove-all',
        ] + search_paths + ['--from-expression'] + [
            exprs[name] % name for name in sorted(exprs.keys())]
        if args.dry_run:
            print(' '.join(map(shlex.quote, command)))
        else:
            v.status('Installing channels with nix-env')
            process = subprocess.run(command)
            v.result(process.returncode == 0)


def main() -> None:
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)
    parser_pin = subparsers.add_parser('pin')
    parser_pin.add_argument('channels_file', type=str)
    parser_pin.add_argument('channels', type=str, nargs='*')
    parser_pin.set_defaults(func=pinCommand)
    parser_update = subparsers.add_parser('update')
    parser_update.add_argument('--dry-run', action='store_true')
    parser_update.add_argument('--profile', default=(
        f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
    parser_update.add_argument('channels_file', type=str, nargs='+')
    parser_update.set_defaults(func=updateCommand)
    args = parser.parse_args()
    args.func(args)


if __name__ == '__main__':
    main()