]> git.scottworley.com Git - pinch/blob - pinch.py
Use a proper User-Agent when fetching channel info
[pinch] / pinch.py
1 # pinch: PIN CHannels - a replacement for `nix-channel --update`
2 #
3 # This program is free software: you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation, version 3.
6
7
8 import argparse
9 import configparser
10 import filecmp
11 import functools
12 import getpass
13 import hashlib
14 import operator
15 import os
16 import os.path
17 import shlex
18 import shutil
19 import subprocess
20 import sys
21 import tarfile
22 import tempfile
23 import types
24 import urllib.parse
25 import urllib.request
26 import xml.dom.minidom
27
28 from typing import (
29 Callable,
30 Dict,
31 Iterable,
32 List,
33 Mapping,
34 NamedTuple,
35 NewType,
36 Optional,
37 Set,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 )
43
44 import git_cache
45
46 from version import pinch_version
47
48 # Use xdg module when it's less painful to have as a dependency
49
50
# Minimal stand-in for the third-party xdg module (see note above):
# holds the one XDG base directory this program uses.
class XDG(NamedTuple):
    XDG_CACHE_HOME: str  # base directory for user-specific cached data


# Resolve XDG_CACHE_HOME from the environment, defaulting to ~/.cache
# per the XDG Base Directory convention.
xdg = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))
59
60
class VerificationError(Exception):
    """Raised by Verification.result/check when a verification step fails."""
    pass
63
64
class Verification:
    """Incremental progress reporter for verification steps.

    Step descriptions are printed to stderr as they run; result() then
    right-aligns a colored OK/FAIL verdict on the same line.  A FAIL
    raises VerificationError.
    """

    def __init__(self) -> None:
        # Running width of the current output line, used to right-align
        # the verdict.
        self.line_length = 0

    def status(self, s: str) -> None:
        """Print a step description without a trailing newline."""
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += len(s) + 1  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        """Wrap *s* in the ANSI SGR escape for color code *c*."""
        return f'\033[{c:2d}m{s}\033[00m'

    def result(self, r: bool) -> None:
        """Print a right-aligned verdict; raise VerificationError on failure."""
        message, color = ('OK ', 92) if r else ('FAIL', 91)
        cols = shutil.get_terminal_size().columns or 80
        pad = (cols - (self.line_length + len(message))) % cols
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        """Convenience: print a description and its verdict together."""
        self.status(s)
        self.result(r)

    def ok(self) -> None:
        """Mark the current step as passed."""
        self.result(True)
94
95
# Base16 (hex) and base32 renderings of a sha256 digest, kept distinct
# at type-check time so they cannot be mixed up.
Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)


# One row of a channel page's file table.  url/digest/size come from
# parse_channel; absolute_url and file are filled in by fetch_resources.
class ChannelTableEntry(types.SimpleNamespace):
    absolute_url: str  # url resolved against the forwarded channel URL
    digest: Digest16   # expected digest of the file, base16
    file: str          # local path once the file has been fetched
    size: int          # size in bytes, as listed in the table
    url: str           # href exactly as it appears in the channel page
106
107
class AliasPin(NamedTuple):
    """Pin for an alias search path: an alias carries no state of its own."""
    pass
110
111
class SymlinkPin(NamedTuple):
    """Pin for a symlink search path; its release is always named 'link'."""
    @property
    def release_name(self) -> str:
        return 'link'
116
117
class GitPin(NamedTuple):
    """Pin for a git search path: an exact commit and its release name."""
    git_revision: str
    release_name: str
121
122
class ChannelPin(NamedTuple):
    """Pin for a Nix channel: git commit plus the channel tarball's details."""
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str


# Any kind of pin that can be stored in a channels-file section.
Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]
131
132
def copy_to_nix_store(v: Verification, filename: str) -> str:
    """Add *filename* to the Nix store and return the resulting store path."""
    v.status('Putting tarball in Nix store')
    proc = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(proc.returncode == 0)
    return proc.stdout.decode().strip()  # type: ignore # (for old mypy)
