19 import xml
.dom
.minidom
37 # Use xdg module when it's less painful to have as a dependency
# XDG: NamedTuple holding the XDG base-directory value(s) this tool uses.
# NOTE(review): this extract is mangled/sparse; the class body and part of the
# cache-home initialization are missing — confirm against the full file.
40 class XDG(NamedTuple
):
# Resolve the cache directory from $XDG_CACHE_HOME, falling back to ~/.cache.
45 XDG_CACHE_HOME
=os
.getenv(
47 os
.path
.expanduser('~/.cache')))
# Raised when a verification check fails; aborts the surrounding operation.
# NOTE(review): class body (presumably just `pass`) is missing from this extract.
50 class VerificationError(Exception):
# Initialize verification state.  NOTE(review): the body is missing from this
# extract — presumably it sets self.line_length = 0 (used by status/result).
56 def __init__(self
) -> None:
def status(self, s: str) -> None:
    """Emit progress fragment *s* to stderr without a newline.

    Keeps a running count of characters written on the current line so the
    OK/FAIL verdict can later be right-aligned against the terminal width.
    """
    sys.stderr.write(s + ' ')
    sys.stderr.flush()
    # Counts code points, not terminal display width (the original's "Unicode??").
    self.line_length += 1 + len(s)
64 def _color(s
: str, c
: int) -> str:
65 return '\033[%2dm%s\033[00m' % (c
, s
)
# Finish the current status line with a right-aligned, colored OK/FAIL verdict,
# then signal failure by raising VerificationError.
# NOTE(review): several lines are missing from this extract: `length` below is
# presumably `len(message)` from a missing line, and the raise at the end is
# presumably guarded by a missing `if not r:` — confirm against the full file.
67 def result(self
, r
: bool) -> None:
68 message
, color
= {True: ('OK ', 92), False: ('FAIL', 91)}
[r
]
# Pad so the verdict lands flush against the right edge of the terminal.
70 cols
= shutil
.get_terminal_size().columns
or 80
71 pad
= (cols
- (self
.line_length
+ length
)) % cols
72 print(' ' * pad
+ self
._color
(message
, color
), file=sys
.stderr
)
75 raise VerificationError()
# Convenience wrapper: print label *s* via status() then record result *r*.
# NOTE(review): the body is missing from this extract.
77 def check(self
, s
: str, r
: bool) -> None:
# Distinct string types for the two sha256 digest encodings this tool handles:
# Digest16 is hex (base-16), Digest32 is Nix's base-32 encoding.
85 Digest16
= NewType('Digest16', str)
86 Digest32
= NewType('Digest32', str)
# One row of a channel's file table (url/digest/size, plus fields filled in
# later).  NOTE(review): the attribute declarations are missing from this extract.
89 class ChannelTableEntry(types
.SimpleNamespace
):
# Pin types: immutable records identifying an exact pinned state for each kind
# of search path.  NOTE(review): field declarations and method bodies are
# missing from this extract — confirm against the full file.
97 class AliasPin(NamedTuple
):
101 class SymlinkPin(NamedTuple
):
# release_name: presumably a fixed name for symlink pins (body missing).
103 def release_name(self
) -> str:
107 class GitPin(NamedTuple
):
112 class ChannelPin(NamedTuple
):
# Pin: the union of all concrete pin record types.
119 Pin
= Union
[AliasPin
, SymlinkPin
, GitPin
, ChannelPin
]
def copy_to_nix_store(v: Verification, filename: str) -> str:
    """Copy *filename* into the Nix store via `nix-store --add`.

    Returns the resulting store path.  The verification fails (raising
    VerificationError from v.result) if nix-store exits nonzero.
    """
    v.status('Putting tarball in Nix store')
    proc = subprocess.run(
        ['nix-store', '--add', filename], stdout=subprocess.PIPE)
    v.result(proc.returncode == 0)
    # nix-store prints the store path on stdout, newline-terminated.
    return proc.stdout.decode().strip()  # type: ignore # (for old mypy)
