# pinch.py — "Factor out symlink_archive()"
# (source: git.scottworley.com, [pinch] / pinch.py)
1 import argparse
2 import configparser
3 import filecmp
4 import functools
5 import getpass
6 import hashlib
7 import operator
8 import os
9 import os.path
10 import shlex
11 import shutil
12 import subprocess
13 import sys
14 import tarfile
15 import tempfile
16 import types
17 import urllib.parse
18 import urllib.request
19 import xml.dom.minidom
20
21 from typing import (
22 Callable,
23 Dict,
24 Iterable,
25 List,
26 Mapping,
27 NamedTuple,
28 NewType,
29 Optional,
30 Set,
31 Tuple,
32 Type,
33 TypeVar,
34 Union,
35 )
36
37 # Use xdg module when it's less painful to have as a dependency
38
39
class XDG(NamedTuple):
    """The subset of XDG base directories this program uses."""
    XDG_CACHE_HOME: str  # root for cached git repos and tarball records
42
43
# Resolve XDG_CACHE_HOME once at import time, falling back to ~/.cache
# when the environment variable is unset.
xdg = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))
48
49
class VerificationError(Exception):
    """Raised by Verification.result() when a check fails."""
    pass
52
53
class Verification:
    """Progress reporter for a sequence of checks.

    Status text accumulates on one stderr line; result() right-aligns a
    colored OK/FAIL marker and raises VerificationError on failure.
    """

    def __init__(self) -> None:
        self.line_length = 0  # columns printed so far on the current line

    def status(self, s: str) -> None:
        """Print a progress message without ending the line."""
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        """Wrap *s* in ANSI SGR color code *c*."""
        return '\033[%2dm%s\033[00m' % (c, s)

    def result(self, r: bool) -> None:
        """Finish the current line with OK/FAIL; raise on failure."""
        message, color = ('OK ', 92) if r else ('FAIL', 91)
        cols = shutil.get_terminal_size().columns or 80
        pad = (cols - (self.line_length + len(message))) % cols
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        """Convenience: status() followed by result()."""
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        """Record an unconditional success."""
        self.result(True)
83
84
# Hex (base-16) and Nix base-32 renderings of a SHA-256 digest.
Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)
87
88
class ChannelTableEntry(types.SimpleNamespace):
    """One row of a channel page's file table.

    absolute_url and file are filled in later by fetch_resources().
    """
    absolute_url: str  # url resolved against the post-redirect channel URL
    digest: Digest16   # sha256 advertised by the channel page
    file: str          # local path of the fetched resource
    size: int
    url: str           # href as it appears in the channel HTML (may be relative)
95
96
class AliasPin(NamedTuple):
    """An alias channel carries no pinned state of its own."""
    pass
99
100
class SymlinkPin(NamedTuple):
    """Pin for a symlink search path; its release name is always 'link'."""
    @property
    def release_name(self) -> str:
        return 'link'
105
106
class GitPin(NamedTuple):
    """Pinned state for a git search path."""
    git_revision: str  # the commit hash being pinned
    release_name: str  # see git_revision_name(): '<repo>-<committime>-<hash>'
110
111
class ChannelPin(NamedTuple):
    """Pinned state for a NixOS-style channel: git revision plus tarball."""
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str  # base-16 sha256 of the nixexprs tarball
117
118
# A pin is the saved state that makes re-fetching a channel reproducible.
Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]
120
121
def copy_to_nix_store(v: Verification, filename: str) -> str:
    """Add *filename* to the Nix store and return its store path."""
    v.status('Putting tarball in Nix store')
    added = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(added.returncode == 0)
    return added.stdout.decode().strip()  # type: ignore # (for old mypy)
128
129
def symlink_archive(v: Verification, path: str) -> str:
    """Pack a single symlink named 'link' (pointing at *path*) into a
    gzipped tarball and add it to the Nix store; returns the store path."""
    with tempfile.TemporaryDirectory() as scratch:
        link_path = os.path.join(scratch, 'link')
        os.symlink(path, link_path)
        archive_path = os.path.join(scratch, 'link.tar.gz')
        with tarfile.open(archive_path, mode='x:gz') as archive:
            archive.add(link_path, arcname='link')
        return copy_to_nix_store(v, archive_path)
137
138
class AliasSearchPath(NamedTuple):
    """A search path that mirrors another named channel's expression."""
    alias_of: str  # name of the channel section this one aliases

    # pylint: disable=no-self-use
    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        """Aliases have no state of their own to pin."""
        return AliasPin()
145
146
class SymlinkSearchPath(NamedTuple):
    """A search path that resolves to a symlink pointing at a fixed path."""
    path: str  # target the symlink will point at

    # pylint: disable=no-self-use
    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        """Build a one-symlink tarball in the store; returns its path."""
        return symlink_archive(v, self.path)
156
157
class GitSearchPath(NamedTuple):
    """A search path backed directly by a ref in a git repository."""
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        """Fetch the ref and pin its current revision.

        When an old pin exists, git_fetch() verifies the new revision
        descends from it (no forced updates).
        """
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
        old_revision = old_pin.git_revision if old_pin is not None else None

        new_revision = git_fetch(v, self, None, old_revision)
        return GitPin(release_name=git_revision_name(v, self, new_revision),
                      git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Produce a store tarball for the pinned revision."""
        assert isinstance(pin, GitPin)
        ensure_git_rev_available(v, self, pin, None)
        return git_get_tarball(v, self, pin)
175
176
class ChannelSearchPath(NamedTuple):
    """A search path backed by a NixOS channel page plus its git repo."""
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        """Pin the channel's current release.

        Cross-checks the channel page's advertised git revision against
        the git repository and the tarball contents before recording.
        """
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)
        old_revision = old_pin.git_revision if old_pin is not None else None

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        fetch_resources(v, new_gitpin, forwarded_url, table)
        ensure_git_rev_available(v, self, new_gitpin, old_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    # pylint: disable=no-self-use
    def fetch(self, v: Verification, pin: Pin) -> str:
        """Fetch the pinned tarball, verified against its recorded sha256."""
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))
204
205
# Every search-path flavor a config section can declare.
SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
# Search paths whose contents ultimately come from a git repo as a tarball.
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
211
212
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Recursively compare directory trees *a* and *b* by content.

    Returns (matches, mismatches, errors) from filecmp.cmpfiles over the
    union of both trees' files.  Symlinks to directories are compared as
    files rather than descended into; anything under .git/ is ignored.
    """

    def raise_error(error: OSError) -> None:
        # os.walk silently ignores errors by default; we want them loud.
        raise error

    def relative_join(prefix: str, name: str) -> str:
        return name if prefix == '.' else os.path.join(prefix, name)

    def files_under(root: str) -> Set[str]:
        found: Set[str] = set()
        for path, dirs, filenames in os.walk(root, onerror=raise_error):
            rel = os.path.relpath(path, start=root)
            for filename in filenames:
                found.add(relative_join(rel, filename))
            # A symlink to a directory shows up in dirs; treat it as a file.
            for entry in dirs:
                if os.path.islink(relative_join(path, entry)):
                    found.add(relative_join(rel, entry))
        return found

    union = files_under(a) | files_under(b)
    comparable = [f for f in union if not f.startswith('.git/')]
    return filecmp.cmpfiles(a, b, comparable, shallow=False)
239
240
def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    """Download the channel page.

    Returns (page HTML, final URL after redirects).  The channel URL is
    expected to redirect to a release-specific page; a missing redirect
    fails the check.
    """
    v.status('Fetching channel')
    request = urllib.request.urlopen(channel.channel_url, timeout=10)
    channel_html = request.read().decode()
    forwarded_url = request.geturl()
    v.result(request.status == 200)  # type: ignore # (for old mypy)
    v.check('Got forwarded', channel.channel_url != forwarded_url)
    return channel_html, forwarded_url
250
251
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    """Extract the file table and a GitPin from a channel page.

    The release name must appear identically in <title> and <h1>; the git
    revision is taken from the first <tt> element, which must be preceded
    by the literal label 'Git commit '.
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    v.status('Extracting release name:')
    title_name = d.getElementsByTagName(
        'title')[0].firstChild.nodeValue.split()[2]
    h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
    v.status(title_name)
    v.result(title_name == h1_name)

    v.status('Extracting git commit:')
    git_commit_node = d.getElementsByTagName('tt')[0]
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    # Skip the header row; each data row is (name/link, size, digest).
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=title_name, git_revision=git_revision)
283
284
def digest_string(s: bytes) -> Digest16:
    """SHA-256 of *s*, rendered as lowercase hex."""
    hasher = hashlib.sha256()
    hasher.update(s)
    return Digest16(hasher.hexdigest())
287
288
def digest_file(filename: str) -> Digest16:
    """SHA-256 of the contents of *filename*, as lowercase hex.

    Reads in 4 KiB chunks so arbitrarily large files can be hashed.
    """
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        while True:
            block = f.read(4096)
            if not block:
                break
            hasher.update(block)
    return Digest16(hasher.hexdigest())
296
297
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base-32 digest to base-16 via `nix to-base16`."""
    v.status('Converting digest to base16')
    converted = subprocess.run(
        ['nix', 'to-base16', '--type', 'sha256', digest32],
        stdout=subprocess.PIPE)
    v.result(converted.returncode == 0)
    return Digest16(converted.stdout.decode().strip())
304
305
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a base-16 digest to base-32 via `nix to-base32`."""
    v.status('Converting digest to base32')
    converted = subprocess.run(
        ['nix', 'to-base32', '--type', 'sha256', digest16],
        stdout=subprocess.PIPE)
    v.result(converted.returncode == 0)
    return Digest32(converted.stdout.decode().strip())
312
313
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Fetch *url* into the Nix store via nix-prefetch-url.

    Both the digest reported by nix-prefetch-url (base-32) and an
    independent local hash of the file must match *digest*.  Returns the
    store path of the fetched file.
    """
    v.status('Fetching %s' % url)
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    # Output is exactly two lines: digest, then store path.
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status("Verifying file digest")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore # (for old mypy)
330
331
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Fetch the channel resources we care about into the Nix store.

    Resolves each entry's URL against the post-redirect channel URL,
    fetches it (digest-verified), and records absolute_url/file on the
    table entry.  Finally checks the fetched git-revision file agrees
    with the revision advertised on the channel page.
    """
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    # Fix: close the file handle promptly (was a bare open() leak).
    with open(table['git-revision'].file) as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)
344
345
def git_cachedir(git_repo: str) -> str:
    """Path of the local bare-repo cache for *git_repo*, keyed by a hash
    of the repo URL under $XDG_CACHE_HOME/pinch/git."""
    return os.path.join(
        xdg.XDG_CACHE_HOME,
        'pinch/git',
        digest_string(git_repo.encode()))
351
352
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Path of the small file that records the store path of a previously
    generated tarball for (repo, revision, release name)."""
    return os.path.join(
        xdg.XDG_CACHE_HOME,
        'pinch/git-tarball',
        '%s-%s-%s' %
        (digest_string(channel.git_repo.encode()),
         pin.git_revision,
         pin.release_name))
361
362
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        new_revision: str,
        old_revision: Optional[str]) -> None:
    """Verify, via `git merge-base --is-ancestor`, that new_revision is
    reachable from the channel ref and (when known) descends from the
    previously pinned revision."""
    cachedir = git_cachedir(channel.git_repo)

    def is_ancestor(ancestor: str, descendant: str) -> bool:
        return subprocess.run(['git',
                               '-C',
                               cachedir,
                               'merge-base',
                               '--is-ancestor',
                               ancestor,
                               descendant]).returncode == 0

    v.status('Verifying rev is an ancestor of ref')
    v.result(is_ancestor(new_revision, channel.git_ref))

    if old_revision is not None:
        v.status(
            'Verifying rev is an ancestor of previous rev %s' %
            old_revision)
        v.result(is_ancestor(old_revision, new_revision))
