]> git.scottworley.com Git - pinch/blob - pinch.py
Use new 'nix hash convert' to quiet new nix's deprecation warnings
[pinch] / pinch.py
1 # pinch: PIN CHannels - a replacement for `nix-channel --update`
2 #
3 # This program is free software: you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation, version 3.
6
7
8 import argparse
9 import configparser
10 import filecmp
11 import functools
12 import getpass
13 import hashlib
14 import operator
15 import os
16 import os.path
17 import shlex
18 import shutil
19 import subprocess
20 import sys
21 import tarfile
22 import tempfile
23 import types
24 import urllib.parse
25 import urllib.request
26 import xml.dom.minidom
27
28 from typing import (
29 Callable,
30 Dict,
31 Iterable,
32 List,
33 Mapping,
34 NamedTuple,
35 NewType,
36 Optional,
37 Set,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 )
43
44 import git_cache
45
46 # Use xdg module when it's less painful to have as a dependency
47
48
class XDG(NamedTuple):
    """The subset of XDG base-directory settings this program uses."""
    XDG_CACHE_HOME: str
51
52
# Resolved XDG settings for this process: honor $XDG_CACHE_HOME, falling
# back to the conventional ~/.cache default.
xdg = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))
57
58
class VerificationError(Exception):
    """Raised by Verification.result() when a check fails."""
    pass
61
62
class Verification:
    """Progress reporter for a sequence of checks.

    status() accumulates message text on the current stderr line;
    result() right-aligns a colored OK/FAIL verdict on that line and
    raises VerificationError on failure.
    """

    def __init__(self) -> None:
        # Running count of characters printed on the current line.
        self.line_length = 0

    def status(self, s: str) -> None:
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        # Wrap s in an ANSI SGR color escape, resetting afterwards.
        return f'\033[{c:2d}m{s}\033[00m'

    def result(self, r: bool) -> None:
        if r:
            label, code = 'OK ', 92
        else:
            label, code = 'FAIL', 91
        columns = shutil.get_terminal_size().columns or 80
        # Pad so the verdict lands at the right edge of the terminal.
        padding = (columns - (self.line_length + len(label))) % columns
        print(' ' * padding + self._color(label, code), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        self.result(True)
92
93
# A SHA-256 digest in hexadecimal (base16) form.
Digest16 = NewType('Digest16', str)
# A SHA-256 digest in nix's base32 form.
Digest32 = NewType('Digest32', str)
96
97
class ChannelTableEntry(types.SimpleNamespace):
    """One row of a channel page's file table.

    url, digest, and size come from parsing the page; absolute_url and
    file are filled in later when the resource is resolved and fetched.
    """
    absolute_url: str
    digest: Digest16
    file: str
    size: int
    url: str
104
105
class AliasPin(NamedTuple):
    """Pin for an alias entry: nothing needs to be recorded."""
    pass
108
109
class SymlinkPin(NamedTuple):
    """Pin for a symlink entry; carries no state of its own."""

    @property
    def release_name(self) -> str:
        # Every symlink pin shares the same fixed release name.
        return 'link'
114
115
class GitPin(NamedTuple):
    """Pin for a git search path: exact revision plus its release name."""
    git_revision: str
    release_name: str
119
120
class ChannelPin(NamedTuple):
    """Pin for a channel: git revision plus the exact tarball it shipped."""
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str
126
127
# Any pin variant that can be stored in a channels file section.
Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]
129
130
def copy_to_nix_store(v: Verification, filename: str) -> str:
    """Add `filename` to the Nix store and return its store path."""
    v.status('Putting tarball in Nix store')
    proc = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(proc.returncode == 0)
    # nix-store prints the resulting store path followed by a newline.
    return proc.stdout.decode().strip()  # type: ignore # (for old mypy)
137
138
def symlink_archive(v: Verification, path: str) -> str:
    """Build a gzipped tarball containing one symlink to `path` and
    place it in the Nix store, returning the store path."""
    with tempfile.TemporaryDirectory() as workdir:
        link_name = os.path.join(workdir, 'link')
        os.symlink(path, link_name)
        archive_name = os.path.join(workdir, 'link.tar.gz')
        with tarfile.open(archive_name, mode='x:gz') as archive:
            archive.add(link_name, arcname='link')
        return copy_to_nix_store(v, archive_name)
