18 import xml
.dom
.minidom
35 # Use xdg module when it's less painful to have as a dependency
38 class XDG(NamedTuple
):
43 XDG_CACHE_HOME
=os
.getenv(
45 os
.path
.expanduser('~/.cache')))
48 class VerificationError(Exception):
54 def __init__(self
) -> None:
57 def status(self
, s
: str) -> None:
58 print(s
, end
=' ', file=sys
.stderr
, flush
=True)
59 self
.line_length
+= 1 + len(s
) # Unicode??
62 def _color(s
: str, c
: int) -> str:
63 return '\033[%2dm%s\033[00m' % (c
, s
)
def result(self, r: bool) -> None:
    """Print a right-aligned, colored OK / FAIL marker for outcome r.

    Resets the tracked line width, and raises VerificationError when r
    is false so the verification run aborts.
    """
    if r:
        message, color = 'OK ', 92
    else:
        message, color = 'FAIL', 91
    length = len(message)
    cols = shutil.get_terminal_size().columns or 80
    # Pad so the marker lands at the right edge of the terminal line.
    pad = (cols - (self.line_length + length)) % cols
    print(' ' * pad + self._color(message, color), file=sys.stderr)
    self.line_length = 0
    if not r:
        raise VerificationError()
def check(self, s: str, r: bool) -> None:
    """Report the check described by s, then record its boolean outcome r."""
    self.status(s)
    self.result(r)
# Distinct string types for the two digest encodings nix uses:
# base-16 (hex) and base-32.  NewType lets mypy catch accidental mixing.
Digest16 = NewType('Digest16', str)
Digest32 = NewType('Digest32', str)
87 class ChannelTableEntry(types
.SimpleNamespace
):
95 class AliasPin(NamedTuple
):
99 class GitPin(NamedTuple
):
104 class ChannelPin(NamedTuple
):
# A pin records the reproducible, resolved state of one search-path entry.
Pin = Union[AliasPin, GitPin, ChannelPin]
114 class AliasSearchPath(NamedTuple
):
117 # pylint: disable=no-self-use
118 def pin(self
, _
: Verification
, __
: Optional
[Pin
]) -> AliasPin
:
122 class GitSearchPath(NamedTuple
):
def pin(self, v: Verification, old_pin: Optional[Pin]) -> GitPin:
    """Fetch the current tip of this entry's git ref and pin it.

    old_pin, when present, must be a GitPin; its revision is passed
    along so the fetch can verify history moved forward from it.
    """
    old_revision = None
    if old_pin is not None:
        assert isinstance(old_pin, GitPin)
        old_revision = old_pin.git_revision

    new_revision = git_fetch(v, self, None, old_revision)
    release = git_revision_name(v, self, new_revision)
    return GitPin(release_name=release, git_revision=new_revision)
def fetch(self, v: Verification, pin: Pin) -> str:
    """Materialize the pinned git revision as a tarball in the nix store.

    Returns the store path of the generated tarball.
    """
    assert isinstance(pin, GitPin)
    ensure_git_rev_available(v, self, pin, None)
    tarball = git_get_tarball(v, self, pin)
    return tarball
141 class ChannelSearchPath(NamedTuple
):
def pin(self, v: Verification, old_pin: Optional[Pin]) -> ChannelPin:
    """Resolve this channel to a fully-specified ChannelPin.

    Downloads the channel page, parses its resource table, fetches the
    advertised resources, and cross-checks the channel tarball against
    git history before recording the pin.
    """
    old_revision = None
    if old_pin is not None:
        assert isinstance(old_pin, ChannelPin)
        old_revision = old_pin.git_revision

    channel_html, forwarded_url = fetch_channel(v, self)
    table, new_gitpin = parse_channel(v, channel_html)
    fetch_resources(v, new_gitpin, forwarded_url, table)
    ensure_git_rev_available(v, self, new_gitpin, old_revision)
    check_channel_contents(v, self, table, new_gitpin)
    return ChannelPin(
        release_name=new_gitpin.release_name,
        tarball_url=table['nixexprs.tar.xz'].absolute_url,
        tarball_sha256=table['nixexprs.tar.xz'].digest,
        git_revision=new_gitpin.git_revision)
