# pinch: PIN CHannels - a replacement for `nix-channel --update`
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, version 3.


import argparse
import configparser
import filecmp
import functools
import getpass
import hashlib
import operator
import os
import os.path
import shlex
import shutil
import subprocess
import sys
import tarfile
import tempfile
import types
import urllib.parse
import urllib.request
import xml.dom.minidom

from typing import (
    Callable,
    Dict,
    Iterable,
    List,
    Mapping,
    NamedTuple,
    NewType,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
)

import git_cache

# Use xdg module when it's less painful to have as a dependency


class XDG(NamedTuple):
    XDG_CACHE_HOME: str


xdg = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))


class VerificationError(Exception):
    pass


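# Verification drives the progress output used throughout this file: each
# check prints a status message to stderr followed by a right-aligned,
# colored "OK" or "FAIL", and a failed check raises VerificationError to
# abort the run.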
class Verification:

    def __init__(self) -> None:
        self.line_length = 0

    def status(self, s: str) -> None:
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        return f'\033[{c:2d}m{s}\033[00m'

    def result(self, r: bool) -> None:
        message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
        length = len(message)
        cols = shutil.get_terminal_size().columns or 80
        pad = (cols - (self.line_length + length)) % cols
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        self.result(True)


Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)


class ChannelTableEntry(types.SimpleNamespace):
    absolute_url: str
    digest: Digest16
    file: str
    size: int
    url: str


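# The Pin flavors below record what a search path resolved to.  Each
# *SearchPath class further down exposes the same informal interface:
# pin() computes a fresh Pin (verifying it against the previous one when
# available) and, for the tarball-producing kinds, fetch() turns a Pin
# into a tarball in the Nix store.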
class AliasPin(NamedTuple):
    pass


class SymlinkPin(NamedTuple):
    @property
    def release_name(self) -> str:
        return 'link'


class GitPin(NamedTuple):
    git_revision: str
    release_name: str


class ChannelPin(NamedTuple):
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str


Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]


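# copy_to_nix_store adds a local file to the Nix store via `nix-store --add`
# and returns the resulting store path.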
def copy_to_nix_store(v: Verification, filename: str) -> str:
    v.status('Putting tarball in Nix store')
    process = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return process.stdout.decode().strip()  # type: ignore # (for old mypy)


def symlink_archive(v: Verification, path: str) -> str:
    with tempfile.TemporaryDirectory() as td:
        archive_filename = os.path.join(td, 'link.tar.gz')
        os.symlink(path, os.path.join(td, 'link'))
        with tarfile.open(archive_filename, mode='x:gz') as t:
            t.add(os.path.join(td, 'link'), arcname='link')
        return copy_to_nix_store(v, archive_filename)


class AliasSearchPath(NamedTuple):
    alias_of: str

    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        return AliasPin()


class SymlinkSearchPath(NamedTuple):
    path: str

    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        return symlink_archive(v, self.path)


class GitSearchPath(NamedTuple):
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        _, new_revision = git_cache.fetch(self.git_repo, self.git_ref)
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            verify_git_ancestry(v, self, old_pin.git_revision, new_revision)
        return GitPin(release_name=git_revision_name(v, self, new_revision),
                      git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        assert isinstance(pin, GitPin)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, pin.git_revision)
        return git_get_tarball(v, self, pin)


class ChannelSearchPath(NamedTuple):
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, new_gitpin.git_revision)
        if old_pin is not None:
            verify_git_ancestry(
                v, self, old_pin.git_revision, new_gitpin.git_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))


SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]


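# compare returns the (match, mismatch, errors) triple from filecmp.cmpfiles
# over the union of the files in both trees, ignoring anything under .git/.
# Directory symlinks are included as files since os.walk does not descend
# into them.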
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:

    def throw(error: OSError) -> None:
        raise error

    def join(x: str, y: str) -> str:
        return y if x == '.' else os.path.join(x, y)

    def recursive_files(d: str) -> Iterable[str]:
        all_files: List[str] = []
        for path, dirs, files in os.walk(d, onerror=throw):
            rel = os.path.relpath(path, start=d)
            all_files.extend(join(rel, f) for f in files)
            for dir_or_link in dirs:
                if os.path.islink(join(path, dir_or_link)):
                    all_files.append(join(rel, dir_or_link))
        return all_files

    def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
        return (f for f in files if not f.startswith('.git/'))

    files = functools.reduce(
        operator.or_, (set(
            exclude_dot_git(
                recursive_files(x))) for x in [a, b]))
    return filecmp.cmpfiles(a, b, files, shallow=False)


def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    v.status(f'Fetching channel from {channel.channel_url}')
    with urllib.request.urlopen(channel.channel_url, timeout=10) as request:
        channel_html = request.read().decode()
        forwarded_url = request.geturl()
        v.result(request.status == 200)
        v.check('Got forwarded', channel.channel_url != forwarded_url)
        return channel_html, forwarded_url


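# parse_channel scrapes the channel's HTML landing page: the release name
# from the <title> and <h1> elements (which must agree), the git commit from
# the first <tt> element, and the file table (name, size, sha256 digest) from
# the <tr> rows.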
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    v.status('Finding release name (1)')
    title = d.getElementsByTagName('title')[0].firstChild
    v.result(isinstance(title, xml.dom.minidom.CharacterData))
    assert isinstance(title, xml.dom.minidom.CharacterData)
    release_name = title.nodeValue.split()[2]
    v.status('Finding release name (2)')
    h1 = d.getElementsByTagName('h1')[0].firstChild
    v.result(isinstance(h1, xml.dom.minidom.CharacterData))
    assert isinstance(h1, xml.dom.minidom.CharacterData)
    v.status('Verifying release name:')
    v.status(release_name)
    v.result(release_name == h1.nodeValue.split()[2])

    v.status('Finding git commit')
    git_commit_node = d.getElementsByTagName('tt')[0]
    v.result(
        isinstance(
            git_commit_node.firstChild,
            xml.dom.minidom.CharacterData))
    assert isinstance(
        git_commit_node.firstChild,
        xml.dom.minidom.CharacterData)
    v.status('Extracting git commit:')
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    v.status('Verifying git commit label')
    assert git_commit_node.previousSibling is not None
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        assert isinstance(
            row.childNodes[0].firstChild, xml.dom.minidom.Element)
        assert isinstance(
            row.childNodes[0].firstChild.firstChild, xml.dom.minidom.Text)
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        assert name is not None
        url = row.childNodes[0].firstChild.getAttribute('href')
        assert row.childNodes[1].firstChild is not None
        assert row.childNodes[1].firstChild.nodeValue is not None
        size = int(row.childNodes[1].firstChild.nodeValue)
        assert row.childNodes[2].firstChild is not None
        assert row.childNodes[2].firstChild.firstChild is not None
        assert row.childNodes[2].firstChild.firstChild.nodeValue is not None
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=release_name, git_revision=git_revision)


def digest_string(s: bytes) -> Digest16:
    return Digest16(hashlib.sha256(s).hexdigest())


def digest_file(filename: str) -> Digest16:
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # pylint: disable=cell-var-from-loop
        for block in iter(lambda: f.read(4096), b''):
            hasher.update(block)
    return Digest16(hasher.hexdigest())


_NIX_COMMAND = ['nix', '--experimental-features', 'nix-command']


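# to_Digest16 converts a base-32 Nix hash (as printed by nix-prefetch-url)
# to base-16 by shelling out to `nix hash convert`.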
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    v.status('Converting digest to base16')
    process = subprocess.run(_NIX_COMMAND + [
        'hash',
        'convert',
        '--hash-algo',
        'sha256',
        '--to',
        'base16',
        digest32],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())


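# fetch_with_nix_prefetch_url downloads a URL into the Nix store with
# `nix-prefetch-url --print-path`, then independently re-hashes the stored
# file and checks both digests against the expected one.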
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    v.status(f'Fetching {url}')
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore # (for old mypy)


def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)


def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    return os.path.join(
        xdg.XDG_CACHE_HOME,
        'pinch/git-tarball',
        f'{digest_string(channel.git_repo.encode())}-{pin.git_revision}-{pin.release_name}')


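# verify_git_ancestry checks that the previously pinned revision is an
# ancestor of the new one (`git merge-base --is-ancestor`), i.e. that the
# channel only ever moves forward along its history.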
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'merge-base',
                              '--is-ancestor',
                              old_revision,
                              new_revision])
    v.result(process.returncode == 0)


def broken_symlinks_are_identical(root1: str, root2: str, path: str) -> bool:
    a = os.path.join(root1, path)
    b = os.path.join(root2, path)
    return (os.path.islink(a)
            and os.path.islink(b)
            and not os.path.exists(a)
            and not os.path.exists(b)
            and os.readlink(a) == os.readlink(b))


def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    v.status('Comparing channel tarball with git checkout')
    tarball_contents = os.path.join(channel_contents, pin.release_name)
    match, mismatch, errors = compare(tarball_contents, git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_expected_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    errors = [
        e for e in errors
        if not broken_symlinks_are_identical(tarball_contents, git_contents, e)
    ]
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))


def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    v.status(f"Extracting tarball {table['nixexprs.tar.xz'].file}")
    shutil.unpack_archive(table['nixexprs.tar.xz'].file, dest)
    v.ok()


def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    v.status('Checking out corresponding git revision')
    with subprocess.Popen(
            ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
            stdout=subprocess.PIPE) as git:
        with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
            if git.stdout:
                git.stdout.close()
            tar.wait()
            git.wait()
            v.result(git.returncode == 0 and tar.returncode == 0)


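# git_get_tarball builds a nixexprs-style tarball for a pinned git revision
# (`git archive | xz`), adds it to the Nix store, and remembers the resulting
# store path in a small cache file under $XDG_CACHE_HOME/pinch/git-tarball so
# later runs can skip the work while that store path still exists.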
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file, encoding='utf-8') as f:
            cached_tarball = f.read(9999)
            if os.path.exists(cached_tarball):
                return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w', encoding='utf-8') as output_file:
            v.status(f'Generating tarball for git revision {pin.git_revision}')
            with subprocess.Popen(
                    ['git', '-C', git_cache.git_cachedir(channel.git_repo),
                     'archive', f'--prefix={pin.release_name}/', pin.git_revision],
                    stdout=subprocess.PIPE) as git:
                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
                    xz.wait()
                    git.wait()
                    v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

    os.makedirs(os.path.dirname(cache_file), exist_ok=True)
    with open(cache_file, 'w', encoding='utf-8') as f:
        f.write(store_tarball)
    return store_tarball  # type: ignore # (for old mypy)


def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(channel_contents, pin.release_name, '.git-revision'),
              encoding='utf-8') as f:
        v.result(f.read(999) == pin.git_revision)

    v.status(
        f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
    with open(os.path.join(channel_contents, pin.release_name, '.version-suffix'),
              encoding='utf-8') as f:
        version_suffix = f.read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))


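# check_channel_contents is the end-to-end consistency check for a channel
# pin: unpack the channel tarball and a `git archive` of the claimed revision
# into temporary directories, verify the tarball's .git-revision and
# .version-suffix metadata, and compare the two trees file by file.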
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:

        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)

        git_checkout(v, channel, pin, git_contents)

        compare_tarball_and_git(v, pin, channel_contents, git_contents)

        v.status('Removing temporary directories')
    v.ok()


def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    v.status('Getting commit date')
    process = subprocess.run(['git',
                              '-C',
                              git_cache.git_cachedir(channel.git_repo),
                              'log',
                              '-n1',
                              '--format=%ct-%h',
                              '--abbrev=11',
                              '--no-show-signature',
                              git_revision],
                             stdout=subprocess.PIPE)
    v.result(process.returncode == 0 and process.stdout != b'')
    return (f'{os.path.basename(channel.git_repo)}-'
            f'{process.stdout.decode().strip()}')


K = TypeVar('K')
V = TypeVar('V')


def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for k, v in d.items():
        if pred(k, v):
            selected[k] = v
        else:
            remaining[k] = v
    return selected, remaining


def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    return partition_dict(lambda k, v: k in fields, d)


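# read_config_section maps a configparser section onto (SearchPath, Pin): the
# section's "type" selects the classes, pin fields already present in the
# file become the Pin, and the remaining fields become the SearchPath.  For
# illustration only (the section name, URLs, and hashes below are made-up
# placeholders), a pinned channel section looks roughly like:
#
#   [nixpkgs]
#   type = channel
#   channel_url = https://channels.nixos.org/nixos-unstable
#   git_repo = https://github.com/NixOS/nixpkgs.git
#   git_ref = nixos-unstable
#   git_revision = 0123456789abcdef0123456789abcdef01234567
#   release_name = nixos-unstable-example
#   tarball_url = https://example.invalid/nixexprs.tar.xz
#   tarball_sha256 = 0000000000000000000000000000000000000000000000000000000000000000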
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin


def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    sp, pin = read_config_section(conf)
    if pin is None:
        raise RuntimeError(
            f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
    return sp, pin


def read_config(filename: str) -> configparser.ConfigParser:
    config = configparser.ConfigParser()
    with open(filename, encoding='utf-8') as f:
        config.read_file(f, filename)
    return config


def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                raise RuntimeError(f'Duplicate channel "{section}"')
            merged_config[section] = config[section]
    return merged_config


def pinCommand(args: argparse.Namespace) -> None:
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_config_section(config[section])

        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w', encoding='utf-8') as configfile:
        config.write(configfile)


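# updateCommand fetches the tarball for every pinned (non-alias) channel,
# wraps each one in a small unpack-channel Nix expression, and installs them
# all into the target profile with a single `nix-env --install --remove-all`
# invocation (or just prints that command under --dry-run).  Aliases reuse
# the expression of the channel they point at.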
def updateCommand(args: argparse.Namespace) -> None:
    v = Verification()
    exprs: Dict[str, str] = {}
    profile_manifest = os.path.join(args.profile, "manifest.nix")
    search_paths: List[str] = [
        "-I", "pinch_profile=" + args.profile,
        "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
    ] if os.path.exists(profile_manifest) else []
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in sorted(nonalias.items()):
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)  # partition_dict()
        tarball = sp.fetch(v, pin)
        search_paths.extend(
            ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
        exprs[section] = (
            f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
            f'src = builtins.storePath "{tarball}"; }}')

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    with tempfile.NamedTemporaryFile() as unpack_channel_nix:
        unpack_channel_nix.write(b'''
{ name, channelName, src, }:
derivation {
  inherit name channelName src;
  builder = "builtin:unpack-channel";
  system = "builtin";
  preferLocalBuild = true;
}
''')
        unpack_channel_nix.flush()

        command = [
            'nix-env',
            '--profile',
            args.profile,
            '--show-trace',
            '--file',
            unpack_channel_nix.name,
            '--install',
            '--remove-all',
        ] + search_paths + ['--from-expression'] + [
            exprs[name] % name for name in sorted(exprs.keys())]
        if args.dry_run:
            print(' '.join(map(shlex.quote, command)))
        else:
            v.status('Installing channels with nix-env')
            process = subprocess.run(command)
            v.result(process.returncode == 0)


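# Command-line entry points.  Typical usage ("channels.conf" here is just an
# illustrative file name):
#
#   pinch pin channels.conf       # resolve + verify, write pins back
#   pinch update channels.conf    # install the pinned channels with nix-env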
def main() -> None:
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)
    parser_pin = subparsers.add_parser('pin')
    parser_pin.add_argument('channels_file', type=str)
    parser_pin.add_argument('channels', type=str, nargs='*')
    parser_pin.set_defaults(func=pinCommand)
    parser_update = subparsers.add_parser('update')
    parser_update.add_argument('--dry-run', action='store_true')
    parser_update.add_argument('--profile', default=(
        f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
    parser_update.add_argument('channels_file', type=str, nargs='+')
    parser_update.set_defaults(func=updateCommand)
    args = parser.parse_args()
    args.func(args)


if __name__ == '__main__':
    main()