391
392
def git_fetch(
        v: Verification,
        channel: TarrableSearchPath,
        desired_revision: Optional[str],
        old_revision: Optional[str]) -> str:
    """Fetch channel.git_ref into the local cache; return its revision.

    Initializes the bare cache repo if needed, fetches the ref without
    --force, optionally confirms *desired_revision* was retrieved, and
    verifies ancestry against *old_revision* before returning the ref's
    new head revision.
    """
    # It would be nice if we could share the nix git cache, but as of the time
    # of writing it is transitioning from gitv2 (deprecated) to gitv3 (not ready
    # yet), and trying to straddle them both is too far into nix implementation
    # details for my comfort. So we re-implement here half of nix.fetchGit.
    # :(

    cachedir = git_cachedir(channel.git_repo)
    if not os.path.exists(cachedir):
        v.status("Initializing git repo")
        process = subprocess.run(
            ['git', 'init', '--bare', cachedir])
        v.result(process.returncode == 0)

    v.status('Fetching ref "%s" from %s' % (channel.git_ref, channel.git_repo))
    # We don't use --force here because we want to abort and freak out if forced
    # updates are happening.
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'fetch',
                              channel.git_repo,
                              '%s:%s' % (channel.git_ref,
                                         channel.git_ref)])
    v.result(process.returncode == 0)

    if desired_revision is not None:
        v.status('Verifying that fetch retrieved this rev')
        process = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', desired_revision])
        v.result(process.returncode == 0)

    # Fix: close the ref file promptly (was a bare open() leak).
    with open(os.path.join(cachedir,
                           'refs',
                           'heads',
                           channel.git_ref)) as ref_file:
        new_revision = ref_file.read(999).strip()

    verify_git_ancestry(v, channel, new_revision, old_revision)

    return new_revision
