12 import xml
.dom
.minidom
class InfoTableEntry(types.SimpleNamespace):
    """One row of the channel's resource table.

    ``url``, ``digest`` and ``size`` are supplied when the row is parsed.
    Exactly one of ``content`` / ``file`` is filled in later, when the
    resource is downloaded (small resources are kept in memory, large ones
    are written to a temporary file).
    """

    # Raw bytes of the resource, set only for small downloads.
    content: bytes
    # Expected SHA-256 hex digest, as published in the channel table.
    digest: str
    # Path of the temporary file holding the download, set only for large ones.
    file: str
    # Size in bytes, as published in the channel table.
    size: int
    # Link target from the table; may be relative to the channel URL.
    url: str
class Info(types.SimpleNamespace):
    """Everything learned about a channel while it is being verified.

    Attributes are attached step by step as the pipeline progresses; none of
    them has a default value, so reading one before the step that sets it
    raises ``AttributeError``.
    """

    # Raw body of the channel page.
    channel_html: bytes
    # URL the channel request ended up at after redirects.
    forwarded_url: str
    # Git commit text scraped from the channel page.
    git_commit: str
    # Resource name -> table row.
    table: Dict[str, InfoTableEntry]
    # Channel URL as requested by the caller.
    url: str
class VerificationError(Exception):
    """Raised when a verification step reports failure."""
def __init__(self) -> None:
    """Start with an empty status line."""
    # Number of characters already printed on the current terminal line;
    # used to right-align the verdict printed at the end of the line.
    self.line_length = 0
def status(self, s: str) -> None:
    """Print ``s`` on the current line and record how much room it took.

    The text is followed by a single space instead of a newline and the
    stream is flushed so the message is visible while the step runs.
    """
    # The trailing space counts too.  len() counts code points, which may
    # not equal the number of terminal columns for non-ASCII text.
    printed = len(s) + 1
    print(s, end=' ', flush=True)
    self.line_length = self.line_length + printed
def _color(s: str, c: int) -> str:
    """Return ``s`` wrapped in an ANSI escape that selects attribute ``c``.

    The sequence is ``ESC[<c>m`` (``c`` right-aligned to two characters),
    then the text, then ``ESC[00m`` to reset.
    """
    template = '\033[%2dm%s\033[00m'
    return template % (c, s)
def result(self, r: bool) -> None:
    """Finish the current status line with a right-aligned verdict.

    Args:
        r: True prints the success marker, False prints the failure marker.

    Raises:
        VerificationError: if ``r`` is false (after the verdict is printed).
    """
    message, color = {True: ('OK ', 92), False: ('FAIL', 91)}[r]
    length = len(message)
    # Guard against a reported width of 0, which would make the modulo
    # below divide by zero.
    cols = shutil.get_terminal_size().columns or 80
    # The modulo keeps the padding non-negative when the status text has
    # already wrapped past the terminal width.
    pad = (cols - (self.line_length + length)) % cols
    print(' ' * pad + self._color(message, color))
    # print() ended the line, so the next status starts at column 0.
    self.line_length = 0
    if not r:
        raise VerificationError()
def check(self, s: str, r: bool) -> None:
    """Report a step whose outcome is already known.

    Prints ``s`` as the status text and immediately finishes the line with
    the verdict ``r``.

    Raises:
        VerificationError: propagated from ``result`` when ``r`` is false.
    """
    self.status(s)
    self.result(r)
75 b
: str) -> Tuple
[Sequence
[str],
def throw(error: OSError) -> None:
    """Re-raise ``error``.

    Passed to ``os.walk`` as ``onerror`` so that a directory that cannot be
    listed aborts the walk instead of being silently skipped.
    """
    raise error
def join(x: str, y: str) -> str:
    """Join ``x`` and ``y`` as path components, dropping a bare ``'.'`` prefix.

    When ``x`` is exactly ``'.'`` the result is ``y`` unchanged, so entries
    directly under the walk root do not come out as ``./name``.
    """
    if x == '.':
        return y
    return os.path.join(x, y)
def recursive_files(d: str) -> Iterable[str]:
    """Return the paths of all files below ``d``, relative to ``d``.

    Symbolic links to directories are included as entries of their own.
    Any error raised while listing a directory is propagated through the
    ``throw`` handler.
    """
    all_files: List[str] = []
    for path, dirs, files in os.walk(d, onerror=throw):
        rel = os.path.relpath(path, start=d)
        all_files.extend(join(rel, f) for f in files)
        # os.walk reports symlinked directories in `dirs` but does not
        # descend into them by default, so record the link itself.
        all_files.extend(
            join(rel, dir_or_link)
            for dir_or_link in dirs
            if os.path.islink(join(path, dir_or_link)))
    return all_files
def exclude_dot_git(files: Iterable[str]) -> Iterable[str]:
    """Lazily yield the entries of ``files`` that are not under ``.git/``.

    Only paths that literally start with ``'.git/'`` are dropped; names such
    as ``.gitignore`` or a nested ``sub/.git/...`` pass through.
    """
    def outside_git_dir(name: str) -> bool:
        return not name.startswith('.git/')

    return (name for name in files if outside_git_dir(name))
98 files
= functools
.reduce(
101 recursive_files(x
))) for x
in [a
, b
]))
102 return filecmp
.cmpfiles(a
, b
, files
, shallow
=False)
def fetch(v: Verification, channel_url: str) -> Info:
    """Download the channel page at ``channel_url``.

    Args:
        v: Progress reporter; each step is announced and judged through it.
        channel_url: URL of the channel to fetch.

    Returns:
        An ``Info`` with ``url``, ``channel_html`` and ``forwarded_url`` set.

    Raises:
        VerificationError: via ``v`` if the response status is not 200 or if
            the request was not redirected to a different URL.
    """
    info = Info()
    info.url = channel_url
    v.status('Fetching channel')
    # Request the URL the caller asked for (previously a fixed channel URL
    # was opened regardless of `channel_url`), and close the response when
    # done with it.
    with urllib.request.urlopen(channel_url, timeout=10) as request:
        info.channel_html = request.read()
        # geturl() is the URL actually retrieved, i.e. after any redirects.
        info.forwarded_url = request.geturl()
        status_ok = request.status == 200
    v.result(status_ok)
    v.check('Got forwarded', info.url != info.forwarded_url)
    return info