139
140
def symlink_archive(v: Verification, path: str) -> str:
    """Wrap a symlink to *path* in a gzipped tarball and add it to the store.

    The archive contains exactly one member, 'link', a symlink whose
    target is *path*.
    """
    with tempfile.TemporaryDirectory() as scratch:
        link = os.path.join(scratch, 'link')
        os.symlink(path, link)
        archive = os.path.join(scratch, 'link.tar.gz')
        with tarfile.open(archive, mode='x:gz') as tar:
            tar.add(link, arcname='link')
        return copy_to_nix_store(v, archive)
148
149
class AliasSearchPath(NamedTuple):
    """A search path that is just another section's channel under a new name."""
    alias_of: str  # name of the section being aliased

    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        """Aliases have no pin state of their own."""
        return AliasPin()
155
156
class SymlinkSearchPath(NamedTuple):
    """A search path that materializes as a symlink to a local path."""
    path: str  # target the generated 'link' symlink will point at

    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        """Nothing to record: the path itself is the whole state."""
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        """Produce a store tarball containing the symlink."""
        return symlink_archive(v, self.path)
165
166
class GitSearchPath(NamedTuple):
    """A search path backed directly by a ref in a git repository."""
    git_ref: str   # branch/tag to follow
    git_repo: str  # repository URL

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        """Resolve git_ref to a revision, verifying it descends from old_pin."""
        _, new_revision = git_cache.fetch(self.git_repo, self.git_ref)
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            verify_git_ancestry(v, self, old_pin.git_revision, new_revision)
        return GitPin(release_name=git_revision_name(v, self, new_revision),
                      git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Build (or reuse) a store tarball for the pinned revision."""
        assert isinstance(pin, GitPin)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, pin.git_revision)
        return git_get_tarball(v, self, pin)
184
185
class ChannelSearchPath(NamedTuple):
    """A search path backed by a Nix channel page and its git repository."""
    channel_url: str  # URL of the channel page to poll
    git_ref: str      # branch the channel's commits are expected on
    git_repo: str     # repository the channel is built from

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        """Fetch the channel page and pin its current release.

        Verifies the tarball against the corresponding git revision and,
        when updating an existing pin, that history was not rewritten.
        """
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        # Short-circuit: if the channel still points at the same commit,
        # keep the existing pin and skip all fetching and verification.
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, new_gitpin.git_revision)
        if old_pin is not None:
            verify_git_ancestry(
                v, self, old_pin.git_revision, new_gitpin.git_revision)
        # Cross-check the channel tarball's contents against git.
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        """Fetch the pinned channel tarball into the store."""
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))
217
218
# All configurable search-path kinds.  "Tarrable" ones are those that
# can produce a tarball from a pinned git revision.
SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
224
225
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Recursively compare directory trees *a* and *b*.

    Returns filecmp.cmpfiles-style (match, mismatch, errors) lists over
    the union of both trees' files.  Symlinks to directories count as
    files; anything under '.git/' is ignored.
    """

    def raise_error(error: OSError) -> None:
        # os.walk swallows errors by default; surface them instead.
        raise error

    def join(prefix: str, name: str) -> str:
        return name if prefix == '.' else os.path.join(prefix, name)

    def walk_files(root: str) -> Set[str]:
        found: Set[str] = set()
        for path, dirs, filenames in os.walk(root, onerror=raise_error):
            rel = os.path.relpath(path, start=root)
            for f in filenames:
                found.add(join(rel, f))
            # os.walk does not descend into symlinked directories, so
            # record those links as files in their own right.
            for d in dirs:
                if os.path.islink(join(path, d)):
                    found.add(join(rel, d))
        return found

    files = {f for root in (a, b) for f in walk_files(root)
             if not f.startswith('.git/')}
    return filecmp.cmpfiles(a, b, files, shallow=False)
252
253
def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    """Download the channel page.

    Returns (page HTML, final URL after redirects).  The channel URL is
    expected to redirect (e.g. to a concrete release page); failing to
    be redirected is treated as an error.
    """
    v.status(f'Fetching channel from {channel.channel_url}')
    with urllib.request.urlopen(
            urllib.request.Request(
                url=channel.channel_url,
                # Send a descriptive User-Agent rather than urllib's default.
                headers={'User-Agent': f'pinch-{pinch_version}'}),
            timeout=10) as request:
        channel_html = request.read().decode()
        forwarded_url = request.geturl()
    v.result(request.status == 200)
    v.check('Got forwarded', channel.channel_url != forwarded_url)
    return channel_html, forwarded_url
267
268
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    """Extract the file table and git pin from a channel release page.

    The page's <title> and <h1> must agree on the release name (their
    third whitespace-separated word), the first <tt> element holds the
    git revision, and each table row after the header describes one
    downloadable file (name/href, size, digest).
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    # The release name appears in both the <title> and the <h1>; take it
    # from the title and cross-check against the h1.
    v.status('Finding release name (1)')
    title = d.getElementsByTagName('title')[0].firstChild
    v.result(isinstance(title, xml.dom.minidom.CharacterData))
    assert isinstance(title, xml.dom.minidom.CharacterData)
    release_name = title.nodeValue.split()[2]
    v.status('Finding release name (2)')
    h1 = d.getElementsByTagName('h1')[0].firstChild
    v.result(isinstance(h1, xml.dom.minidom.CharacterData))
    assert isinstance(h1, xml.dom.minidom.CharacterData)
    v.status('Verifying release name:')
    v.status(release_name)
    v.result(release_name == h1.nodeValue.split()[2])

    # The git revision is the text of the first <tt> element; the text
    # node just before it must read 'Git commit '.
    v.status('Finding git commit')
    git_commit_node = d.getElementsByTagName('tt')[0]
    v.result(
        isinstance(
            git_commit_node.firstChild,
            xml.dom.minidom.CharacterData))
    assert isinstance(
        git_commit_node.firstChild,
        xml.dom.minidom.CharacterData)
    v.status('Extracting git commit:')
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()
    v.status('Verifying git commit label')
    assert git_commit_node.previousSibling is not None
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    # Each data row: cell 0 holds a link (href + file name), cell 1 the
    # size, cell 2 an element wrapping the digest text.  Row 0 is the
    # header and is skipped.
    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        assert isinstance(
            row.childNodes[0].firstChild, xml.dom.minidom.Element)
        assert isinstance(
            row.childNodes[0].firstChild.firstChild, xml.dom.minidom.Text)
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        assert name is not None
        url = row.childNodes[0].firstChild.getAttribute('href')
        assert row.childNodes[1].firstChild is not None
        assert row.childNodes[1].firstChild.nodeValue is not None
        size = int(row.childNodes[1].firstChild.nodeValue)
        assert row.childNodes[2].firstChild is not None
        assert row.childNodes[2].firstChild.firstChild is not None
        assert row.childNodes[2].firstChild.firstChild.nodeValue is not None
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=release_name, git_revision=git_revision)
325
326
def digest_string(s: bytes) -> Digest16:
    """Return the sha256 of *s* as a base16 (hex) digest."""
    hasher = hashlib.sha256()
    hasher.update(s)
    return Digest16(hasher.hexdigest())
329
330
def digest_file(filename: str) -> Digest16:
    """Return the sha256 of *filename*'s contents as a base16 (hex) digest.

    Reads in 4 KiB blocks so arbitrarily large files can be hashed
    without loading them into memory.
    """
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # Walrus loop replaces the old iter(lambda: f.read(4096), b'')
        # idiom (and the stale cell-var-from-loop pylint suppression that
        # accompanied it -- no closure over a loop variable exists here).
        while block := f.read(4096):
            hasher.update(block)
    return Digest16(hasher.hexdigest())
338
339
# Base invocation for the `nix` CLI, opting in to the still-experimental
# nix-command feature set.
_NIX_COMMAND = ['nix', '--experimental-features', 'nix-command']
341
342
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base32 sha256 digest to base16 via `nix hash convert`."""
    v.status('Converting digest to base16')
    process = subprocess.run(_NIX_COMMAND + [
        'hash',
        'convert',
        '--hash-algo',
        'sha256',
        '--to',
        'base16',
        digest32],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())
356
357
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Fetch *url* into the Nix store, insisting on an exact sha256.

    The digest is verified twice: once as reported by nix-prefetch-url
    (converted from base32) and once by re-hashing the file on disk.
    Returns the store path.
    """
    v.status(f'Fetching {url}')
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    # Output is exactly two lines: the base32 digest, then the store path.
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore # (for old mypy)
374
375
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the channel's git-revision and nixexprs.tar.xz files.

    Fills in each table entry's absolute_url (resolved against the
    redirected channel URL) and file (fetched store path), then checks
    that the fetched git-revision file matches the pinned revision.
    """
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)
389
390
def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    """Path of the cache entry recording this pin's generated tarball.

    Keyed on the repo URL's digest plus the pinned revision and release
    name, under the user's XDG cache directory.
    """
    entry_name = (f'{digest_string(channel.git_repo.encode())}'
                  f'-{pin.git_revision}-{pin.release_name}')
    return os.path.join(xdg.XDG_CACHE_HOME, 'pinch/git-tarball', entry_name)