439
440
def ensure_git_rev_available(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        old_revision: Optional[str]) -> None:
    """Make sure pin.git_revision is present in the local git cache.

    If `git cat-file -e` reports the object present (returncode 0), only
    ancestry is re-verified; if absent (returncode 1), the ref is fetched.
    Any other returncode fails the check.
    """
    cachedir = git_cachedir(channel.git_repo)
    if os.path.exists(cachedir):
        v.status('Checking if we already have this rev:')
        process = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', pin.git_revision])
        if process.returncode == 0:
            v.status('yes')
        if process.returncode == 1:
            v.status('no')
        v.result(process.returncode == 0 or process.returncode == 1)
        if process.returncode == 0:
            verify_git_ancestry(v, channel, pin.git_revision, old_revision)
            return
    git_fetch(v, channel, pin.git_revision, old_revision)
460
461
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Compare the extracted channel tarball with the git checkout.

    Requires at least one matching file and zero differing files.  Files
    present on only one side are tolerated only if they appear in the
    fixed list of expected channel-build artifacts below.
    """
    v.status('Comparing channel tarball with git checkout')
    match, mismatch, errors = compare(os.path.join(
        channel_contents, pin.release_name), git_contents)
    v.ok()
    v.check('%d files match' % len(match), len(match) > 0)
    v.check('%d files differ' % len(mismatch), len(mismatch) == 0)
    # Files expected to be incomparable between tarball and git checkout.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_errors.append(ee)
    v.check(
        '%d unexpected incomparable files' %
        len(errors),
        len(errors) == 0)
    v.check(
        '(%d of %d expected incomparable files)' %
        (len(benign_errors),
         len(expected_errors)),
        len(benign_errors) == len(expected_errors))
493
494
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the fetched nixexprs tarball into *dest*."""
    tarball = table['nixexprs.tar.xz'].file
    v.status('Extracting tarball %s' % tarball)
    shutil.unpack_archive(tarball, dest)
    v.ok()
