# pinch: PIN CHannels - a replacement for `nix-channel --update`

# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, version 3.

import xml.dom.minidom


# Use xdg module when it's less painful to have as a dependency
class XDG(NamedTuple):
    XDG_CACHE_HOME: str


XDG = XDG(
    XDG_CACHE_HOME=os.getenv(
        'XDG_CACHE_HOME',
        os.path.expanduser('~/.cache')))


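# Verification (below) prints a running progress transcript to stderr:
# status() emits a label, result() right-aligns a green OK or red FAIL and
# raises VerificationError on failure, and check() does both in one call.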
class VerificationError(Exception):
    pass


class Verification:

    def __init__(self) -> None:
        self.line_length = 0

    def status(self, s: str) -> None:
        print(s, end=' ', file=sys.stderr, flush=True)
        self.line_length += 1 + len(s)  # Unicode??

    @staticmethod
    def _color(s: str, c: int) -> str:
        return f'\033[{c:2d}m{s}\033[00m'

    def result(self, r: bool) -> None:
        message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
        length = len(message)
        cols = shutil.get_terminal_size().columns or 80
        pad = (cols - (self.line_length + length)) % cols
        print(' ' * pad + self._color(message, color), file=sys.stderr)
        self.line_length = 0
        if not r:
            raise VerificationError()

    def check(self, s: str, r: bool) -> None:
        self.status(s)
        self.result(r)


Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)


class ChannelTableEntry(types.SimpleNamespace):
    absolute_url: str
    digest: Digest16
    file: str
    size: int
    url: str


class AliasPin(NamedTuple):
    pass


class SymlinkPin(NamedTuple):

    @property
    def release_name(self) -> str:
        ...


class GitPin(NamedTuple):
    git_revision: str
    release_name: str


class ChannelPin(NamedTuple):
    git_revision: str
    release_name: str
    tarball_url: str
    tarball_sha256: str


Pin = Union[AliasPin, SymlinkPin, GitPin, ChannelPin]


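# A pin is the durable record of an exact channel state: AliasPin and
# SymlinkPin carry no data of their own, GitPin stores a release name plus a
# git revision, and ChannelPin adds the channel tarball URL and its sha256.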
def copy_to_nix_store(v: Verification, filename: str) -> str:
    v.status('Putting tarball in Nix store')
    process = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return process.stdout.decode().strip()  # type: ignore  # (for old mypy)


def symlink_archive(v: Verification, path: str) -> str:
    with tempfile.TemporaryDirectory() as td:
        archive_filename = os.path.join(td, 'link.tar.gz')
        os.symlink(path, os.path.join(td, 'link'))
        with tarfile.open(archive_filename, mode='x:gz') as t:
            t.add(os.path.join(td, 'link'), arcname='link')
        return copy_to_nix_store(v, archive_filename)


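# Each search-path class below follows the same two-step protocol: pin()
# resolves the configured source to an exact Pin, and fetch() (for everything
# except aliases) turns a previously saved Pin into a tarball in the Nix store.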
class AliasSearchPath(NamedTuple):
    alias_of: str

    def pin(self, _: Verification, __: Optional[Pin]) -> AliasPin:
        return AliasPin()


class SymlinkSearchPath(NamedTuple):
    path: str

    def pin(self, _: Verification, __: Optional[Pin]) -> SymlinkPin:
        return SymlinkPin()

    def fetch(self, v: Verification, _: Pin) -> str:
        return symlink_archive(v, self.path)


class GitSearchPath(NamedTuple):
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
        _, new_revision = git_cache.fetch(self.git_repo, self.git_ref)
        if old_pin is not None:
            assert isinstance(old_pin, GitPin)
            verify_git_ancestry(v, self, old_pin.git_revision, new_revision)
        return GitPin(release_name=git_revision_name(v, self, new_revision),
                      git_revision=new_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        assert isinstance(pin, GitPin)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, pin.git_revision)
        return git_get_tarball(v, self, pin)


class ChannelSearchPath(NamedTuple):
    channel_url: str
    git_ref: str
    git_repo: str

    def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
        if old_pin is not None:
            assert isinstance(old_pin, ChannelPin)

        channel_html, forwarded_url = fetch_channel(v, self)
        table, new_gitpin = parse_channel(v, channel_html)
        if old_pin is not None and old_pin.git_revision == new_gitpin.git_revision:
            return old_pin
        fetch_resources(v, new_gitpin, forwarded_url, table)
        git_cache.ensure_rev_available(
            self.git_repo, self.git_ref, new_gitpin.git_revision)
        if old_pin is not None:
            verify_git_ancestry(
                v, self, old_pin.git_revision, new_gitpin.git_revision)
        check_channel_contents(v, self, table, new_gitpin)
        return ChannelPin(
            release_name=new_gitpin.release_name,
            tarball_url=table['nixexprs.tar.xz'].absolute_url,
            tarball_sha256=table['nixexprs.tar.xz'].digest,
            git_revision=new_gitpin.git_revision)

    def fetch(self, v: Verification, pin: Pin) -> str:
        assert isinstance(pin, ChannelPin)

        return fetch_with_nix_prefetch_url(
            v, pin.tarball_url, Digest16(pin.tarball_sha256))


SearchPath = Union[AliasSearchPath,
                   SymlinkSearchPath,
                   GitSearchPath,
                   ChannelSearchPath]
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]


def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:

    def throw(error: OSError) -> None:
        raise error

    def join(x: str, y: str) -> str:
        return y if x == '.' else os.path.join(x, y)

    def recursive_files(d: str) -> Iterable[str]:
        all_files: List[str] = []
        for path, dirs, files in os.walk(d, onerror=throw):
            rel = os.path.relpath(path, start=d)
            all_files.extend(join(rel, f) for f in files)
            for dir_or_link in dirs:
                if os.path.islink(join(path, dir_or_link)):
                    all_files.append(join(rel, dir_or_link))
        return all_files

    def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
        return (f for f in files if not f.startswith('.git/'))

    files = functools.reduce(
        operator.or_, (set(exclude_dot_git(
            recursive_files(x))) for x in [a, b]))
    return filecmp.cmpfiles(a, b, files, shallow=False)


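# compare() above returns filecmp.cmpfiles' (match, mismatch, errors) triple,
# computed over the union of files found in either tree, ignoring .git/.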
def fetch_channel(
        v: Verification, channel: ChannelSearchPath) -> Tuple[str, str]:
    v.status(f'Fetching channel from {channel.channel_url}')
    with urllib.request.urlopen(channel.channel_url, timeout=10) as request:
        channel_html = request.read().decode()
        forwarded_url = request.geturl()
    v.result(request.status == 200)
    v.check('Got forwarded', channel.channel_url != forwarded_url)
    return channel_html, forwarded_url


def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)

    v.status('Finding release name (1)')
    title = d.getElementsByTagName('title')[0].firstChild
    v.result(isinstance(title, xml.dom.minidom.CharacterData))
    assert isinstance(title, xml.dom.minidom.CharacterData)
    release_name = title.nodeValue.split()[2]
    v.status('Finding release name (2)')
    h1 = d.getElementsByTagName('h1')[0].firstChild
    v.result(isinstance(h1, xml.dom.minidom.CharacterData))
    assert isinstance(h1, xml.dom.minidom.CharacterData)
    v.status('Verifying release name:')
    v.status(release_name)
    v.result(release_name == h1.nodeValue.split()[2])

    v.status('Finding git commit')
    git_commit_node = d.getElementsByTagName('tt')[0]
    v.result(isinstance(
        git_commit_node.firstChild,
        xml.dom.minidom.CharacterData))
    assert isinstance(
        git_commit_node.firstChild,
        xml.dom.minidom.CharacterData)
    v.status('Extracting git commit:')
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)

    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)

    return table, GitPin(release_name=release_name, git_revision=git_revision)


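# Nothing scraped from the channel page above is taken on faith: the file
# digests, the git revision, and the tarball contents are all re-checked
# against the fetched resources and a git checkout before a pin is recorded.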
def digest_string(s: bytes) -> Digest16:
    return Digest16(hashlib.sha256(s).hexdigest())


def digest_file(filename: str) -> Digest16:
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # pylint: disable=cell-var-from-loop
        for block in iter(lambda: f.read(4096), b''):
            hasher.update(block)
    return Digest16(hasher.hexdigest())


def _experimental_flag_needed(v: Verification) -> bool:
    v.status('Checking Nix version')
    process = subprocess.run(['nix', '--help'], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return b'--experimental-features' in process.stdout


def _nix_command(v: Verification) -> List[str]:
    return ['nix', '--experimental-features',
            'nix-command'] if _experimental_flag_needed(v) else ['nix']


def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    v.status('Converting digest to base16')
    process = subprocess.run(_nix_command(v) + [
        'to-base16',
        '--type',
        'sha256',
        digest32],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest16(process.stdout.decode().strip())


def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    v.status('Converting digest to base32')
    process = subprocess.run(_nix_command(v) + [
        'to-base32',
        '--type',
        'sha256',
        digest16],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    return Digest32(process.stdout.decode().strip())


def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    v.status(f'Fetching {url}')
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest], stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    prefetch_digest, path, empty = process.stdout.decode().split('\n')

    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status(f"Verifying digest of {path}")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore  # (for old mypy)


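# nix-prefetch-url checks the digest itself, but the digest it reports is
# converted back to base16 and compared, and the downloaded file is re-hashed
# independently with hashlib as a belt-and-braces check.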
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    with open(table['git-revision'].file, encoding='utf-8') as rev_file:
        v.result(rev_file.read(999) == pin.git_revision)


def tarball_cache_file(channel: TarrableSearchPath, pin: GitPin) -> str:
    return os.path.join(
        XDG.XDG_CACHE_HOME,
        'pinch/git-tarball',
        f'{digest_string(channel.git_repo.encode())}-{pin.git_revision}-{pin.release_name}')


def verify_git_ancestry(
        v: Verification,
        channel: TarrableSearchPath,
        old_revision: str,
        new_revision: str) -> None:
    cachedir = git_cache.git_cachedir(channel.git_repo)
    v.status(f'Verifying rev is an ancestor of previous rev {old_revision}')
    process = subprocess.run(['git',
                              '-C',
                              cachedir,
                              'merge-base',
                              '--is-ancestor',
                              old_revision,
                              new_revision])
    v.result(process.returncode == 0)


def compare_tarball_and_git(
        v: Verification,
        pin: GitPin,
        channel_contents: str,
        git_contents: str) -> None:
    v.status('Comparing channel tarball with git checkout')
    match, mismatch, errors = compare(os.path.join(
        channel_contents, pin.release_name), git_contents)

    v.check(f'{len(match)} files match', len(match) > 0)
    v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)

    expected_errors = [
        # ...
    ]
    permitted_errors = [
        # ...
        'pkgs/test/nixpkgs-check-by-name/tests/multiple-failures/pkgs/by-name/A/fo@/foo',
        'pkgs/test/nixpkgs-check-by-name/tests/symlink-invalid/pkgs/by-name/fo/foo/foo',
        'pkgs/test/nixpkgs-check-by-name/tests/symlink-invalid/pkgs/by-name/fo/foo/foo.nix',
    ]
    benign_expected_errors = []
    benign_permitted_errors = []
    for ee in expected_errors:
        if ee in errors:
            errors.remove(ee)
            benign_expected_errors.append(ee)
    for pe in permitted_errors:
        if pe in errors:
            errors.remove(pe)
            benign_permitted_errors.append(pe)
    v.check(
        f'{len(errors)} unexpected incomparable files: {errors}',
        len(errors) == 0)
    v.check(
        f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
        len(benign_expected_errors) == len(expected_errors))
    v.check(
        f'({len(benign_permitted_errors)} of {len(permitted_errors)} permitted incomparable files)',
        len(benign_permitted_errors) <= len(permitted_errors))


def extract_tarball(
        v: Verification,
        table: Dict[str, ChannelTableEntry],
        dest: str) -> None:
    v.status(f"Extracting tarball {table['nixexprs.tar.xz'].file}")
    shutil.unpack_archive(table['nixexprs.tar.xz'].file, dest)
    v.result(True)


def git_checkout(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        dest: str) -> None:
    v.status('Checking out corresponding git revision')
    with subprocess.Popen(
            ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
            stdout=subprocess.PIPE) as git:
        with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
            if git.stdout:
                git.stdout.close()
            tar.wait()
            git.wait()
    v.result(git.returncode == 0 and tar.returncode == 0)


def git_get_tarball(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin) -> str:
    cache_file = tarball_cache_file(channel, pin)
    if os.path.exists(cache_file):
        with open(cache_file, encoding='utf-8') as f:
            cached_tarball = f.read(9999)
            if os.path.exists(cached_tarball):
                return cached_tarball

    with tempfile.TemporaryDirectory() as output_dir:
        output_filename = os.path.join(
            output_dir, pin.release_name + '.tar.xz')
        with open(output_filename, 'w', encoding='utf-8') as output_file:
            v.status(f'Generating tarball for git revision {pin.git_revision}')
            with subprocess.Popen(
                    ['git', '-C', git_cache.git_cachedir(channel.git_repo),
                     'archive', f'--prefix={pin.release_name}/', pin.git_revision],
                    stdout=subprocess.PIPE) as git:
                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
                    if git.stdout:
                        git.stdout.close()
                    xz.wait()
                    git.wait()
            v.result(git.returncode == 0 and xz.returncode == 0)

        store_tarball = copy_to_nix_store(v, output_filename)

    os.makedirs(os.path.dirname(cache_file), exist_ok=True)
    with open(cache_file, 'w', encoding='utf-8') as f:
        f.write(store_tarball)
    return store_tarball  # type: ignore  # (for old mypy)


def check_channel_metadata(
        v: Verification,
        pin: GitPin,
        channel_contents: str) -> None:
    v.status('Verifying git commit in channel tarball')
    with open(os.path.join(channel_contents, pin.release_name, '.git-revision'),
              encoding='utf-8') as f:
        v.result(f.read(999) == pin.git_revision)

    v.status(
        f'Verifying version-suffix is a suffix of release name {pin.release_name}:')
    with open(os.path.join(channel_contents, pin.release_name, '.version-suffix'),
              encoding='utf-8') as f:
        version_suffix = f.read(999)
    v.status(version_suffix)
    v.result(pin.release_name.endswith(version_suffix))


def check_channel_contents(
        v: Verification,
        channel: TarrableSearchPath,
        table: Dict[str, ChannelTableEntry],
        pin: GitPin) -> None:
    with tempfile.TemporaryDirectory() as channel_contents, \
            tempfile.TemporaryDirectory() as git_contents:

        extract_tarball(v, table, channel_contents)
        check_channel_metadata(v, pin, channel_contents)

        git_checkout(v, channel, pin, git_contents)

        compare_tarball_and_git(v, pin, channel_contents, git_contents)

        v.status('Removing temporary directories')
    v.result(True)


def git_revision_name(
        v: Verification,
        channel: TarrableSearchPath,
        git_revision: str) -> str:
    v.status('Getting commit date')
    process = subprocess.run(['git',
                              '-C',
                              git_cache.git_cachedir(channel.git_repo),
                              'log',
                              '-n1',
                              '--format=%cd',
                              '--date=format:%Y%m%d%H%M%S',
                              '--no-show-signature',
                              git_revision],
                             stdout=subprocess.PIPE)
    v.result(process.returncode == 0 and process.stdout != b'')
    return f'{os.path.basename(channel.git_repo)}-{process.stdout.decode().strip()}'


def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for k, v in d.items():
        if pred(k, v):
            selected[k] = v
        else:
            remaining[k] = v
    return selected, remaining


def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    return partition_dict(lambda k, v: k in fields, d)


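# For example, filter_dict({'type': 'git', 'git_ref': 'master'}, {'type'})
# returns ({'type': 'git'}, {'git_ref': 'master'}): entries whose keys are in
# the given set first, everything else second.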
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin


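# A channels file is plain configparser INI: each section names a channel,
# 'type' picks the search-path class, the remaining keys are its fields, and
# "pinch pin" writes the pin fields back into the same section. A sketch
# (section names and URLs are illustrative, not taken from the source):
#
#   [nixpkgs]
#   type = channel
#   channel_url = https://channels.nixos.org/nixos-unstable
#   git_repo = https://github.com/NixOS/nixpkgs.git
#   git_ref = nixos-unstable
#
#   [overlay]
#   type = git
#   git_repo = https://example.com/overlay.git
#   git_ref = master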
def read_pinned_config_section(
        section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
    sp, pin = read_config_section(conf)
    if pin is None:
        raise RuntimeError(
            f'Cannot update unpinned channel "{section}" (Run "pin" before "update")')
    return sp, pin


def read_config(filename: str) -> configparser.ConfigParser:
    config = configparser.ConfigParser()
    with open(filename, encoding='utf-8') as f:
        config.read_file(f, filename)
    return config


def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        config = read_config(file)
        for section in config.sections():
            if section in merged_config:
                raise RuntimeError(f'Duplicate channel "{section}"')
            merged_config[section] = config[section]
    return merged_config


def pinCommand(args: argparse.Namespace) -> None:
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_config_section(config[section])

        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w', encoding='utf-8') as configfile:
        config.write(configfile)


def updateCommand(args: argparse.Namespace) -> None:
    v = Verification()
    exprs: Dict[str, str] = {}
    profile_manifest = os.path.join(args.profile, "manifest.nix")
    search_paths: List[str] = [
        "-I", "pinch_profile=" + args.profile,
        "-I", "pinch_profile_manifest=" + os.readlink(profile_manifest)
    ] if os.path.exists(profile_manifest) else []
    config = {
        section: read_pinned_config_section(section, conf) for section,
        conf in read_config_files(
            args.channels_file).items()}
    alias, nonalias = partition_dict(
        lambda k, v: isinstance(v[0], AliasSearchPath), config)

    for section, (sp, pin) in sorted(nonalias.items()):
        assert not isinstance(sp, AliasSearchPath)  # mypy can't see through
        assert not isinstance(pin, AliasPin)        # partition_dict()
        tarball = sp.fetch(v, pin)
        search_paths.extend(
            ["-I", f"pinch_tarball_for_{pin.release_name}={tarball}"])
        exprs[section] = (
            f'f: f {{ name = "{pin.release_name}"; channelName = "%s"; '
            f'src = builtins.storePath "{tarball}"; }}')

    for section, (sp, pin) in alias.items():
        assert isinstance(sp, AliasSearchPath)  # For mypy
        exprs[section] = exprs[sp.alias_of]

    command = [
        'nix-env',
        '--profile',
        args.profile,
        '--file',
        '<nix/unpack-channel.nix>',
        '--install',
        '--remove-all',
    ] + search_paths + ['--from-expression'] + [
        exprs[name] % name for name in sorted(exprs.keys())]
    if args.dry_run:
        print(' '.join(map(shlex.quote, command)))
    else:
        v.status('Installing channels with nix-env')
        process = subprocess.run(command)
        v.result(process.returncode == 0)


def main() -> None:
    parser = argparse.ArgumentParser(prog='pinch')
    subparsers = parser.add_subparsers(dest='mode', required=True)
    parser_pin = subparsers.add_parser('pin')
    parser_pin.add_argument('channels_file', type=str)
    parser_pin.add_argument('channels', type=str, nargs='*')
    parser_pin.set_defaults(func=pinCommand)
    parser_update = subparsers.add_parser('update')
    parser_update.add_argument('--dry-run', action='store_true')
    parser_update.add_argument('--profile', default=(
        f'/nix/var/nix/profiles/per-user/{getpass.getuser()}/channels'))
    parser_update.add_argument('channels_file', type=str, nargs='+')
    parser_update.set_defaults(func=updateCommand)
    args = parser.parse_args()
    args.func(args)


if __name__ == '__main__':
    main()
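# Typical invocations (assuming the script is installed as `pinch`; file and
# section names are illustrative):
#
#   pinch pin channels.ini              # pin every channel in the file
#   pinch pin channels.ini nixpkgs      # pin only the listed channels
#   pinch update channels.ini           # install the pinned channels via nix-env
#   pinch update --dry-run channels.ini # print the nix-env command instead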