def symlink_archive(v: Verification, path: str) -> str:
    """Build a gzipped tarball containing a single symlink to *path* and copy
    it into the Nix store, returning the store path."""
    with tempfile.TemporaryDirectory() as workdir:
        link_location = os.path.join(workdir, 'link')
        tarball = os.path.join(workdir, 'link.tar.gz')
        os.symlink(path, link_location)
        # mode='x:gz': create a fresh gzip-compressed archive, failing if the
        # file already exists.
        with tarfile.open(tarball, mode='x:gz') as archive:
            archive.add(link_location, arcname='link')
        return copy_to_nix_store(v, tarball)
# A search path that is just an alias for another channel.
# NOTE(review): the field declarations and pin()'s body (presumably
# `return AliasPin()`) are missing from this extract.
139 class AliasSearchPath(NamedTuple
):
142 # pylint: disable=no-self-use
# pin(): aliases carry no state of their own; both arguments are ignored.
143 def pin(self
, _
: Verification
, __
: Optional
[Pin
]) -> AliasPin
:
# A search path backed by a local symlink target.
# NOTE(review): the field declarations and pin()'s body (presumably
# `return SymlinkPin()`) are missing from this extract.
147 class SymlinkSearchPath(NamedTuple
):
150 # pylint: disable=no-self-use
151 def pin(self
, _
: Verification
, __
: Optional
[Pin
]) -> SymlinkPin
:
def fetch(self, v: Verification, _: Pin) -> str:
    """Package this search path's symlink as a store tarball.

    The pin argument is unused: the symlink is re-archived from its current
    target path.
    """
    target = self.path
    return symlink_archive(v, target)
# A search path backed by a git repository (git_repo/git_ref fields are used
# elsewhere; their declarations are missing from this extract).
158 class GitSearchPath(NamedTuple
):
def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
    """Fetch the current tip of this git search path and return a GitPin.

    *old_pin*, when present, must itself be a GitPin; its revision is passed
    along so the fetch can verify ancestry against the previous pin.
    """
    previous_revision = None
    if old_pin is not None:
        assert isinstance(old_pin, GitPin)
        previous_revision = old_pin.git_revision

    fetched_revision = git_fetch(v, self, None, previous_revision)
    return GitPin(
        release_name=git_revision_name(v, self, fetched_revision),
        git_revision=fetched_revision)
def fetch(self, v: Verification, pin: Pin) -> str:
    """Ensure the pinned revision is in the local git cache, then return the
    store path of a tarball built from that revision."""
    assert isinstance(pin, GitPin)  # git search paths are always pinned with GitPin
    ensure_git_rev_available(v, self, pin, None)
    tarball = git_get_tarball(v, self, pin)
    return tarball
# A search path backed by a Nix channel URL (field declarations, e.g.
# channel_url/git_repo/git_ref, are missing from this extract).
177 class ChannelSearchPath(NamedTuple
):
# Pin this channel: download and parse the channel page, verify its resources
# and git ancestry, and build a ChannelPin from the parsed table.
# NOTE(review): two lines are missing from this extract: the body of the
# revision-unchanged branch below (presumably an early `return old_pin`) and
# the `return ChannelPin(` line introducing the keyword arguments at the end —
# confirm against the full file.
182 def pin(self
, v
: Verification
, old_pin
: Optional
[Pin
]) -> ChannelPin
:
183 if old_pin
is not None:
184 assert isinstance(old_pin
, ChannelPin
)
185 old_revision
= old_pin
.git_revision
if old_pin
is not None else None
187 channel_html
, forwarded_url
= fetch_channel(v
, self
)
188 table
, new_gitpin
= parse_channel(v
, channel_html
)
# If the channel still points at the same git revision, nothing to re-verify.
189 if old_pin
is not None and old_pin
.git_revision
== new_gitpin
.git_revision
:
191 fetch_resources(v
, new_gitpin
, forwarded_url
, table
)
192 ensure_git_rev_available(v
, self
, new_gitpin
, old_revision
)
193 check_channel_contents(v
, self
, table
, new_gitpin
)
195 release_name
=new_gitpin
.release_name
,
196 tarball_url
=table
['nixexprs.tar.xz'].absolute_url
,
197 tarball_sha256
=table
['nixexprs.tar.xz'].digest
,
198 git_revision
=new_gitpin
.git_revision
)
# pylint: disable=no-self-use
def fetch(self, v: Verification, pin: Pin) -> str:
    """Download the pinned channel tarball, verified against its recorded
    sha256, and return the local store path."""
    assert isinstance(pin, ChannelPin)

    expected_digest = Digest16(pin.tarball_sha256)
    return fetch_with_nix_prefetch_url(v, pin.tarball_url, expected_digest)