146
147
class AliasSearchPath(NamedTuple):
    """Search-path entry that simply mirrors another named channel."""
    # Name of the channel section this alias points at.
    alias_of: str

    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        # Nothing to resolve; aliases follow their target's pin.
        return AliasPin()
153
154
class SymlinkSearchPath(NamedTuple):
    """Search-path entry backed by a local filesystem path via a symlink."""
    path: str

    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        # Symlinks carry no revision state, so there is nothing to record.
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        # Materialize as a store tarball containing one symlink to self.path.
        return symlink_archive(v, self.path)
163
164
class GitSearchPath(NamedTuple):
    """Search-path entry tracking a ref in a git repository."""
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        """Resolve the ref to a revision, checking ancestry against old_pin."""
        _, new_revision = git_cache.fetch(self.git_repo, self.git_ref)
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            # Guard against history rewrites / unexpected force-pushes.
            verify_git_ancestry(v, self, old_pin.git_revision, new_revision)
        return GitPin(release_name=git_revision_name(v, self, new_revision),
                      git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Produce a store tarball for the pinned revision."""
        assert isinstance(pin, GitPin)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, pin.git_revision)
        return git_get_tarball(v, self, pin)
182
183
class ChannelSearchPath(NamedTuple):
    """Search-path entry tracking a nixos.org-style channel webpage."""
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        """Pin to whatever the channel page currently advertises."""
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        # Short-circuit: if the channel still points at the same commit,
        # keep the existing pin without re-downloading resources.
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, new_gitpin.git_revision)
        if old_pin is not None:
            # Guard against history rewrites between pins.
            verify_git_ancestry(
                v, self, old_pin.git_revision, new_gitpin.git_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Fetch the pinned channel tarball into the store."""
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))
215
216
# Any configured search-path entry.
SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
# Entries that can produce a tarball from a git repository.
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
222
223
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Deep-compare two directory trees, ignoring anything under .git/.

    Returns filecmp.cmpfiles-style (match, mismatch, errors) lists over
    the union of relative paths found in either tree.  Symlinks to
    directories are compared as entries themselves; os.walk errors are
    re-raised rather than suppressed.
    """

    def reraise(error: OSError) -> None:
        raise error

    def rel_join(prefix: str, name: str) -> str:
        # os.path.relpath yields '.' at the tree root; avoid './name'.
        return name if prefix == '.' else os.path.join(prefix, name)

    def walk_files(root: str) -> Set[str]:
        found: Set[str] = set()
        for dirpath, dirnames, filenames in os.walk(root, onerror=reraise):
            rel = os.path.relpath(dirpath, start=root)
            found.update(rel_join(rel, name) for name in filenames)
            # os.walk does not descend into symlinked directories;
            # record them so they get compared as entries.
            found.update(
                rel_join(rel, d) for d in dirnames
                if os.path.islink(rel_join(dirpath, d)))
        return found

    interesting = {
        f for f in walk_files(a) | walk_files(b)
        if not f.startswith('.git/')}
    return filecmp.cmpfiles(a, b, interesting, shallow=False)
250
251
def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    """Download the channel page; return (html, final URL after redirect).

    Channel URLs are expected to redirect to a release-specific URL, so
    the absence of a redirect is treated as a failure.
    """
    v.status(f'Fetching channel from {channel.channel_url}')
    with urllib.request.urlopen(channel.channel_url, timeout=10) as request:
        channel_html = request.read().decode()
        forwarded_url = request.geturl()
        v.result(request.status == 200)
    v.check('Got forwarded', channel.channel_url != forwarded_url)
    return channel_html, forwarded_url
261
262
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    """Extract the file table and git pin from a channel page.

    The page is parsed as XML; the release name is cross-checked between
    <title> and <h1>, and the git revision is read from the first <tt>
    element (whose preceding text must be the 'Git commit ' label).
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    v.status('Finding release name (1)')
    title = d.getElementsByTagName('title')[0].firstChild
    v.result(isinstance(title, xml.dom.minidom.CharacterData))
    assert isinstance(title, xml.dom.minidom.CharacterData)
    # e.g. "Channel nixos-XX.YY <release name>" -- third token is the name.
    release_name = title.nodeValue.split()[2]
    v.status('Finding release name (2)')
    h1 = d.getElementsByTagName('h1')[0].firstChild
    v.result(isinstance(h1, xml.dom.minidom.CharacterData))
    assert isinstance(h1, xml.dom.minidom.CharacterData)
    v.status('Verifying release name:')
    v.status(release_name)
    v.result(release_name == h1.nodeValue.split()[2])

    v.status('Finding git commit')
    git_commit_node = d.getElementsByTagName('tt')[0]
    v.result(
        isinstance(
            git_commit_node.firstChild,
            xml.dom.minidom.CharacterData))
    assert isinstance(
        git_commit_node.firstChild,
        xml.dom.minidom.CharacterData)
    v.status('Extracting git commit:')
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    # Remaining rows of the table list downloadable files with their
    # sizes and digests.
    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=release_name, git_revision=git_revision)
