git.scottworley.com Git - pinch/blobdiff - pinch.py
Use new 'nix hash convert' to quiet new nix's deprecation warnings
[pinch] / pinch.py
index e20d5d18367a7aef413c230b73d324598ddbaa3f..989e08fd4f6769078290f749650fb7a6811ad397 100644 (file)
--- a/pinch.py
+++ b/pinch.py
@@ -320,43 +320,24 @@ def digest_file(filename: str) -> Digest16:
     return Digest16(hasher.hexdigest())
 
 
-@functools.lru_cache
-def _experimental_flag_needed(v: Verification) -> bool:
-    v.status('Checking Nix version')
-    process = subprocess.run(['nix', '--help'], stdout=subprocess.PIPE)
-    v.result(process.returncode == 0)
-    return b'--experimental-features' in process.stdout
-
-
-def _nix_command(v: Verification) -> List[str]:
-    return ['nix', '--experimental-features',
-            'nix-command'] if _experimental_flag_needed(v) else ['nix']
+_NIX_COMMAND = ['nix', '--experimental-features', 'nix-command']
 
 
 def to_Digest16(v: Verification, digest32: Digest32) -> Digest16:
     v.status('Converting digest to base16')
-    process = subprocess.run(_nix_command(v) + [
-        'to-base16',
-        '--type',
+    process = subprocess.run(_NIX_COMMAND + [
+        'hash',
+        'convert',
+        '--hash-algo',
         'sha256',
+        '--to',
+        'base16',
         digest32],
         stdout=subprocess.PIPE)
     v.result(process.returncode == 0)
     return Digest16(process.stdout.decode().strip())
 
 
-def to_Digest32(v: Verification, digest16: Digest16) -> Digest32:
-    v.status('Converting digest to base32')
-    process = subprocess.run(_nix_command(v) + [
-        'to-base32',
-        '--type',
-        'sha256',
-        digest16],
-        stdout=subprocess.PIPE)
-    v.result(process.returncode == 0)
-    return Digest32(process.stdout.decode().strip())
-
-
 def fetch_with_nix_prefetch_url(
         v: Verification,
         url: str,
@@ -414,14 +395,24 @@ def verify_git_ancestry(
     v.result(process.returncode == 0)
 
 
+def broken_symlinks_are_identical(root1: str, root2: str, path: str) -> bool:
+    a = os.path.join(root1, path)
+    b = os.path.join(root2, path)
+    return (os.path.islink(a)
+            and os.path.islink(b)
+            and not os.path.exists(a)
+            and not os.path.exists(b)
+            and os.readlink(a) == os.readlink(b))
+
+
 def compare_tarball_and_git(
         v: Verification,
         pin: GitPin,
         channel_contents: str,
         git_contents: str) -> None:
     v.status('Comparing channel tarball with git checkout')
-    match, mismatch, errors = compare(os.path.join(
-        channel_contents, pin.release_name), git_contents)
+    tarball_contents = os.path.join(channel_contents, pin.release_name)
+    match, mismatch, errors = compare(tarball_contents, git_contents)
     v.ok()
     v.check(f'{len(match)} files match', len(match) > 0)
     v.check(f'{len(mismatch)} files differ', len(mismatch) == 0)
@@ -431,30 +422,21 @@ def compare_tarball_and_git(
         'nixpkgs',
         'programs.sqlite',
         'svn-revision']
-    permitted_errors = [
-        'pkgs/test/nixpkgs-check-by-name/tests/multiple-failures/pkgs/by-name/A/fo@/foo',
-        'pkgs/test/nixpkgs-check-by-name/tests/symlink-invalid/pkgs/by-name/fo/foo/foo',
-        'pkgs/test/nixpkgs-check-by-name/tests/symlink-invalid/pkgs/by-name/fo/foo/foo.nix',
-    ]
     benign_expected_errors = []
-    benign_permitted_errors = []
     for ee in expected_errors:
         if ee in errors:
             errors.remove(ee)
             benign_expected_errors.append(ee)
-    for pe in permitted_errors:
-        if pe in errors:
-            errors.remove(pe)
-            benign_permitted_errors.append(ee)
+    errors = [
+        e for e in errors
+        if not broken_symlinks_are_identical(tarball_contents, git_contents, e)
+    ]
     v.check(
         f'{len(errors)} unexpected incomparable files: {errors}',
         len(errors) == 0)
     v.check(
         f'({len(benign_expected_errors)} of {len(expected_errors)} expected incomparable files)',
         len(benign_expected_errors) == len(expected_errors))
-    v.check(
-        f'({len(benign_permitted_errors)} of {len(permitted_errors)} permitted incomparable files)',
-        len(benign_permitted_errors) <= len(permitted_errors))
 
 
 def extract_tarball(
@@ -569,7 +551,9 @@ def git_revision_name(
                               git_revision],
                              stdout=subprocess.PIPE)
     v.result(process.returncode == 0 and process.stdout != b'')
-    return f'{os.path.basename(channel.git_repo)}-{process.stdout.decode().strip()}'
+    return f'{
+        os.path.basename(channel.git_repo)}-{
+        process.stdout.decode().strip()}'
 
 
 K = TypeVar('K')
@@ -682,23 +666,35 @@ def updateCommand(args: argparse.Namespace) -> None:
         assert isinstance(sp, AliasSearchPath)  # For mypy
         exprs[section] = exprs[sp.alias_of]
 
-    command = [
-        'nix-env',
-        '--profile',
-        args.profile,
-        '--show-trace',
-        '--file',
-        '<nix/unpack-channel.nix>',
-        '--install',
-        '--remove-all',
-    ] + search_paths + ['--from-expression'] + [
-        exprs[name] % name for name in sorted(exprs.keys())]
-    if args.dry_run:
-        print(' '.join(map(shlex.quote, command)))
-    else:
-        v.status('Installing channels with nix-env')
-        process = subprocess.run(command)
-        v.result(process.returncode == 0)
+    with tempfile.NamedTemporaryFile() as unpack_channel_nix:
+        unpack_channel_nix.write(b'''
+            { name, channelName, src, }:
+            derivation {
+              inherit name channelName src;
+              builder = "builtin:unpack-channel";
+              system = "builtin";
+              preferLocalBuild = true;
+            }
+            ''')
+        unpack_channel_nix.flush()
+
+        command = [
+            'nix-env',
+            '--profile',
+            args.profile,
+            '--show-trace',
+            '--file',
+            unpack_channel_nix.name,
+            '--install',
+            '--remove-all',
+        ] + search_paths + ['--from-expression'] + [
+            exprs[name] % name for name in sorted(exprs.keys())]
+        if args.dry_run:
+            print(' '.join(map(shlex.quote, command)))
+        else:
+            v.status('Installing channels with nix-env')
+            process = subprocess.run(command)
+            v.result(process.returncode == 0)
 
 
 def main() -> None: