1 from abc
import ABC
, abstractmethod
19 import xml
.dom
.minidom
34 # Use xdg module when it's less painful to have as a dependency
37 class XDG(types
.SimpleNamespace
):
42 XDG_CACHE_HOME
=os
.getenv(
44 os
.path
.expanduser('~/.cache')))
47 class VerificationError(Exception):
53 def __init__(self
) -> None:
56 def status(self
, s
: str) -> None:
57 print(s
, end
=' ', file=sys
.stderr
, flush
=True)
58 self
.line_length
+= 1 + len(s
) # Unicode??
61 def _color(s
: str, c
: int) -> str:
62 return '\033[%2dm%s\033[00m' % (c
, s
)
64 def result(self
, r
: bool) -> None:
65 message
, color
= {True: ('OK ', 92), False: ('FAIL', 91)}
[r
]
67 cols
= shutil
.get_terminal_size().columns
or 80
68 pad
= (cols
- (self
.line_length
+ length
)) % cols
69 print(' ' * pad
+ self
._color
(message
, color
), file=sys
.stderr
)
72 raise VerificationError()
74 def check(self
, s
: str, r
: bool) -> None:
82 Digest16
= NewType('Digest16', str)
83 Digest32
= NewType('Digest32', str)
86 class ChannelTableEntry(types
.SimpleNamespace
):
94 class AliasPin(NamedTuple
):
98 class GitPin(NamedTuple
):
103 class ChannelPin(NamedTuple
):
110 Pin
= Union
[AliasPin
, GitPin
, ChannelPin
]
113 class SearchPath(types
.SimpleNamespace
, ABC
):
116 def pin(self
, v
: Verification
) -> Pin
:
120 class AliasSearchPath(SearchPath
):
123 def pin(self
, v
: Verification
) -> AliasPin
:
124 assert not hasattr(self
, 'git_repo')
128 # (This lint-disable is for pylint bug https://github.com/PyCQA/pylint/issues/179
129 # which is fixed in pylint 2.5.)
130 class TarrableSearchPath(SearchPath
, ABC
): # pylint: disable=abstract-method
136 table
: Dict
[str, ChannelTableEntry
]
139 class GitSearchPath(TarrableSearchPath
):
140 def pin(self
, v
: Verification
) -> GitPin
:
142 self
.git_revision
if hasattr(self
, 'git_revision') else None)
143 if hasattr(self
, 'git_revision'):
144 del self
.git_revision
146 new_revision
= git_fetch(v
, self
, None, old_revision
)
147 return GitPin(release_name
=git_revision_name(v
, self
, new_revision
),
148 git_revision
=new_revision
)
150 def fetch(self
, v
: Verification
, section
: str,
151 conf
: configparser
.SectionProxy
) -> str:
152 if 'git_revision' not in conf
or 'release_name' not in conf
:
154 'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
157 release_name
=conf
['release_name'],
158 git_revision
=conf
['git_revision'])
160 ensure_git_rev_available(v
, self
, the_pin
, None)
161 return git_get_tarball(v
, self
, the_pin
)
164 class ChannelSearchPath(TarrableSearchPath
):
165 def pin(self
, v
: Verification
) -> ChannelPin
:
167 self
.git_revision
if hasattr(self
, 'git_revision') else None)
168 if hasattr(self
, 'git_revision'):
169 del self
.git_revision
172 new_gitpin
= parse_channel(v
, self
)
173 fetch_resources(v
, self
, new_gitpin
)
174 ensure_git_rev_available(v
, self
, new_gitpin
, old_revision
)
175 check_channel_contents(v
, self
, new_gitpin
)
177 release_name
=new_gitpin
.release_name
,
178 tarball_url
=self
.table
['nixexprs.tar.xz'].absolute_url
,
179 tarball_sha256
=self
.table
['nixexprs.tar.xz'].digest
,
180 git_revision
=self
.git_revision
)
182 # Lint TODO: Put tarball_url and tarball_sha256 in ChannelSearchPath
183 # pylint: disable=no-self-use
184 def fetch(self
, v
: Verification
, section
: str,
185 conf
: configparser
.SectionProxy
) -> str:
186 if 'git_repo' not in conf
or 'release_name' not in conf
:
188 'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
191 return fetch_with_nix_prefetch_url(
192 v
, conf
['tarball_url'], Digest16(
193 conf
['tarball_sha256']))
196 def compare(a
: str, b
: str) -> Tuple
[List
[str], List
[str], List
[str]]:
198 def throw(error
: OSError) -> None:
201 def join(x
: str, y
: str) -> str:
202 return y
if x
== '.' else os
.path
.join(x
, y
)
204 def recursive_files(d
: str) -> Iterable
[str]:
205 all_files
: List
[str] = []
206 for path
, dirs
, files
in os
.walk(d
, onerror
=throw
):
207 rel
= os
.path
.relpath(path
, start
=d
)
208 all_files
.extend(join(rel
, f
) for f
in files
)
209 for dir_or_link
in dirs
:
210 if os
.path
.islink(join(path
, dir_or_link
)):
211 all_files
.append(join(rel
, dir_or_link
))
214 def exclude_dot_git(files
: Iterable
[str]) -> Iterable
[str]:
215 return (f
for f
in files
if not f
.startswith('.git/'))
217 files
= functools
.reduce(
220 recursive_files(x
))) for x
in [a
, b
]))
221 return filecmp
.cmpfiles(a
, b
, files
, shallow
=False)
224 def fetch(v
: Verification
, channel
: TarrableSearchPath
) -> None:
225 v
.status('Fetching channel')
226 request
= urllib
.request
.urlopen(channel
.channel_url
, timeout
=10)
227 channel
.channel_html
= request
.read()
228 channel
.forwarded_url
= request
.geturl()
229 v
.result(request
.status
== 200) # type: ignore # (for old mypy)
230 v
.check('Got forwarded', channel
.channel_url
!= channel
.forwarded_url
)
233 def parse_channel(v
: Verification
, channel
: TarrableSearchPath
) -> GitPin
:
234 v
.status('Parsing channel description as XML')
235 d
= xml
.dom
.minidom
.parseString(channel
.channel_html
)
238 v
.status('Extracting release name:')
239 title_name
= d
.getElementsByTagName(
240 'title')[0].firstChild
.nodeValue
.split()[2]
241 h1_name
= d
.getElementsByTagName('h1')[0].firstChild
.nodeValue
.split()[2]
243 v
.result(title_name
== h1_name
)
245 v
.status('Extracting git commit:')
246 git_commit_node
= d
.getElementsByTagName('tt')[0]
247 channel
.git_revision
= git_commit_node
.firstChild
.nodeValue
248 v
.status(channel
.git_revision
)
250 v
.status('Verifying git commit label')
251 v
.result(git_commit_node
.previousSibling
.nodeValue
== 'Git commit ')
253 v
.status('Parsing table')
255 for row
in d
.getElementsByTagName('tr')[1:]:
256 name
= row
.childNodes
[0].firstChild
.firstChild
.nodeValue
257 url
= row
.childNodes
[0].firstChild
.getAttribute('href')
258 size
= int(row
.childNodes
[1].firstChild
.nodeValue
)
259 digest
= Digest16(row
.childNodes
[2].firstChild
.firstChild
.nodeValue
)
260 channel
.table
[name
] = ChannelTableEntry(
261 url
=url
, digest
=digest
, size
=size
)
263 return GitPin(release_name
=title_name
, git_revision
=channel
.git_revision
)
266 def digest_string(s
: bytes) -> Digest16
:
267 return Digest16(hashlib
.sha256(s
).hexdigest())
270 def digest_file(filename
: str) -> Digest16
:
271 hasher
= hashlib
.sha256()
272 with open(filename
, 'rb') as f
:
273 # pylint: disable=cell-var-from-loop
274 for block
in iter(lambda: f
.read(4096), b
''):
276 return Digest16(hasher
.hexdigest())
279 def to_Digest16(v
: Verification
, digest32
: Digest32
) -> Digest16
:
280 v
.status('Converting digest to base16')
281 process
= subprocess
.run(
282 ['nix', 'to-base16', '--type', 'sha256', digest32
], stdout
=subprocess
.PIPE
)
283 v
.result(process
.returncode
== 0)
284 return Digest16(process
.stdout
.decode().strip())
287 def to_Digest32(v
: Verification
, digest16
: Digest16
) -> Digest32
:
288 v
.status('Converting digest to base32')
289 process
= subprocess
.run(
290 ['nix', 'to-base32', '--type', 'sha256', digest16
], stdout
=subprocess
.PIPE
)
291 v
.result(process
.returncode
== 0)
292 return Digest32(process
.stdout
.decode().strip())
295 def fetch_with_nix_prefetch_url(
298 digest
: Digest16
) -> str:
299 v
.status('Fetching %s' % url
)
300 process
= subprocess
.run(
301 ['nix-prefetch-url', '--print-path', url
, digest
], stdout
=subprocess
.PIPE
)
302 v
.result(process
.returncode
== 0)
303 prefetch_digest
, path
, empty
= process
.stdout
.decode().split('\n')
305 v
.check("Verifying nix-prefetch-url's digest",
306 to_Digest16(v
, Digest32(prefetch_digest
)) == digest
)
307 v
.status("Verifying file digest")
308 file_digest
= digest_file(path
)
309 v
.result(file_digest
== digest
)
310 return path
# type: ignore # (for old mypy)
315 channel
: ChannelSearchPath
,
316 pin
: GitPin
) -> None:
317 for resource
in ['git-revision', 'nixexprs.tar.xz']:
318 fields
= channel
.table
[resource
]
319 fields
.absolute_url
= urllib
.parse
.urljoin(
320 channel
.forwarded_url
, fields
.url
)
321 fields
.file = fetch_with_nix_prefetch_url(
322 v
, fields
.absolute_url
, fields
.digest
)
323 v
.status('Verifying git commit on main page matches git commit in table')
326 channel
.table
['git-revision'].file).read(999) == pin
.git_revision
)
329 def git_cachedir(git_repo
: str) -> str:
333 digest_string(git_repo
.encode()))
336 def tarball_cache_file(channel
: TarrableSearchPath
, pin
: GitPin
) -> str:
341 (digest_string(channel
.git_repo
.encode()),
346 def verify_git_ancestry(
348 channel
: TarrableSearchPath
,
350 old_revision
: Optional
[str]) -> None:
351 cachedir
= git_cachedir(channel
.git_repo
)
352 v
.status('Verifying rev is an ancestor of ref')
353 process
= subprocess
.run(['git',
360 v
.result(process
.returncode
== 0)
362 if old_revision
is not None:
364 'Verifying rev is an ancestor of previous rev %s' %
366 process
= subprocess
.run(['git',
373 v
.result(process
.returncode
== 0)
378 channel
: TarrableSearchPath
,
379 desired_revision
: Optional
[str],
380 old_revision
: Optional
[str]) -> str:
381 # It would be nice if we could share the nix git cache, but as of the time
382 # of writing it is transitioning from gitv2 (deprecated) to gitv3 (not ready
383 # yet), and trying to straddle them both is too far into nix implementation
384 # details for my comfort. So we re-implement here half of nix.fetchGit.
387 cachedir
= git_cachedir(channel
.git_repo
)
388 if not os
.path
.exists(cachedir
):
389 v
.status("Initializing git repo")
390 process
= subprocess
.run(
391 ['git', 'init', '--bare', cachedir
])
392 v
.result(process
.returncode
== 0)
394 v
.status('Fetching ref "%s" from %s' % (channel
.git_ref
, channel
.git_repo
))
395 # We don't use --force here because we want to abort and freak out if forced
396 # updates are happening.
397 process
= subprocess
.run(['git',
402 '%s:%s' % (channel
.git_ref
,
404 v
.result(process
.returncode
== 0)
406 if desired_revision
is not None:
407 v
.status('Verifying that fetch retrieved this rev')
408 process
= subprocess
.run(
409 ['git', '-C', cachedir
, 'cat-file', '-e', desired_revision
])
410 v
.result(process
.returncode
== 0)
417 channel
.git_ref
)).read(999).strip()
419 verify_git_ancestry(v
, channel
, new_revision
, old_revision
)
424 def ensure_git_rev_available(
426 channel
: TarrableSearchPath
,
428 old_revision
: Optional
[str]) -> None:
429 cachedir
= git_cachedir(channel
.git_repo
)
430 if os
.path
.exists(cachedir
):
431 v
.status('Checking if we already have this rev:')
432 process
= subprocess
.run(
433 ['git', '-C', cachedir
, 'cat-file', '-e', pin
.git_revision
])
434 if process
.returncode
== 0:
436 if process
.returncode
== 1:
438 v
.result(process
.returncode
== 0 or process
.returncode
== 1)
439 if process
.returncode
== 0:
440 verify_git_ancestry(v
, channel
, pin
.git_revision
, old_revision
)
442 git_fetch(v
, channel
, pin
.git_revision
, old_revision
)
445 def compare_tarball_and_git(
448 channel_contents
: str,
449 git_contents
: str) -> None:
450 v
.status('Comparing channel tarball with git checkout')
451 match
, mismatch
, errors
= compare(os
.path
.join(
452 channel_contents
, pin
.release_name
), git_contents
)
454 v
.check('%d files match' % len(match
), len(match
) > 0)
455 v
.check('%d files differ' % len(mismatch
), len(mismatch
) == 0)
463 for ee
in expected_errors
:
466 benign_errors
.append(ee
)
468 '%d unexpected incomparable files' %
472 '(%d of %d expected incomparable files)' %
474 len(expected_errors
)),
475 len(benign_errors
) == len(expected_errors
))
480 channel
: TarrableSearchPath
,
482 v
.status('Extracting tarball %s' %
483 channel
.table
['nixexprs.tar.xz'].file)
484 shutil
.unpack_archive(
485 channel
.table
['nixexprs.tar.xz'].file,
492 channel
: TarrableSearchPath
,
494 v
.status('Checking out corresponding git revision')
495 git
= subprocess
.Popen(['git',
497 git_cachedir(channel
.git_repo
),
499 channel
.git_revision
],
500 stdout
=subprocess
.PIPE
)
501 tar
= subprocess
.Popen(
502 ['tar', 'x', '-C', dest
, '-f', '-'], stdin
=git
.stdout
)
507 v
.result(git
.returncode
== 0 and tar
.returncode
== 0)
512 channel
: TarrableSearchPath
,
514 cache_file
= tarball_cache_file(channel
, pin
)
515 if os
.path
.exists(cache_file
):
516 cached_tarball
= open(cache_file
).read(9999)
517 if os
.path
.exists(cached_tarball
):
518 return cached_tarball
520 with tempfile
.TemporaryDirectory() as output_dir
:
521 output_filename
= os
.path
.join(
522 output_dir
, pin
.release_name
+ '.tar.xz')
523 with open(output_filename
, 'w') as output_file
:
525 'Generating tarball for git revision %s' %
527 git
= subprocess
.Popen(['git',
529 git_cachedir(channel
.git_repo
),
531 '--prefix=%s/' % pin
.release_name
,
533 stdout
=subprocess
.PIPE
)
534 xz
= subprocess
.Popen(['xz'], stdin
=git
.stdout
, stdout
=output_file
)
537 v
.result(git
.returncode
== 0 and xz
.returncode
== 0)
539 v
.status('Putting tarball in Nix store')
540 process
= subprocess
.run(
541 ['nix-store', '--add', output_filename
], stdout
=subprocess
.PIPE
)
542 v
.result(process
.returncode
== 0)
543 store_tarball
= process
.stdout
.decode().strip()
545 os
.makedirs(os
.path
.dirname(cache_file
), exist_ok
=True)
546 open(cache_file
, 'w').write(store_tarball
)
547 return store_tarball
# type: ignore # (for old mypy)
550 def check_channel_metadata(
553 channel_contents
: str) -> None:
554 v
.status('Verifying git commit in channel tarball')
560 '.git-revision')).read(999) == pin
.git_revision
)
563 'Verifying version-suffix is a suffix of release name %s:' %
565 version_suffix
= open(
569 '.version-suffix')).read(999)
570 v
.status(version_suffix
)
571 v
.result(pin
.release_name
.endswith(version_suffix
))
574 def check_channel_contents(
576 channel
: TarrableSearchPath
,
577 pin
: GitPin
) -> None:
578 with tempfile
.TemporaryDirectory() as channel_contents
, \
579 tempfile
.TemporaryDirectory() as git_contents
:
581 extract_tarball(v
, channel
, channel_contents
)
582 check_channel_metadata(v
, pin
, channel_contents
)
584 git_checkout(v
, channel
, git_contents
)
586 compare_tarball_and_git(v
, pin
, channel_contents
, git_contents
)
588 v
.status('Removing temporary directories')
592 def git_revision_name(
594 channel
: TarrableSearchPath
,
595 git_revision
: str) -> str:
596 v
.status('Getting commit date')
597 process
= subprocess
.run(['git',
599 git_cachedir(channel
.git_repo
),
604 '--no-show-signature',
606 stdout
=subprocess
.PIPE
)
607 v
.result(process
.returncode
== 0 and process
.stdout
!= b
'')
608 return '%s-%s' % (os
.path
.basename(channel
.git_repo
),
609 process
.stdout
.decode().strip())
612 def read_search_path(conf
: configparser
.SectionProxy
) -> SearchPath
:
613 mapping
: Mapping
[str, Type
[SearchPath
]] = {
614 'alias': AliasSearchPath
,
615 'channel': ChannelSearchPath
,
616 'git': GitSearchPath
,
618 return mapping
[conf
['type']](**dict(conf
.items()))
621 def read_config(filename
: str) -> configparser
.ConfigParser
:
622 config
= configparser
.ConfigParser()
623 config
.read_file(open(filename
), filename
)
627 def read_config_files(
628 filenames
: Iterable
[str]) -> Dict
[str, configparser
.SectionProxy
]:
629 merged_config
: Dict
[str, configparser
.SectionProxy
] = {}
630 for file in filenames
:
631 config
= read_config(file)
632 for section
in config
.sections():
633 if section
in merged_config
:
634 raise Exception('Duplicate channel "%s"' % section
)
635 merged_config
[section
] = config
[section
]
639 def pinCommand(args
: argparse
.Namespace
) -> None:
641 config
= read_config(args
.channels_file
)
642 for section
in config
.sections():
643 if args
.channels
and section
not in args
.channels
:
646 sp
= read_search_path(config
[section
])
648 config
[section
].update(sp
.pin(v
)._asdict
())
650 with open(args
.channels_file
, 'w') as configfile
:
651 config
.write(configfile
)
654 def updateCommand(args
: argparse
.Namespace
) -> None:
656 exprs
: Dict
[str, str] = {}
657 config
= read_config_files(args
.channels_file
)
658 for section
in config
:
659 sp
= read_search_path(config
[section
])
660 if isinstance(sp
, AliasSearchPath
):
661 assert 'git_repo' not in config
[section
]
663 tarball
= sp
.fetch(v
, section
, config
[section
])
665 'f: f { name = "%s"; channelName = "%%s"; src = builtins.storePath "%s"; }' %
666 (config
[section
]['release_name'], tarball
))
668 for section
in config
:
669 if 'alias_of' in config
[section
]:
670 exprs
[section
] = exprs
[str(config
[section
]['alias_of'])]
675 '/nix/var/nix/profiles/per-user/%s/channels' %
679 '<nix/unpack-channel.nix>',
681 '--from-expression'] + [exprs
[name
] % name
for name
in sorted(exprs
.keys())]
683 print(' '.join(map(shlex
.quote
, command
)))
685 v
.status('Installing channels with nix-env')
686 process
= subprocess
.run(command
)
687 v
.result(process
.returncode
== 0)
691 parser
= argparse
.ArgumentParser(prog
='pinch')
692 subparsers
= parser
.add_subparsers(dest
='mode', required
=True)
693 parser_pin
= subparsers
.add_parser('pin')
694 parser_pin
.add_argument('channels_file', type=str)
695 parser_pin
.add_argument('channels', type=str, nargs
='*')
696 parser_pin
.set_defaults(func
=pinCommand
)
697 parser_update
= subparsers
.add_parser('update')
698 parser_update
.add_argument('--dry-run', action
='store_true')
699 parser_update
.add_argument('channels_file', type=str, nargs
='+')
700 parser_update
.set_defaults(func
=updateCommand
)
701 args
= parser
.parse_args()
705 if __name__
== '__main__':