# SearchPath: union of all search-path kinds.  NOTE(review): the remaining
# members of this union are on lines missing from this extract.
208 SearchPath
= Union
[AliasSearchPath
,
# TarrableSearchPath: the search-path kinds that can produce a git tarball.
212 TarrableSearchPath
= Union
[GitSearchPath
, ChannelSearchPath
]
# Recursively compare directory trees *a* and *b* (ignoring .git/), returning
# filecmp.cmpfiles-style (match, mismatch, errors) lists.
# NOTE(review): several interior lines are missing from this extract (e.g.
# throw()'s body, recursive_files' return, and functools.reduce's first
# argument) — confirm against the full file.
215 def compare(a
: str, b
: str) -> Tuple
[List
[str], List
[str], List
[str]]:
# Re-raise os.walk errors instead of silently skipping unreadable directories.
217 def throw(error
: OSError) -> None:
220 def join(x
: str, y
: str) -> str:
221 return y
if x
== '.' else os
.path
.join(x
, y
)
# Collect all file paths (and symlinked dirs) under d, relative to d.
223 def recursive_files(d
: str) -> Iterable
[str]:
224 all_files
: List
[str] = []
225 for path
, dirs
, files
in os
.walk(d
, onerror
=throw
):
226 rel
= os
.path
.relpath(path
, start
=d
)
227 all_files
.extend(join(rel
, f
) for f
in files
)
228 for dir_or_link
in dirs
:
229 if os
.path
.islink(join(path
, dir_or_link
)):
230 all_files
.append(join(rel
, dir_or_link
))
233 def exclude_dot_git(files
: Iterable
[str]) -> Iterable
[str]:
234 return (f
for f
in files
if not f
.startswith('.git/'))
# Union of the non-.git file sets from both trees, compared byte-for-byte.
236 files
= functools
.reduce(
239 recursive_files(x
))) for x
in [a
, b
]))
240 return filecmp
.cmpfiles(a
, b
, files
, shallow
=False)
# Download the channel's HTML page, following redirects; returns the page body
# and the final (forwarded) URL.  Verifies HTTP 200 and that a redirect
# actually happened.  NOTE(review): the `def fetch_channel(` line itself is
# missing from this extract.
244 v
: Verification
, channel
: ChannelSearchPath
) -> Tuple
[str, str]:
245 v
.status('Fetching channel')
246 request
= urllib
.request
.urlopen(channel
.channel_url
, timeout
=10)
247 channel_html
= request
.read().decode()
248 forwarded_url
= request
.geturl()
249 v
.result(request
.status
== 200) # type: ignore # (for old mypy)
250 v
.check('Got forwarded', channel
.channel_url
!= forwarded_url
)
251 return channel_html
, forwarded_url
# Parse the channel HTML page as XML, extracting the release name (from the
# <title> and <h1>, which must agree), the git revision (from the first <tt>),
# and the file table (from <tr> rows) keyed by file name.
# NOTE(review): a few interior lines are missing from this extract (presumably
# blanks plus e.g. a v.status(title_name) between lines 263 and 265).
254 def parse_channel(v
: Verification
, channel_html
: str) \
255 -> Tuple
[Dict
[str, ChannelTableEntry
], GitPin
]:
256 v
.status('Parsing channel description as XML')
257 d
= xml
.dom
.minidom
.parseString(channel_html
)
# Release name appears as the third whitespace-separated token in both the
# page title and the <h1>; they must match.
260 v
.status('Extracting release name:')
261 title_name
= d
.getElementsByTagName(
262 'title')[0].firstChild
.nodeValue
.split()[2]
263 h1_name
= d
.getElementsByTagName('h1')[0].firstChild
.nodeValue
.split()[2]
265 v
.result(title_name
== h1_name
)
267 v
.status('Extracting git commit:')
268 git_commit_node
= d
.getElementsByTagName('tt')[0]
269 git_revision
= git_commit_node
.firstChild
.nodeValue
270 v
.status(git_revision
)
# Sanity check: the <tt> node must be labeled 'Git commit ' in the page text.
272 v
.status('Verifying git commit label')
273 v
.result(git_commit_node
.previousSibling
.nodeValue
== 'Git commit ')
# Each table row: [0] link (name + href), [1] size, [2] hex sha256 digest.
275 v
.status('Parsing table')
276 table
: Dict
[str, ChannelTableEntry
] = {}
277 for row
in d
.getElementsByTagName('tr')[1:]:
278 name
= row
.childNodes
[0].firstChild
.firstChild
.nodeValue
279 url
= row
.childNodes
[0].firstChild
.getAttribute('href')
280 size
= int(row
.childNodes
[1].firstChild
.nodeValue
)
281 digest
= Digest16(row
.childNodes
[2].firstChild
.firstChild
.nodeValue
)
282 table
[name
] = ChannelTableEntry(url
=url
, digest
=digest
, size
=size
)
284 return table
, GitPin(release_name
=title_name
, git_revision
=git_revision
)
def digest_string(s: bytes) -> Digest16:
    """Return the SHA-256 digest of *s* as a base-16 (hex) string."""
    hasher = hashlib.sha256()
    hasher.update(s)
    return Digest16(hasher.hexdigest())
def digest_file(filename: str) -> Digest16:
    """Return the hex SHA-256 digest of the file at *filename*.

    Reads in 4 KiB chunks so arbitrarily large files are hashed without
    loading them into memory.
    """
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # pylint: disable=cell-var-from-loop
        for chunk in iter(lambda: f.read(4096), b''):
            hasher.update(chunk)
    return Digest16(hasher.hexdigest())
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Convert a base-32 sha256 digest to base-16 using `nix to-base16`."""
    v.status('Converting digest to base16')
    proc = subprocess.run(
        ['nix', 'to-base16', '--type', 'sha256', digest32],
        stdout=subprocess.PIPE)
    v.result(proc.returncode == 0)
    return Digest16(proc.stdout.decode().strip())
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Convert a base-16 sha256 digest to Nix base-32 using `nix to-base32`."""
    v.status('Converting digest to base32')
    proc = subprocess.run(
        ['nix', 'to-base32', '--type', 'sha256', digest16],
        stdout=subprocess.PIPE)
    v.result(proc.returncode == 0)
    return Digest32(proc.stdout.decode().strip())
# Download *url* via nix-prefetch-url, verifying both the digest reported by
# nix and an independently computed digest of the file on disk; returns the
# local path.  NOTE(review): the parameter lines between the `def` and
# `digest` (presumably `v: Verification,` and `url: str,`) are missing from
# this extract.
316 def fetch_with_nix_prefetch_url(
319 digest
: Digest16
) -> str:
320 v
.status('Fetching %s' % url
)
321 process
= subprocess
.run(
322 ['nix-prefetch-url', '--print-path', url
, digest
], stdout
=subprocess
.PIPE
)
323 v
.result(process
.returncode
== 0)
# nix-prefetch-url prints the (base-32) digest, then the path, then nothing.
324 prefetch_digest
, path
, empty
= process
.stdout
.decode().split('\n')
326 v
.check("Verifying nix-prefetch-url's digest",
327 to_Digest16(v
, Digest32(prefetch_digest
)) == digest
)
328 v
.status("Verifying file digest")
329 file_digest
= digest_file(path
)
330 v
.result(file_digest
== digest
)
331 return path
# type: ignore # (for old mypy)
# Download the channel's git-revision and nixexprs.tar.xz resources (resolving
# each relative URL against the forwarded channel URL), then check that the
# page's git commit matches the downloaded git-revision file.
# NOTE(review): the `def fetch_resources(` line and its other parameters
# (including `pin` used below) are missing from this extract.
338 table
: Dict
[str, ChannelTableEntry
]) -> None:
339 for resource
in ['git-revision', 'nixexprs.tar.xz']:
340 fields
= table
[resource
]
341 fields
.absolute_url
= urllib
.parse
.urljoin(forwarded_url
, fields
.url
)
342 fields
.file = fetch_with_nix_prefetch_url(
343 v
, fields
.absolute_url
, fields
.digest
)
344 v
.status('Verifying git commit on main page matches git commit in table')
345 v
.result(open(table
['git-revision'].file).read(999) == pin
.git_revision
)
# Map a git repo URL to its local cache directory, keyed by the sha256 of the
# URL.  NOTE(review): the intermediate path components of the os.path.join
# call are missing from this extract.
348 def git_cachedir(git_repo
: str) -> str:
352 digest_string(git_repo
.encode()))
# Compute the cache-file path recording the store path of a generated tarball
# for (channel, pin).  NOTE(review): most of the body is missing from this
# extract; only part of the cache-key expression is visible.
355 def tarball_cache_file(channel
: TarrableSearchPath
, pin
: GitPin
) -> str:
360 (digest_string(channel
.git_repo
.encode()),
# Verify (via `git merge-base --is-ancestor`-style subprocess checks, judging
# by the visible fragments) that the fetched rev is an ancestor of the ref,
# and — when an old revision is known — that it is an ancestor of the new one.
# NOTE(review): the git command arguments and some parameters are on lines
# missing from this extract; confirm the exact git invocations in the full file.
365 def verify_git_ancestry(
367 channel
: TarrableSearchPath
,
369 old_revision
: Optional
[str]) -> None:
370 cachedir
= git_cachedir(channel
.git_repo
)
371 v
.status('Verifying rev is an ancestor of ref')
372 process
= subprocess
.run(['git',
379 v
.result(process
.returncode
== 0)
381 if old_revision
is not None:
383 'Verifying rev is an ancestor of previous rev %s' %
385 process
= subprocess
.run(['git',
392 v
.result(process
.returncode
== 0)
# Fetch the channel's git ref into the local bare cache repo (initializing it
# if absent), optionally verify a specific desired revision was retrieved,
# read the new revision, and verify ancestry against the old revision.
# NOTE(review): the `def git_fetch(` line, the git fetch argument lines, the
# line assigning `new_revision`, and the final `return new_revision` are
# missing from this extract.
397 channel
: TarrableSearchPath
,
398 desired_revision
: Optional
[str],
399 old_revision
: Optional
[str]) -> str:
400 # It would be nice if we could share the nix git cache, but as of the time
401 # of writing it is transitioning from gitv2 (deprecated) to gitv3 (not ready
402 # yet), and trying to straddle them both is too far into nix implementation
403 # details for my comfort. So we re-implement here half of nix.fetchGit.
406 cachedir
= git_cachedir(channel
.git_repo
)
407 if not os
.path
.exists(cachedir
):
408 v
.status("Initializing git repo")
409 process
= subprocess
.run(
410 ['git', 'init', '--bare', cachedir
])
411 v
.result(process
.returncode
== 0)
413 v
.status('Fetching ref "%s" from %s' % (channel
.git_ref
, channel
.git_repo
))
414 # We don't use --force here because we want to abort and freak out if forced
415 # updates are happening.
416 process
= subprocess
.run(['git',
421 '%s:%s' % (channel
.git_ref
,
423 v
.result(process
.returncode
== 0)
425 if desired_revision
is not None:
426 v
.status('Verifying that fetch retrieved this rev')
427 process
= subprocess
.run(
428 ['git', '-C', cachedir
, 'cat-file', '-e', desired_revision
])
429 v
.result(process
.returncode
== 0)
436 channel
.git_ref
)).read(999).strip()
438 verify_git_ancestry(v
, channel
, new_revision
, old_revision
)
# Make sure the pinned revision exists in the local git cache: probe with
# `git cat-file -e` (exit 0 = present, 1 = absent), verify ancestry when
# already present, otherwise fall through to a fresh git_fetch.
# NOTE(review): parameter lines (`v: Verification,`, `pin: GitPin,`) and the
# status/early-return lines inside the returncode branches are missing from
# this extract.
443 def ensure_git_rev_available(
445 channel
: TarrableSearchPath
,
447 old_revision
: Optional
[str]) -> None:
448 cachedir
= git_cachedir(channel
.git_repo
)
449 if os
.path
.exists(cachedir
):
450 v
.status('Checking if we already have this rev:')
451 process
= subprocess
.run(
452 ['git', '-C', cachedir
, 'cat-file', '-e', pin
.git_revision
])
453 if process
.returncode
== 0:
455 if process
.returncode
== 1:
# Any exit status other than 0 (present) or 1 (absent) is a git failure.
457 v
.result(process
.returncode
== 0 or process
.returncode
== 1)
458 if process
.returncode
== 0:
459 verify_git_ancestry(v
, channel
, pin
.git_revision
, old_revision
)
461 git_fetch(v
, channel
, pin
.git_revision
, old_revision
)
# Compare the extracted channel tarball with the git checkout: every file must
# match, and any incomparable files must all be in the expected-errors list
# (channel-generated files like .git-revision, judging by the surrounding code).
# NOTE(review): parameter lines (`v`, `pin`), the expected_errors definition,
# and parts of the final check calls are missing from this extract.
464 def compare_tarball_and_git(
467 channel_contents
: str,
468 git_contents
: str) -> None:
469 v
.status('Comparing channel tarball with git checkout')
470 match
, mismatch
, errors
= compare(os
.path
.join(
471 channel_contents
, pin
.release_name
), git_contents
)
473 v
.check('%d files match' % len(match
), len(match
) > 0)
474 v
.check('%d files differ' % len(mismatch
), len(mismatch
) == 0)
# Partition the incomparable files into expected (benign) and unexpected ones.
482 for ee
in expected_errors
:
485 benign_errors
.append(ee
)
487 '%d unexpected incomparable files' %
491 '(%d of %d expected incomparable files)' %
493 len(expected_errors
)),
494 len(benign_errors
) == len(expected_errors
))
# Unpack the downloaded nixexprs.tar.xz into *dest*.
# NOTE(review): the `def extract_tarball(` line and its other parameters
# (`v: Verification,` and `dest: str`) are missing from this extract.
499 table
: Dict
[str, ChannelTableEntry
],
501 v
.status('Extracting tarball %s' % table
['nixexprs.tar.xz'].file)
502 shutil
.unpack_archive(table
['nixexprs.tar.xz'].file, dest
)
# Materialize the pinned git revision into *dest* by piping `git archive`
# output into `tar x` — both subprocesses must succeed.
# NOTE(review): the `def git_checkout(` line, other parameters, the git
# archive argument lines, and the wait/communicate calls are missing from
# this extract.
508 channel
: TarrableSearchPath
,
511 v
.status('Checking out corresponding git revision')
512 git
= subprocess
.Popen(['git',
514 git_cachedir(channel
.git_repo
),
517 stdout
=subprocess
.PIPE
)
518 tar
= subprocess
.Popen(
519 ['tar', 'x', '-C', dest
, '-f', '-'], stdin
=git
.stdout
)
524 v
.result(git
.returncode
== 0 and tar
.returncode
== 0)
# Produce (and cache) a store tarball for the pinned git revision: reuse the
# cached store path when it still exists, otherwise pipe `git archive` through
# xz into a temp file, add it to the Nix store, and record the store path in
# the cache file.  NOTE(review): the `def git_get_tarball(` line, some
# parameters, the git archive arguments, and the subprocess wait lines are
# missing from this extract.
529 channel
: TarrableSearchPath
,
531 cache_file
= tarball_cache_file(channel
, pin
)
532 if os
.path
.exists(cache_file
):
533 cached_tarball
= open(cache_file
).read(9999)
# The cached value is only valid while the store path it names still exists.
534 if os
.path
.exists(cached_tarball
):
535 return cached_tarball
537 with tempfile
.TemporaryDirectory() as output_dir
:
538 output_filename
= os
.path
.join(
539 output_dir
, pin
.release_name
+ '.tar.xz')
540 with open(output_filename
, 'w') as output_file
:
542 'Generating tarball for git revision %s' %
544 git
= subprocess
.Popen(['git',
546 git_cachedir(channel
.git_repo
),
548 '--prefix=%s/' % pin
.release_name
,
550 stdout
=subprocess
.PIPE
)
551 xz
= subprocess
.Popen(['xz'], stdin
=git
.stdout
, stdout
=output_file
)
554 v
.result(git
.returncode
== 0 and xz
.returncode
== 0)
556 store_tarball
= copy_to_nix_store(v
, output_filename
)
558 os
.makedirs(os
.path
.dirname(cache_file
), exist_ok
=True)
559 open(cache_file
, 'w').write(store_tarball
)
560 return store_tarball
# type: ignore # (for old mypy)
# Verify the extracted channel tarball's metadata files: .git-revision must
# equal the pinned revision, and .version-suffix must be a suffix of the
# release name.  NOTE(review): parameter lines (`v`, `pin`) and the os.path
# argument lines of both open() calls are missing from this extract.
563 def check_channel_metadata(
566 channel_contents
: str) -> None:
567 v
.status('Verifying git commit in channel tarball')
573 '.git-revision')).read(999) == pin
.git_revision
)
576 'Verifying version-suffix is a suffix of release name %s:' %
578 version_suffix
= open(
582 '.version-suffix')).read(999)
583 v
.status(version_suffix
)
584 v
.result(pin
.release_name
.endswith(version_suffix
))
# End-to-end content check: extract the channel tarball and a git checkout
# into temp dirs, verify the tarball's metadata, then compare the two trees.
# NOTE(review): the `v: Verification,` parameter line and a few (presumably
# blank or trailing-status) lines are missing from this extract.
587 def check_channel_contents(
589 channel
: TarrableSearchPath
,
590 table
: Dict
[str, ChannelTableEntry
],
591 pin
: GitPin
) -> None:
592 with tempfile
.TemporaryDirectory() as channel_contents
, \
593 tempfile
.TemporaryDirectory() as git_contents
:
595 extract_tarball(v
, table
, channel_contents
)
596 check_channel_metadata(v
, pin
, channel_contents
)
598 git_checkout(v
, channel
, pin
, git_contents
)
600 compare_tarball_and_git(v
, pin
, channel_contents
, git_contents
)
602 v
.status('Removing temporary directories')
# Build a human-readable release name "<repo-basename>-<commit-date-output>"
# for a git revision by asking git (a log/show invocation, judging by
# --no-show-signature) for the commit's date.
# NOTE(review): the `v: Verification,` parameter line and most of the git
# command's argument lines are missing from this extract.
606 def git_revision_name(
608 channel
: TarrableSearchPath
,
609 git_revision
: str) -> str:
610 v
.status('Getting commit date')
611 process
= subprocess
.run(['git',
613 git_cachedir(channel
.git_repo
),
618 '--no-show-signature',
620 stdout
=subprocess
.PIPE
)
# Empty output would silently produce a bogus name, so it is checked too.
621 v
.result(process
.returncode
== 0 and process
.stdout
!= b
'')
622 return '%s-%s' % (os
.path
.basename(channel
.git_repo
),
623 process
.stdout
.decode().strip())
def partition_dict(pred: Callable[[K, V], bool],
                   d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split *d* into two dicts: (entries where pred(k, v) holds, the rest).

    Insertion order is preserved within each result dict.
    """
    selected: Dict[K, V] = {}
    remaining: Dict[K, V] = {}
    for key, value in d.items():
        bucket = selected if pred(key, value) else remaining
        bucket[key] = value
    return selected, remaining
def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Partition *d* by key membership: (keys in *fields*, keys not in it)."""
    return partition_dict(lambda key, _value: key in fields, d)
def read_config_section(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Build the SearchPath (and Pin, if the section is pinned) for one
    config section.

    The section's `type` key selects the concrete classes; the remaining keys
    are divided between pin fields and search-path fields by name.
    """
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
        'symlink': (SymlinkSearchPath, SymlinkPin),
    }
    SP, P = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), {'type'})
    pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
    # A pin is present if any pin field is set, or if the pin type has no
    # fields at all (an empty pin such as AliasPin still counts as pinned).
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields != {} or P._fields == ()
    pin = P(**pin_fields) if pin_present else None  # type: ignore
    return SP(**remaining_fields), pin
# Like read_config_section, but require the section to be pinned; otherwise
# raise with an explanatory message.  NOTE(review): the guard lines around the
# visible error-message fragment and the final `return sp, pin` are missing
# from this extract.
664 def read_pinned_config_section(
665 section
: str, conf
: configparser
.SectionProxy
) -> Tuple
[SearchPath
, Pin
]:
666 sp
, pin
= read_config_section(conf
)
669 'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
def read_config(filename: str) -> configparser.ConfigParser:
    """Parse the channels config file at *filename*.

    Returns the populated ConfigParser.  Raises OSError if the file cannot be
    opened, and configparser errors on malformed content.
    """
    config = configparser.ConfigParser()
    # Use a context manager so the handle is closed deterministically; the
    # original passed a bare open(filename) and leaked the file object.
    with open(filename) as config_file:
        config.read_file(config_file, filename)
    return config
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Read several config files and merge their sections into one mapping.

    Raises Exception if the same section name appears in more than one file.
    """
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for filename in filenames:
        parsed = read_config(filename)
        for section in parsed.sections():
            if section in merged_config:
                raise Exception('Duplicate channel "%s"' % section)
            merged_config[section] = parsed[section]
    return merged_config
# CLI "pin" subcommand: re-pin each selected section of the channels file and
# rewrite the file in place with the new pin fields merged in.
# NOTE(review): the line creating `v` (presumably `v = Verification()`) and
# the loop's `continue` under the selection filter are missing from this
# extract.
692 def pinCommand(args
: argparse
.Namespace
) -> None:
694 config
= read_config(args
.channels_file
)
695 for section
in config
.sections():
# When specific channels were named on the command line, skip the others.
696 if args
.channels
and section
not in args
.channels
:
699 sp
, old_pin
= read_config_section(config
[section
])
# Merge the new pin's fields back into the section before rewriting the file.
701 config
[section
].update(sp
.pin(v
, old_pin
)._asdict
())
703 with open(args
.channels_file
, 'w') as configfile
:
704 config
.write(configfile
)
# CLI "update" subcommand: fetch every pinned channel, build one nix-env
# expression per channel, and install them all with a single nix-env command.
# NOTE(review): several lines are missing from this extract — Verification
# setup, the start of the config dict-comprehension, the exprs[section]
# assignment line, the nix-env command construction, and the dry-run branch —
# confirm against the full file.
707 def updateCommand(args
: argparse
.Namespace
) -> None:
709 exprs
: Dict
[str, str] = {}
711 section
: read_pinned_config_section(section
, conf
) for section
,
712 conf
in read_config_files(
713 args
.channels_file
).items()}
# Handle aliases last: they just copy another channel's expression.
714 alias
, nonalias
= partition_dict(
715 lambda k
, v
: isinstance(v
[0], AliasSearchPath
), config
)
717 for section
, (sp
, pin
) in nonalias
.items():
718 assert not isinstance(sp
, AliasSearchPath
) # mypy can't see through
719 assert not isinstance(pin
, AliasPin
) # partition_dict()
720 tarball
= sp
.fetch(v
, pin
)
# %%s survives this formatting so the channel name can be substituted later.
722 'f: f { name = "%s"; channelName = "%%s"; src = builtins.storePath "%s"; }' %
723 (pin
.release_name
, tarball
))
725 for section
, (sp
, pin
) in alias
.items():
726 assert isinstance(sp
, AliasSearchPath
) # For mypy
727 exprs
[section
] = exprs
[sp
.alias_of
]
735 '<nix/unpack-channel.nix>',
737 '--from-expression'] + [exprs
[name
] % name
for name
in sorted(exprs
.keys())]
739 print(' '.join(map(shlex
.quote
, command
)))
741 v
.status('Installing channels with nix-env')
742 process
= subprocess
.run(command
)
743 v
.result(process
.returncode
== 0)
# CLI entry point: argparse setup with two subcommands ("pin" and "update"),
# each dispatching through set_defaults(func=...).
# NOTE(review): the enclosing `def main()` line, the final dispatch call
# (presumably `args.func(args)`), and the body of the __main__ guard are
# missing from this extract.
747 parser
= argparse
.ArgumentParser(prog
='pinch')
748 subparsers
= parser
.add_subparsers(dest
='mode', required
=True)
749 parser_pin
= subparsers
.add_parser('pin')
750 parser_pin
.add_argument('channels_file', type=str)
751 parser_pin
.add_argument('channels', type=str, nargs
='*')
752 parser_pin
.set_defaults(func
=pinCommand
)
753 parser_update
= subparsers
.add_parser('update')
754 parser_update
.add_argument('--dry-run', action
='store_true')
# Default nix-env profile is the per-user channels profile.
755 parser_update
.add_argument('--profile', default
=(
756 '/nix/var/nix/profiles/per-user/%s/channels' % getpass
.getuser()))
757 parser_update
.add_argument('channels_file', type=str, nargs
='+')
758 parser_update
.set_defaults(func
=updateCommand
)
759 args
= parser
.parse_args()
763 if __name__
== '__main__':