308
309
def digest_string(s: bytes) -> Digest16:
    """Return the SHA-256 digest of s as lowercase hex."""
    hex_digest = hashlib.sha256(s).hexdigest()
    return Digest16(hex_digest)
312
313
def digest_file(filename: str) -> Digest16:
    """Return the SHA-256 digest of the file's contents as lowercase hex."""
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # Stream in fixed-size chunks so large files stay cheap.
        while chunk := f.read(4096):
            hasher.update(chunk)
    return Digest16(hasher.hexdigest())
321
322
# Base invocation of the modern `nix` CLI; the flag lets it run even
# where 'nix-command' is not enabled in nix.conf.
_NIX_COMMAND = ['nix', '--experimental-features', 'nix-command']
324
325
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base32 SHA-256 digest to base16 via `nix hash convert`."""
    v.status('Converting digest to base16')
    process = subprocess.run(_NIX_COMMAND + [
        'hash',
        'convert',
        '--hash-algo',
        'sha256',
        '--to',
        'base16',
        digest32],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())
339
340
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Fetch url into the nix store, verifying its SHA-256 twice.

    nix-prefetch-url verifies the digest itself; we additionally hash the
    resulting store path ourselves as a belt-and-braces check, and return
    that store path.
    """
    v.status(f'Fetching {url}')
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    # Output is: <base32 digest>\n<store path>\n
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore # (for old mypy)
357
358
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the channel's git-revision and nixexprs tarball files.

    Fills in each table entry's absolute_url and local file path, then
    cross-checks the downloaded git-revision file against the pin parsed
    from the page itself.
    """
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        # Table hrefs are relative to the (redirected) channel page URL.
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)
372
373
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Path of the cache file recording the store path of a built tarball,
    keyed on repo URL digest, revision, and release name."""
    basename = (
        f'{digest_string(channel.git_repo.encode())}'
        f'-{pin.git_revision}-{pin.release_name}')
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git-tarball', basename)
379
380
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    """Check (via `git merge-base --is-ancestor`) that new_revision is a
    descendant of old_revision, i.e. history was not rewritten."""
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'merge-base',
                              '--is-ancestor',
                              old_revision,
                              new_revision])
    v.result(process.returncode == 0)
396
397
def broken_symlinks_are_identical(root1: str, root2: str, path: str) -> bool:
    """True iff `path` is a dangling symlink in both trees and both links
    point at the same target."""
    first = os.path.join(root1, path)
    second = os.path.join(root2, path)
    for link in (first, second):
        # Must be a symlink whose target does not exist.
        if not os.path.islink(link) or os.path.exists(link):
            return False
    return os.readlink(first) == os.readlink(second)