# pylint: disable=no-self-use
def fetch(self, v: Verification, pin: Pin) -> str:
    """Download the pinned channel tarball into the nix store.

    Returns the resulting store path.
    """
    assert isinstance(pin, ChannelPin)

    digest = Digest16(pin.tarball_sha256)
    return fetch_with_nix_prefetch_url(v, pin.tarball_url, digest)
# Any configured search-path entry.
SearchPath = Union[AliasSearchPath, GitSearchPath, ChannelSearchPath]
# Search paths whose pinned content is delivered as a tarball.
TarrableSearchPath = Union[GitSearchPath, ChannelSearchPath]
def compare(a: str, b: str) -> Tuple[List[str], List[str], List[str]]:
    """Compare the contents of directory trees a and b, ignoring .git/.

    Returns filecmp.cmpfiles' (match, mismatch, errors) lists over the
    union of both trees' files (so a file present in only one tree shows
    up under errors).
    """

    def throw(error: OSError) -> None:
        # os.walk swallows errors by default; surface them instead.
        raise error

    def join(x: str, y: str) -> str:
        return y if x == '.' else os.path.join(x, y)

    def recursive_files(d: str) -> Iterable[str]:
        """All regular files under d (as d-relative paths), plus any
        symlinked directories (compared as entries, not descended into)."""
        all_files: List[str] = []
        for path, dirs, files in os.walk(d, onerror=throw):
            rel = os.path.relpath(path, start=d)
            all_files.extend(join(rel, f) for f in files)
            for dir_or_link in dirs:
                if os.path.islink(join(path, dir_or_link)):
                    all_files.append(join(rel, dir_or_link))
        return all_files

    def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
        return (f for f in files if not f.startswith('.git/'))

    # Explicit set union is clearer than functools.reduce(operator.or_, ...)
    # over a generator of sets.
    files = set(exclude_dot_git(recursive_files(a))) | set(
        exclude_dot_git(recursive_files(b)))
    return filecmp.cmpfiles(a, b, files, shallow=False)
def fetch_channel(
        v: Verification,
        channel: ChannelSearchPath) -> Tuple[str, str]:
    """Download the channel's HTML description page.

    Returns (page contents, final URL after redirects).  Being
    redirected is expected: channel URLs forward to a release-specific
    page, and that is verified explicitly.
    """
    v.status('Fetching channel')
    response = urllib.request.urlopen(channel.channel_url, timeout=10)
    channel_html = response.read()
    forwarded_url = response.geturl()
    v.result(response.status == 200)  # type: ignore  # (for old mypy)
    v.check('Got forwarded', channel.channel_url != forwarded_url)
    return channel_html, forwarded_url