502
503
def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    """Extract pin.git_revision into *dest* via a `git archive | tar x`
    pipeline."""
    v.status('Checking out corresponding git revision')
    git = subprocess.Popen(['git',
                            '-C',
                            git_cachedir(channel.git_repo),
                            'archive',
                            pin.git_revision],
                           stdout=subprocess.PIPE)
    tar = subprocess.Popen(
        ['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout)
    # Close our copy of the pipe so tar sees EOF when git exits.
    if git.stdout:
        git.stdout.close()
    tar.wait()
    git.wait()
    v.result(git.returncode == 0 and tar.returncode == 0)
523
524
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    """Produce (or reuse) a store tarball of the pinned git revision.

    The resulting store path is memoized in a small cache file; the
    cached value is only trusted if the store path it names still exists.
    """
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        # Fix: close the cache file promptly (was a bare open() leak).
        with open(cache_file) as f:
            cached_tarball = f.read(9999)
        if os.path.exists(cached_tarball):
            return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w') as output_file:
            v.status(
                'Generating tarball for git revision %s' %
                pin.git_revision)
            git = subprocess.Popen(['git',
                                    '-C',
                                    git_cachedir(channel.git_repo),
                                    'archive',
                                    '--prefix=%s/' % pin.release_name,
                                    pin.git_revision],
                                   stdout=subprocess.PIPE)
            xz = subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file)
            xz.wait()
            git.wait()
            v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
        # Fix: close the cache file after writing (was a bare open() leak,
        # risking an unflushed cache record).
        with open(cache_file, 'w') as f:
            f.write(store_tarball)
        return store_tarball  # type: ignore # (for old mypy)
559
560
def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    """Verify the extracted tarball's metadata files agree with the pin.

    Checks that .git-revision matches pin.git_revision and that
    .version-suffix is a suffix of pin.release_name.
    """
    v.status('Verifying git commit in channel tarball')
    # Fix: use with-open so the handles are closed (were bare open() leaks).
    with open(os.path.join(channel_contents,
                           pin.release_name,
                           '.git-revision')) as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)

    v.status(
        'Verifying version-suffix is a suffix of release name %s:' %
        pin.release_name)
    with open(os.path.join(channel_contents,
                           pin.release_name,
                           '.version-suffix')) as suffix_file:
        version_suffix = suffix_file.read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))
583
584
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    """Cross-check the channel tarball against the pinned git revision.

    Extracts the tarball and a git checkout into temporary directories,
    verifies the tarball's metadata, then compares the trees.
    """
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:

        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)

        git_checkout(v, channel, pin, git_contents)

        compare_tarball_and_git(v, pin, channel_contents, git_contents)

        v.status('Removing temporary directories')
    v.ok()
602
603
def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    """Build a release name '<repo basename>-<commit time>-<abbrev hash>'
    for *git_revision* using `git log --format=%ct-%h`."""
    v.status('Getting commit date')
    process = subprocess.run(['git',
                              '-C',
                              git_cachedir(channel.git_repo),
                              'log',
                              '-n1',
                              '--format=%ct-%h',
                              '--abbrev=11',
                              '--no-show-signature',
                              git_revision],
                             stdout=subprocess.PIPE)
    # Guard against empty output as well as a git failure.
    v.result(process.returncode == 0 and process.stdout != b'')
    return '%s-%s' % (os.path.basename(channel.git_repo),
                      process.stdout.decode().strip())