406
407
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Verify the extracted channel tarball matches the git checkout.

    A fixed set of channel-build artifacts is expected to be incomparable;
    anything else incomparable fails, except identically-dangling
    symlinks, which filecmp reports as errors but are benign.
    """
    v.status('Comparing channel tarball with git checkout')
    tarball_contents = os.path.join(channel_contents, pin.release_name)
    match, mismatch, errors = compare(tarball_contents, git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    # Files generated by the channel build that are absent from git.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_expected_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    # filecmp cannot compare dangling symlinks; identical ones are fine.
    errors = [
        e for e in errors
        if not broken_symlinks_are_identical(tarball_contents, git_contents, e)
    ]
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))
440
441
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the downloaded nixexprs tarball into dest."""
    v.status(f"Extracting tarball {table['nixexprs.tar.xz'].file}")
    shutil.unpack_archive(table['nixexprs.tar.xz'].file, dest)
    v.ok()
449
450
def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    """Extract the pinned git revision into dest via `git archive | tar x`."""
    v.status('Checking out corresponding git revision')
    with subprocess.Popen(
            ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
            stdout=subprocess.PIPE) as git:
        with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
            if git.stdout:
                # Drop our copy of the pipe so tar sees EOF when git exits.
                git.stdout.close()
            tar.wait()
        git.wait()
    v.result(git.returncode == 0 and tar.returncode == 0)
466
467
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    """Produce (or reuse) a nix-store tarball for the pinned git revision.

    The resulting store path is memoized in a small cache file keyed on
    repo, revision, and release name; the cache is trusted only while the
    recorded store path still exists.
    """
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file, encoding='utf-8') as f:
            cached_tarball = f.read(9999)
            if os.path.exists(cached_tarball):
                return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w', encoding='utf-8') as output_file:
            v.status(f'Generating tarball for git revision {pin.git_revision}')
            # git archive | xz, with the release name as the tar prefix.
            with subprocess.Popen(
                    ['git', '-C', git_cache.git_cachedir(channel.git_repo),
                     'archive', f'--prefix={pin.release_name}/', pin.git_revision],
                    stdout=subprocess.PIPE) as git:
                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
                    xz.wait()
                git.wait()
            v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

        # Remember the store path for next time.
        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
        with open(cache_file, 'w', encoding='utf-8') as f:
            f.write(store_tarball)
        return store_tarball  # type: ignore # (for old mypy)
499
500
def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    """Verify the extracted tarball's embedded metadata agrees with the pin."""
    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(channel_contents, pin.release_name, '.git-revision'),
              encoding='utf-8') as f:
        v.result(f.read(999) == pin.git_revision)

    v.status(
        f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
    with open(os.path.join(channel_contents, pin.release_name, '.version-suffix'),
              encoding='utf-8') as f:
        version_suffix = f.read(999)
        v.status(version_suffix)
        v.result(pin.release_name.endswith(version_suffix))
517
518
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    """Cross-check the channel tarball against the pinned git revision."""
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:

        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)

        git_checkout(v, channel, pin, git_contents)

        compare_tarball_and_git(v, pin, channel_contents, git_contents)

        v.status('Removing temporary directories')
    v.ok()
536
537
def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    """Build a release name of the form '<repo>-<commit-time>-<short-hash>'."""
    v.status('Getting commit date')
    # %ct-%h yields "<unix commit time>-<abbreviated hash>".
    cmd = ['git', '-C', git_cache.git_cachedir(channel.git_repo),
           'log', '-n1', '--format=%ct-%h', '--abbrev=11',
           '--no-show-signature', git_revision]
    process = subprocess.run(cmd, stdout=subprocess.PIPE)
    v.result(process.returncode == 0 and process.stdout != b'')
    repo_name = os.path.basename(channel.git_repo)
    return f'{repo_name}-{process.stdout.decode().strip()}'
557
558
K = TypeVar('K')
V = TypeVar('V')


def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split d into (entries satisfying pred, the rest)."""
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for key, value in d.items():
        bucket = selected if pred(key, value) else remaining
        bucket[key] = value
    return selected, remaining


def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split d into (entries whose key is in fields, the rest)."""
    return partition_dict(lambda k, v: k in fields, d)