def parse_channel(v: Verification, channel_html: str) \
        -> Tuple[Dict[str, ChannelTableEntry], GitPin]:
    """Extract the release name, git revision, and resource table from a
    channel page.

    The page is parsed as XML: the release name appears in both <title>
    and <h1> (cross-checked against each other), the git commit in the
    first <tt> element, and each downloadable resource is one row of the
    page's table.
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(channel_html)
    v.ok()

    v.status('Extracting release name:')
    title_name = d.getElementsByTagName(
        'title')[0].firstChild.nodeValue.split()[2]
    h1_name = d.getElementsByTagName('h1')[0].firstChild.nodeValue.split()[2]
    v.status(title_name)
    v.result(title_name == h1_name)

    v.status('Extracting git commit:')
    git_commit_node = d.getElementsByTagName('tt')[0]
    git_revision = git_commit_node.firstChild.nodeValue
    v.status(git_revision)
    v.ok()

    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    table: Dict[str, ChannelTableEntry] = {}
    for row in d.getElementsByTagName('tr')[1:]:
        name = row.childNodes[0].firstChild.firstChild.nodeValue
        url = row.childNodes[0].firstChild.getAttribute('href')
        size = int(row.childNodes[1].firstChild.nodeValue)
        digest = Digest16(row.childNodes[2].firstChild.firstChild.nodeValue)
        table[name] = ChannelTableEntry(url=url, digest=digest, size=size)
    v.ok()
    return table, GitPin(release_name=title_name, git_revision=git_revision)
def digest_string(s: bytes) -> Digest16:
    """SHA-256 digest of the byte string s, hex (base-16) encoded."""
    hasher = hashlib.sha256()
    hasher.update(s)
    return Digest16(hasher.hexdigest())
def digest_file(filename: str) -> Digest16:
    """SHA-256 of the named file's contents, hex encoded.

    Reads in 4 KiB chunks so large files are not held in memory.
    """
    hasher = hashlib.sha256()
    with open(filename, 'rb') as f:
        # pylint: disable=cell-var-from-loop
        for block in iter(lambda: f.read(4096), b''):
            hasher.update(block)
    return Digest16(hasher.hexdigest())
def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
    """Re-encode a base-32 sha256 digest as base-16 using `nix to-base16`."""
    v.status('Converting digest to base16')
    process = subprocess.run(
        ['nix', 'to-base16', '--type', 'sha256', digest32],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    converted = process.stdout.decode().strip()
    return Digest16(converted)
def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
    """Re-encode a base-16 sha256 digest as base-32 using `nix to-base32`."""
    v.status('Converting digest to base32')
    process = subprocess.run(
        ['nix', 'to-base32', '--type', 'sha256', digest16],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    converted = process.stdout.decode().strip()
    return Digest32(converted)
def fetch_with_nix_prefetch_url(
        v: Verification,
        url: str,
        digest: Digest16) -> str:
    """Download url into the nix store via nix-prefetch-url.

    Verifies both the digest nix-prefetch-url reports and an
    independently computed digest of the stored file match the expected
    digest, then returns the store path.
    """
    v.status('Fetching %s' % url)
    process = subprocess.run(
        ['nix-prefetch-url', '--print-path', url, digest],
        stdout=subprocess.PIPE)
    v.result(process.returncode == 0)
    prefetch_digest, path, empty = process.stdout.decode().split('\n')
    assert empty == ''
    v.check("Verifying nix-prefetch-url's digest",
            to_Digest16(v, Digest32(prefetch_digest)) == digest)
    v.status("Verifying file digest")
    file_digest = digest_file(path)
    v.result(file_digest == digest)
    return path  # type: ignore  # (for old mypy)
def fetch_resources(
        v: Verification,
        pin: GitPin,
        forwarded_url: str,
        table: Dict[str, ChannelTableEntry]) -> None:
    """Download the channel's resources and cross-check the git revision.

    Resolves each table entry's URL relative to forwarded_url, fetches
    it into the nix store (recording absolute_url and file on the
    entry), then verifies the page's git commit matches the
    git-revision resource's contents.
    """
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = table[resource]
        fields.absolute_url = urllib.parse.urljoin(forwarded_url, fields.url)
        fields.file = fetch_with_nix_prefetch_url(
            v, fields.absolute_url, fields.digest)
    v.status('Verifying git commit on main page matches git commit in table')
    v.result(open(table['git-revision'].file).read(999) == pin.git_revision)
307 def git_cachedir(git_repo
: str) -> str:
311 digest_string(git_repo
.encode()))
314 def tarball_cache_file(channel
: TarrableSearchPath
, pin
: GitPin
) -> str:
319 (digest_string(channel
.git_repo
.encode()),
324 def verify_git_ancestry(
326 channel
: TarrableSearchPath
,
328 old_revision
: Optional
[str]) -> None:
329 cachedir
= git_cachedir(channel
.git_repo
)
330 v
.status('Verifying rev is an ancestor of ref')
331 process
= subprocess
.run(['git',
338 v
.result(process
.returncode
== 0)
340 if old_revision
is not None:
342 'Verifying rev is an ancestor of previous rev %s' %
344 process
= subprocess
.run(['git',
351 v
.result(process
.returncode
== 0)
356 channel
: TarrableSearchPath
,
357 desired_revision
: Optional
[str],
358 old_revision
: Optional
[str]) -> str:
359 # It would be nice if we could share the nix git cache, but as of the time
360 # of writing it is transitioning from gitv2 (deprecated) to gitv3 (not ready
361 # yet), and trying to straddle them both is too far into nix implementation
362 # details for my comfort. So we re-implement here half of nix.fetchGit.
365 cachedir
= git_cachedir(channel
.git_repo
)
366 if not os
.path
.exists(cachedir
):
367 v
.status("Initializing git repo")
368 process
= subprocess
.run(
369 ['git', 'init', '--bare', cachedir
])
370 v
.result(process
.returncode
== 0)
372 v
.status('Fetching ref "%s" from %s' % (channel
.git_ref
, channel
.git_repo
))
373 # We don't use --force here because we want to abort and freak out if forced
374 # updates are happening.
375 process
= subprocess
.run(['git',
380 '%s:%s' % (channel
.git_ref
,
382 v
.result(process
.returncode
== 0)
384 if desired_revision
is not None:
385 v
.status('Verifying that fetch retrieved this rev')
386 process
= subprocess
.run(
387 ['git', '-C', cachedir
, 'cat-file', '-e', desired_revision
])
388 v
.result(process
.returncode
== 0)
395 channel
.git_ref
)).read(999).strip()
397 verify_git_ancestry(v
, channel
, new_revision
, old_revision
)
def ensure_git_rev_available(
        v: Verification,
        channel: TarrableSearchPath,
        pin: GitPin,
        old_revision: Optional[str]) -> None:
    """Make sure pin.git_revision exists in the local git cache.

    Checks the cache first; if the revision is already present its
    ancestry is verified, otherwise the repository is fetched.
    """
    cachedir = git_cachedir(channel.git_repo)
    if os.path.exists(cachedir):
        v.status('Checking if we already have this rev:')
        process = subprocess.run(
            ['git', '-C', cachedir, 'cat-file', '-e', pin.git_revision])
        if process.returncode == 0:
            v.status('yes')
        if process.returncode == 1:
            v.status('no')
        # cat-file -e exits 0 (object present) or 1 (absent);
        # any other code is a real error.
        v.result(process.returncode == 0 or process.returncode == 1)
        if process.returncode == 0:
            verify_git_ancestry(v, channel, pin.git_revision, old_revision)
            return
    git_fetch(v, channel, pin.git_revision, old_revision)
423 def compare_tarball_and_git(
426 channel_contents
: str,
427 git_contents
: str) -> None:
428 v
.status('Comparing channel tarball with git checkout')
429 match
, mismatch
, errors
= compare(os
.path
.join(
430 channel_contents
, pin
.release_name
), git_contents
)
432 v
.check('%d files match' % len(match
), len(match
) > 0)
433 v
.check('%d files differ' % len(mismatch
), len(mismatch
) == 0)
441 for ee
in expected_errors
:
444 benign_errors
.append(ee
)
446 '%d unexpected incomparable files' %
450 '(%d of %d expected incomparable files)' %
452 len(expected_errors
)),
453 len(benign_errors
) == len(expected_errors
))
458 table
: Dict
[str, ChannelTableEntry
],
460 v
.status('Extracting tarball %s' % table
['nixexprs.tar.xz'].file)
461 shutil
.unpack_archive(table
['nixexprs.tar.xz'].file, dest
)
467 channel
: TarrableSearchPath
,
470 v
.status('Checking out corresponding git revision')
471 git
= subprocess
.Popen(['git',
473 git_cachedir(channel
.git_repo
),
476 stdout
=subprocess
.PIPE
)
477 tar
= subprocess
.Popen(
478 ['tar', 'x', '-C', dest
, '-f', '-'], stdin
=git
.stdout
)
483 v
.result(git
.returncode
== 0 and tar
.returncode
== 0)
488 channel
: TarrableSearchPath
,
490 cache_file
= tarball_cache_file(channel
, pin
)
491 if os
.path
.exists(cache_file
):
492 cached_tarball
= open(cache_file
).read(9999)
493 if os
.path
.exists(cached_tarball
):
494 return cached_tarball
496 with tempfile
.TemporaryDirectory() as output_dir
:
497 output_filename
= os
.path
.join(
498 output_dir
, pin
.release_name
+ '.tar.xz')
499 with open(output_filename
, 'w') as output_file
:
501 'Generating tarball for git revision %s' %
503 git
= subprocess
.Popen(['git',
505 git_cachedir(channel
.git_repo
),
507 '--prefix=%s/' % pin
.release_name
,
509 stdout
=subprocess
.PIPE
)
510 xz
= subprocess
.Popen(['xz'], stdin
=git
.stdout
, stdout
=output_file
)
513 v
.result(git
.returncode
== 0 and xz
.returncode
== 0)
515 v
.status('Putting tarball in Nix store')
516 process
= subprocess
.run(
517 ['nix-store', '--add', output_filename
], stdout
=subprocess
.PIPE
)
518 v
.result(process
.returncode
== 0)
519 store_tarball
= process
.stdout
.decode().strip()
521 os
.makedirs(os
.path
.dirname(cache_file
), exist_ok
=True)
522 open(cache_file
, 'w').write(store_tarball
)
523 return store_tarball
# type: ignore # (for old mypy)
526 def check_channel_metadata(
529 channel_contents
: str) -> None:
530 v
.status('Verifying git commit in channel tarball')
536 '.git-revision')).read(999) == pin
.git_revision
)
539 'Verifying version-suffix is a suffix of release name %s:' %
541 version_suffix
= open(
545 '.version-suffix')).read(999)
546 v
.status(version_suffix
)
547 v
.result(pin
.release_name
.endswith(version_suffix
))
550 def check_channel_contents(
552 channel
: TarrableSearchPath
,
553 table
: Dict
[str, ChannelTableEntry
],
554 pin
: GitPin
) -> None:
555 with tempfile
.TemporaryDirectory() as channel_contents
, \
556 tempfile
.TemporaryDirectory() as git_contents
:
558 extract_tarball(v
, table
, channel_contents
)
559 check_channel_metadata(v
, pin
, channel_contents
)
561 git_checkout(v
, channel
, pin
, git_contents
)
563 compare_tarball_and_git(v
, pin
, channel_contents
, git_contents
)
565 v
.status('Removing temporary directories')
569 def git_revision_name(
571 channel
: TarrableSearchPath
,
572 git_revision
: str) -> str:
573 v
.status('Getting commit date')
574 process
= subprocess
.run(['git',
576 git_cachedir(channel
.git_repo
),
581 '--no-show-signature',
583 stdout
=subprocess
.PIPE
)
584 v
.result(process
.returncode
== 0 and process
.stdout
!= b
'')
585 return '%s-%s' % (os
.path
.basename(channel
.git_repo
),
586 process
.stdout
.decode().strip())
def filter_dict(d: Dict[K, V], fields: Set[K]
                ) -> Tuple[Dict[K, V], Dict[K, V]]:
    """Split d by key membership in fields.

    Returns (selected, remaining): selected holds the entries whose keys
    are in fields; remaining holds everything else.  d is not modified.
    """
    # Dict comprehensions replace the manual loop-and-assign version.
    selected: Dict[K, V] = {k: v for k, v in d.items() if k in fields}
    remaining: Dict[K, V] = {k: v for k, v in d.items() if k not in fields}
    return selected, remaining
def read_search_path(
        conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
    """Build a SearchPath (and its Pin, when one is recorded) from a
    config section.

    The section's `type` key selects which SearchPath/Pin classes to
    use; fields named in the Pin type populate the Pin, all other
    fields populate the SearchPath.
    """
    mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
        'alias': (AliasSearchPath, AliasPin),
        'channel': (ChannelSearchPath, ChannelPin),
        'git': (GitSearchPath, GitPin),
    }
    sp_class, pin_class = mapping[conf['type']]
    _, all_fields = filter_dict(dict(conf.items()), set(['type']))
    pin_fields, remaining_fields = filter_dict(
        all_fields, set(pin_class._fields))
    # A pin exists if any pin field is recorded, or trivially when the
    # pin type has no fields at all (AliasPin).
    # Error suppression works around https://github.com/python/mypy/issues/9007
    pin_present = pin_fields != {} or pin_class._fields == ()
    pin = pin_class(**pin_fields) if pin_present else None  # type:ignore[call-arg]
    return sp_class(**remaining_fields), pin
def read_config(filename: str) -> configparser.ConfigParser:
    """Parse the pin configuration file at filename."""
    config = configparser.ConfigParser()
    # Use a context manager so the file handle is closed promptly
    # instead of being leaked to the garbage collector.
    with open(filename) as f:
        config.read_file(f, filename)
    return config
def read_config_files(
        filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
    """Read several config files and merge their sections into one map.

    Raises when the same section name appears in more than one file.
    """
    merged_config: Dict[str, configparser.SectionProxy] = {}
    for file in filenames:
        parsed = read_config(file)
        for section in parsed.sections():
            if section in merged_config:
                raise Exception('Duplicate channel "%s"' % section)
            merged_config[section] = parsed[section]
    return merged_config
def pinCommand(args: argparse.Namespace) -> None:
    """Entry point for `pinch pin`: re-pin channels and rewrite the file.

    When args.channels is non-empty, only those sections are re-pinned;
    otherwise every section is.
    """
    v = Verification()
    config = read_config(args.channels_file)
    for section in config.sections():
        if args.channels and section not in args.channels:
            continue

        sp, old_pin = read_search_path(config[section])

        config[section].update(sp.pin(v, old_pin)._asdict())

    with open(args.channels_file, 'w') as configfile:
        config.write(configfile)
654 def updateCommand(args
: argparse
.Namespace
) -> None:
656 exprs
: Dict
[str, str] = {}
657 config
= read_config_files(args
.channels_file
)
658 for section
in config
:
659 sp
, pin
= read_search_path(config
[section
])
662 'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
664 if isinstance(sp
, AliasSearchPath
):
666 tarball
= sp
.fetch(v
, pin
)
668 'f: f { name = "%s"; channelName = "%%s"; src = builtins.storePath "%s"; }' %
669 (config
[section
]['release_name'], tarball
))
671 for section
in config
:
672 if 'alias_of' in config
[section
]:
673 exprs
[section
] = exprs
[str(config
[section
]['alias_of'])]
678 '/nix/var/nix/profiles/per-user/%s/channels' %
682 '<nix/unpack-channel.nix>',
684 '--from-expression'] + [exprs
[name
] % name
for name
in sorted(exprs
.keys())]
686 print(' '.join(map(shlex
.quote
, command
)))
688 v
.status('Installing channels with nix-env')
689 process
= subprocess
.run(command
)
690 v
.result(process
.returncode
== 0)
694 parser
= argparse
.ArgumentParser(prog
='pinch')
695 subparsers
= parser
.add_subparsers(dest
='mode', required
=True)
696 parser_pin
= subparsers
.add_parser('pin')
697 parser_pin
.add_argument('channels_file', type=str)
698 parser_pin
.add_argument('channels', type=str, nargs
='*')
699 parser_pin
.set_defaults(func
=pinCommand
)
700 parser_update
= subparsers
.add_parser('update')
701 parser_update
.add_argument('--dry-run', action
='store_true')
702 parser_update
.add_argument('channels_file', type=str, nargs
='+')
703 parser_update
.set_defaults(func
=updateCommand
)
704 args
= parser
.parse_args()
708 if __name__
== '__main__':