622
623
# Type variables for the generic dict helpers below.
K = TypeVar('K')
V = TypeVar('V')
626
627
def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into (entries satisfying *pred*, the rest)."""
    selected = {k: v for k, v in d.items() if pred(k, v)}
    remaining = {k: v for k, v in d.items() if k not in selected}
    return selected, remaining
638
639
def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into (entries whose key is in *fields*, the rest)."""
    def key_selected(k: K, _: V) -> bool:
        return k in fields
    return partition_dict(key_selected, d)
643
644
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Build (search path, optional pin) from one config section.

    The section's 'type' key selects the SearchPath/Pin classes; keys
    matching the Pin's fields populate the pin (None if none are present)
    and the remaining keys populate the search path.
    """
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # A pin type with no fields at all (e.g. AliasPin) counts as present.
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields != {} or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin
660
661
def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    """Like read_config_section(), but requires the section to be pinned."""
    search_path, pin = read_config_section(conf)
    if pin is not None:
        return search_path, pin
    raise Exception(
        'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
        section)
670
671
def read_config(filename: str) -> configparser.ConfigParser:
    """Parse the pinch channels file at *filename*.

    Uses a with-statement so the file handle is closed promptly (the
    original passed a bare open() and leaked the handle until GC).
    """
    config = configparser.ConfigParser()
    with open(filename) as config_file:
        config.read_file(config_file, filename)
    return config
676
677
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Merge the sections of several config files, rejecting duplicates."""
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for filename in filenames:
        parsed = read_config(filename)
        for name in parsed.sections():
            if name in merged_config:
                raise Exception('Duplicate channel "%s"' % name)
            merged_config[name] = parsed[name]
    return merged_config
688
689
def pinCommand(args: argparse.Namespace) -> None:
    """'pin' subcommand: refresh the pins of the selected sections and
    write them back into the channels file."""
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        # An empty channel list on the command line means "pin everything".
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_config_section(config[section])

        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w') as configfile:
        config.write(configfile)
703
704
def updateCommand(args: argparse.Namespace) -> None:
    """'update' subcommand: fetch every pinned channel and install the
    results into the user's nix-env channels profile."""
    v = Verification()
    exprs: Dict[str, str] = {}
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    # Aliases are resolved after their targets have been fetched.
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in nonalias.items():
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)        # partition_dict()
        tarball = sp.fetch(v, pin)
        # channelName is filled in per-section below via the %%s escape.
        exprs[section] = (
            'f: f { name = "%s"; channelName = "%%s"; src = builtins.storePath "%s"; }' %
            (pin.release_name, tarball))

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    command = [
        'nix-env',
        '--profile',
        '/nix/var/nix/profiles/per-user/%s/channels' %
        getpass.getuser(),
        '--show-trace',
        '--file',
        '<nix/unpack-channel.nix>',
        '--install',
        '--from-expression'] + [exprs[name] % name for name in sorted(exprs.keys())]
    if args.dry_run:
        print(' '.join(map(shlex.quote, command)))
    else:
        v.status('Installing channels with nix-env')
        process = subprocess.run(command)
        v.result(process.returncode == 0)
743
744
def main() -> None:
    """Command-line entry point: dispatch to the pin/update subcommands."""
    top = argparse.ArgumentParser(prog='pinch')
    modes = top.add_subparsers(dest='mode', required=True)
    pin_parser = modes.add_parser('pin')
    pin_parser.add_argument('channels_file', type=str)
    pin_parser.add_argument('channels', type=str, nargs='*')
    pin_parser.set_defaults(func=pinCommand)
    update_parser = modes.add_parser('update')
    update_parser.add_argument('--dry-run', action='store_true')
    update_parser.add_argument('channels_file', type=str, nargs='+')
    update_parser.set_defaults(func=updateCommand)
    arguments = top.parse_args()
    arguments.func(arguments)
758
759
# Script entry point.
if __name__ == '__main__':
    main()