578
579
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Build (search path, optional pin) objects from one config section.

    Fields named by the pin type belong to the pin; everything else
    (minus 'type') configures the search path.
    """
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # A pin is present if any pin fields were stored, or if the pin type
    # has no fields at all (e.g. AliasPin).
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin
595
596
def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    """Like read_config_section, but the section must already carry a pin."""
    search_path, pin = read_config_section(conf)
    if pin is None:
        raise RuntimeError(
            f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
    return search_path, pin
604
605
def read_config(filename: str) -> configparser.ConfigParser:
    """Parse an INI-style channels file into a ConfigParser."""
    parser = configparser.ConfigParser()
    with open(filename, encoding='utf-8') as config_file:
        parser.read_file(config_file, filename)
    return parser
611
612
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Read and merge several channels files into one section mapping.

    Raises RuntimeError if the same section name appears in more than one
    file; section names must be unique across all given files.
    """
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                # Bug fix: this message was a plain string, so the literal
                # text '{section}' was printed instead of the section name.
                raise RuntimeError(f'Duplicate channel "{section}"')
            merged_config[section] = config[section]
    return merged_config
623
624
def pinCommand(args: argparse.Namespace) -> None:
    """`pinch pin`: refresh pins for (selected) sections of a channels file."""
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        # An explicit channel list on the command line limits what we pin.
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_config_section(config[section])

        config[section].update(sp.pin(v, old_pin)._asdict())

    # Write the updated pins back into the same file.
    with open(args.channels_file, 'w', encoding='utf-8') as configfile:
        config.write(configfile)
638
639
def updateCommand(args: argparse.Namespace) -> None:
    """`pinch update`: install all pinned channels into a nix-env profile."""
    v = Verification()
    exprs: Dict[str, str] = {}
    profile_manifest = os.path.join(args.profile, "manifest.nix")
    # Expose the current profile and its manifest to nix expressions.
    search_paths: List[str] = [
        "-I", "pinch_profile=" + args.profile,
        "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
    ] if os.path.exists(profile_manifest) else []
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    # Fetch each real (non-alias) channel and build its install expression.
    for section, (sp, pin) in sorted(nonalias.items()):
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)  # partition_dict()
        tarball = sp.fetch(v, pin)
        search_paths.extend(
            ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
        # channelName is left as %s and substituted per-section below.
        exprs[section] = (
            f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
            f'src = builtins.storePath "{tarball}"; }}')

    # Aliases reuse their target's expression verbatim.
    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    with tempfile.NamedTemporaryFile() as unpack_channel_nix:
        unpack_channel_nix.write(b'''
{ name, channelName, src, }:
derivation {
  inherit name channelName src;
  builder = "builtin:unpack-channel";
  system = "builtin";
  preferLocalBuild = true;
}
''')
        unpack_channel_nix.flush()

        command = [
            'nix-env',
            '--profile',
            args.profile,
            '--show-trace',
            '--file',
            unpack_channel_nix.name,
            '--install',
            '--remove-all',
        ] + search_paths + ['--from-expression'] + [
            exprs[name] % name for name in sorted(exprs.keys())]
        if args.dry_run:
            print(' '.join(map(shlex.quote, command)))
        else:
            v.status('Installing channels with nix-env')
            process = subprocess.run(command)
            v.result(process.returncode == 0)
698
699
def main() -> None:
    """Parse the command line and dispatch to the pin/update subcommand."""
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)
    parser_pin = subparsers.add_parser('pin')
    parser_pin.add_argument('channels_file', type=str)
    # Optional list of channel names to restrict pinning to.
    parser_pin.add_argument('channels', type=str, nargs='*')
    parser_pin.set_defaults(func=pinCommand)
    parser_update = subparsers.add_parser('update')
    parser_update.add_argument('--dry-run', action='store_true')
    parser_update.add_argument('--profile', default=(
        f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
    parser_update.add_argument('channels_file', type=str, nargs='+')
    parser_update.set_defaults(func=updateCommand)
    args = parser.parse_args()
    args.func(args)
715
716
# Entry point when executed as a script.
if __name__ == '__main__':
    main()