396
397
def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    """Check new_revision descends from old_revision (no history rewrite)."""
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    # `git merge-base --is-ancestor` exits 0 iff old_revision is an
    # ancestor of new_revision.
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'merge-base',
                              '--is-ancestor',
                              old_revision,
                              new_revision])
    v.result(process.returncode == 0)
413
414
def broken_symlinks_are_identical(root1: str, root2: str, path: str) -> bool:
    """True iff *path* is a dangling symlink with the same target in both trees.

    filecmp reports dangling symlinks as incomparable; this lets callers
    treat a matching pair of dangling links as equal anyway.
    """
    first = os.path.join(root1, path)
    second = os.path.join(root2, path)
    for candidate in (first, second):
        # Must be a symlink whose target is missing (os.path.exists
        # follows links, so it is False for a dangling one).
        if not os.path.islink(candidate) or os.path.exists(candidate):
            return False
    return os.readlink(first) == os.readlink(second)
423
424
def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    """Check the unpacked channel tarball against the git checkout.

    Requires at least one matching file, no differing files, and no
    incomparable files except a known allowlist of channel-build
    artifacts and identical dangling symlinks.
    """
    v.status('Comparing channel tarball with git checkout')
    tarball_contents = os.path.join(channel_contents, pin.release_name)
    match, mismatch, errors = compare(tarball_contents, git_contents)
    v.ok()
    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
    # Channel-build artifacts expected to be incomparable between the
    # tarball and the git checkout.
    expected_errors = [
        '.git-revision',
        '.version-suffix',
        'nixpkgs',
        'programs.sqlite',
        'svn-revision']
    benign_expected_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    # Dangling symlinks are "errors" to filecmp; an identical pair is fine.
    errors = [
        e for e in errors
        if not broken_symlinks_are_identical(tarball_contents, git_contents, e)
    ]
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    # Every allowlisted artifact is also expected to actually be present.
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))
457
458
def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    """Unpack the fetched nixexprs tarball into *dest*."""
    v.status(f"Extracting tarball {table['nixexprs.tar.xz'].file}")
    shutil.unpack_archive(table['nixexprs.tar.xz'].file, dest)
    v.ok()