def parse(v: Verification, info: Info) -> None:
    """Extract the git commit and the resource table from the channel page.

    Reads ``info.channel_html`` and sets ``info.git_commit`` and
    ``info.table``.

    Args:
        v: Progress reporter; each step is announced and judged through it.
        info: Channel record to read from and fill in.

    Raises:
        VerificationError: via ``v`` if the text preceding the commit is not
            the expected label.
    """
    v.status('Parsing channel description as XML')
    d = xml.dom.minidom.parseString(info.channel_html)
    v.result(True)

    v.status('Extracting git commit')
    # The commit is taken from the first <tt> element of the page.
    git_commit_node = d.getElementsByTagName('tt')[0]
    info.git_commit = git_commit_node.firstChild.nodeValue
    v.result(True)

    v.status('Verifying git commit label')
    v.result(git_commit_node.previousSibling.nodeValue == 'Git commit ')

    v.status('Parsing table')
    # Info declares `table` without a value, so it must be created here
    # before rows can be stored in it.
    info.table = {}
    # The first <tr> is skipped (presumably a header row).
    for row in d.getElementsByTagName('tr')[1:]:
        cells = row.childNodes
        # Access pattern implies: cell 0 wraps a link element, cell 1 holds
        # plain text, cell 2 wraps an element containing the digest text.
        link = cells[0].firstChild
        name = link.firstChild.nodeValue
        url = link.getAttribute('href')
        size = int(cells[1].firstChild.nodeValue)
        digest = cells[2].firstChild.firstChild.nodeValue
        info.table[name] = InfoTableEntry(url=url, digest=digest, size=size)
    v.result(True)
def fetch_resources(v: Verification, info: Info) -> None:
    """Download the channel's resources and verify their SHA-256 digests.

    For each of ``git-revision`` and ``nixexprs.tar.xz`` the table entry in
    ``info.table`` gains either ``content`` (resources advertised as smaller
    than 4096 bytes) or ``file`` (path of a temporary file holding the
    download).

    Args:
        v: Progress reporter; each step is announced and judged through it.
        info: Channel record; ``table``, ``forwarded_url`` and ``git_commit``
            must already be set.

    Raises:
        VerificationError: via ``v`` on a non-200 response, a digest
            mismatch, or a commit that differs from the one on the page.
    """
    for resource in ['git-revision', 'nixexprs.tar.xz']:
        fields = info.table[resource]
        # Decide once, so download and verification take the same path.
        in_memory = fields.size < 4096

        v.status('Fetching resource "%s"' % resource)
        url = urllib.parse.urljoin(info.forwarded_url, fields.url)
        with urllib.request.urlopen(url, timeout=10) as request:
            if in_memory:
                fields.content = request.read()
            else:
                # NOTE: delete=False keeps the file after this function
                # returns so it can be used afterwards; nothing here
                # removes it again.
                with tempfile.NamedTemporaryFile(
                        suffix='.nixexprs.tar.xz', delete=False) as tmp_file:
                    shutil.copyfileobj(request, tmp_file)
                fields.file = tmp_file.name
            fetched_ok = request.status == 200
        v.result(fetched_ok)

        v.status('Verifying digest for "%s"' % resource)
        if in_memory:
            actual_hash = hashlib.sha256(fields.content).hexdigest()
        else:
            hasher = hashlib.sha256()
            with open(fields.file, 'rb') as f:
                # pylint: disable=cell-var-from-loop
                for block in iter(lambda: f.read(4096), b''):
                    hasher.update(block)
            actual_hash = hasher.hexdigest()
        v.result(actual_hash == fields.digest)

    # Relies on 'git-revision' having taken the in-memory path above.
    v.check('Verifying git commit on main page matches git commit in table',
            info.table['git-revision'].content.decode() == info.git_commit)
def extract_channel(v: Verification, info: Info) -> None:
    """Unpack the downloaded ``nixexprs.tar.xz`` into a temporary directory.

    The directory is removed again when the function returns.

    Args:
        v: Progress reporter; each step is announced and judged through it.
        info: Channel record whose ``nixexprs.tar.xz`` table entry has
            ``file`` set to the downloaded archive.
    """
    with tempfile.TemporaryDirectory() as d:
        v.status('Extracting nixexprs.tar.xz')
        # NOTE(review): the archive comes from the network; unpack_archive
        # does not restrict member paths by default on every Python version.
        shutil.unpack_archive(info.table['nixexprs.tar.xz'].file, d)
        v.result(True)
        # Announced inside the block; the removal itself happens when the
        # `with` statement exits.
        v.status('Removing temporary directory')
    v.result(True)
181 info
= fetch(v
, 'https://channels.nixos.org/nixos-20.03')
183 fetch_resources(v
, info
)
184 extract_channel(v
, info
)