466
467
def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    """Materialize the pinned revision under *dest* via `git archive | tar`."""
    v.status('Checking out corresponding git revision')
    with subprocess.Popen(
            ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
            stdout=subprocess.PIPE) as git:
        with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
            if git.stdout:
                # Close our copy of the pipe so tar sees EOF when git exits.
                git.stdout.close()
            tar.wait()
            git.wait()
    v.result(git.returncode == 0 and tar.returncode == 0)
483
484
def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    """Build (or reuse) a nix-store tarball of the pinned git revision.

    The resulting store path is memoized in a cache file keyed on repo,
    revision, and release name; a cache entry whose store path no longer
    exists is ignored and rebuilt.
    """
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file, encoding='utf-8') as f:
            cached_tarball = f.read(9999)
        if os.path.exists(cached_tarball):
            return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w', encoding='utf-8') as output_file:
            v.status(f'Generating tarball for git revision {pin.git_revision}')
            # git archive | xz, streamed straight into the output file.
            with subprocess.Popen(
                    ['git', '-C', git_cache.git_cachedir(channel.git_repo),
                     'archive', f'--prefix={pin.release_name}/', pin.git_revision],
                    stdout=subprocess.PIPE) as git:
                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
                    xz.wait()
                    git.wait()
            v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

    os.makedirs(os.path.dirname(cache_file), exist_ok=True)
    with open(cache_file, 'w', encoding='utf-8') as f:
        f.write(store_tarball)
    return store_tarball  # type: ignore # (for old mypy)
516
517
def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    """Verify the unpacked tarball's embedded metadata agrees with the pin.

    Checks that .git-revision equals the pinned revision and that
    .version-suffix is a suffix of the release name.
    """
    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(channel_contents, pin.release_name, '.git-revision'),
              encoding='utf-8') as f:
        v.result(f.read(999) == pin.git_revision)

    v.status(
        f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
    with open(os.path.join(channel_contents, pin.release_name, '.version-suffix'),
              encoding='utf-8') as f:
        version_suffix = f.read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))
534
535
def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    """Unpack the channel tarball and verify it against git.

    Checks the tarball's embedded metadata (check_channel_metadata) and
    compares its tree against a checkout of the pinned revision.
    """
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:

        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)

        git_checkout(v, channel, pin, git_contents)

        compare_tarball_and_git(v, pin, channel_contents, git_contents)

        v.status('Removing temporary directories')
    v.ok()
553
554
def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    """Name a release '<repo basename>-<commit timestamp>-<abbrev hash>'."""
    v.status('Getting commit date')
    process = subprocess.run(['git',
                              '-C',
                              git_cache.git_cachedir(channel.git_repo),
                              'log',
                              '-n1',
                              '--format=%ct-%h',
                              '--abbrev=11',
                              '--no-show-signature',
                              git_revision],
                             stdout=subprocess.PIPE)
    v.result(process.returncode == 0 and process.stdout != b'')
    repo_name = os.path.basename(channel.git_repo)
    commit_stamp = process.stdout.decode().strip()
    return f'{repo_name}-{commit_stamp}'
574
575
# Generic key/value type variables for the dict helpers.
K = TypeVar('K')
V = TypeVar('V')


def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into (entries satisfying pred, the rest).

    pred is called exactly once per entry; insertion order is preserved
    within each result dict.
    """
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for key, value in d.items():
        target = selected if pred(key, value) else remaining
        target[key] = value
    return selected, remaining
590
591
def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into (entries whose key is in *fields*, the rest)."""
    def key_selected(k: K, _: V) -> bool:
        return k in fields
    return partition_dict(key_selected, d)
595
596
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Build a SearchPath (and its Pin, if present) from a config section.

    The section's 'type' field selects the SearchPath/Pin classes.
    Fields named in the Pin type become the pin; all other fields
    construct the search path.  Returns pin None when the pin fields
    are absent (section not yet pinned).
    """
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # A pin type with no fields at all (e.g. AliasPin) always counts as
    # present.
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin
612
613
def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    """Like read_config_section, but the section must already be pinned.

    Raises RuntimeError naming *section* when it has no pin.
    """
    sp, pin = read_config_section(conf)
    if pin is None:
        raise RuntimeError(
            f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
    return sp, pin
621
622
def read_config(filename: str) -> configparser.ConfigParser:
    """Read one channels file into a ConfigParser."""
    parser = configparser.ConfigParser()
    with open(filename, encoding='utf-8') as config_text:
        parser.read_file(config_text, filename)
    return parser
628
629
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Merge the sections of several channels files into one mapping.

    Raises RuntimeError if the same section name appears in more than
    one file.
    """
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                # Fix: this message was missing the f prefix, so the
                # offending section name was never interpolated.
                raise RuntimeError(f'Duplicate channel "{section}"')
            merged_config[section] = config[section]
    return merged_config
640
641
def pinCommand(args: argparse.Namespace) -> None:
    """Handle `pinch pin`: re-pin sections and rewrite the channels file.

    Pins every section, or only those named in args.channels when any
    were given on the command line.
    """
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_config_section(config[section])

        # Merge the (possibly updated) pin fields back into the section.
        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w', encoding='utf-8') as configfile:
        config.write(configfile)
655
656
def updateCommand(args: argparse.Namespace) -> None:
    """Handle `pinch update`: install all pinned channels with nix-env.

    Fetches each non-alias channel's tarball, then installs everything
    into the target profile in one nix-env invocation (or just prints
    the command under --dry-run).
    """
    v = Verification()
    exprs: Dict[str, str] = {}
    # Expose the existing profile and its manifest to the Nix expressions
    # via -I search-path entries, when a profile is already present.
    profile_manifest = os.path.join(args.profile, "manifest.nix")
    search_paths: List[str] = [
        "-I", "pinch_profile=" + args.profile,
        "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
    ] if os.path.exists(profile_manifest) else []
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in sorted(nonalias.items()):
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)  # partition_dict()
        tarball = sp.fetch(v, pin)
        search_paths.extend(
            ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
        # The %s placeholder is filled with the section name below, so
        # aliases can reuse the same expression under a different name.
        exprs[section] = (
            f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
            f'src = builtins.storePath "{tarball}"; }}')

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    with tempfile.NamedTemporaryFile() as unpack_channel_nix:
        # Nix function applied (via --from-expression) to each channel.
        unpack_channel_nix.write(b'''
{ name, channelName, src, }:
derivation {
  inherit name channelName src;
  builder = "builtin:unpack-channel";
  system = "builtin";
  preferLocalBuild = true;
}
''')
        unpack_channel_nix.flush()

        command = [
            'nix-env',
            '--profile',
            args.profile,
            '--show-trace',
            '--file',
            unpack_channel_nix.name,
            '--install',
            '--remove-all',
        ] + search_paths + ['--from-expression'] + [
            exprs[name] % name for name in sorted(exprs.keys())]
        if args.dry_run:
            print(' '.join(map(shlex.quote, command)))
        else:
            v.status('Installing channels with nix-env')
            process = subprocess.run(command)
            v.result(process.returncode == 0)
715
716
def main() -> None:
    """Parse command-line arguments and dispatch to pin/update."""
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)
    parser_pin = subparsers.add_parser('pin')
    parser_pin.add_argument('channels_file', type=str)
    parser_pin.add_argument('channels', type=str, nargs='*')
    parser_pin.set_defaults(func=pinCommand)
    parser_update = subparsers.add_parser('update')
    parser_update.add_argument('--dry-run', action='store_true')
    # Default to the current user's per-user channels profile.
    parser_update.add_argument('--profile', default=(
        f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
    parser_update.add_argument('channels_file', type=str, nargs='+')
    parser_update.set_defaults(func=updateCommand)
    args = parser.parse_args()
    args.func(args)
732
733
# Script entry point.
if __name__ == '__main__':
    main()