]>
Commit | Line | Data |
---|---|---|
1e40791b SW |
1 | from contextlib import contextmanager |
2 | import json | |
3 | import os | |
4 | import subprocess | |
5 | import sys | |
6 | import xml.sax | |
7 | import xml.sax.handler | |
8 | ||
58265043 SW |
9 | from typing import Any, Dict, Iterator, List, Set |
10 | ||
1e40791b SW |
11 | |
@contextmanager
def log(msg: str) -> Iterator[None]:
    """Show a transient status message on stderr while the ``with`` body runs.

    ``msg`` is written without a trailing newline and flushed immediately.
    When the body exits, whether normally or through an exception, a single
    carriage return is written and flushed.

    Args:
        msg: Text to write to ``sys.stderr`` on entry.
    """
    def emit(text: str) -> None:
        # end='' keeps the status on the current line; flush so it is
        # visible before the (possibly slow) body starts.
        print(text, file=sys.stderr, end='', flush=True)

    emit(msg)
    try:
        yield
    finally:
        emit('\r')
19 | ||
20 | ||
class ParseNixStoreQueryGraphML(xml.sax.handler.ContentHandler):
    """SAX content handler that collects the ``<edge>`` elements of a document.

    Only elements named ``edge`` are examined; each must carry ``source`` and
    ``target`` attributes. Three structures are built up as edges are seen:

    * ``deps`` maps each source to the list of its targets, in document order.
    * ``non_roots`` holds every node that appeared as a target.
    * ``roots`` holds every source that has not appeared as a target.
    """

    def __init__(self) -> None:
        super().__init__()
        # Sources that have not been the target of any edge seen so far.
        self.roots: Set[str] = set()
        # Nodes that have been the target of at least one edge.
        self.non_roots: Set[str] = set()
        # source -> list of targets (duplicates are kept).
        self.deps: Dict[str, List[str]] = {}

    def startElement(self, name: str, attrs: Any) -> None:
        """Record one ``edge`` element; every other element is ignored.

        Args:
            name: Element name reported by the SAX parser.
            attrs: Attribute collection offering ``getValue(name)``.
        """
        if name != "edge":
            return

        src = attrs.getValue("source")
        dst = attrs.getValue("target")

        # The target can no longer be a root, even if it was recorded as one
        # by an earlier edge.
        self.non_roots.add(dst)
        self.roots.discard(dst)

        if src in self.deps:
            self.deps[src].append(dst)
        else:
            self.deps[src] = [dst]

        # A source is a root only while nothing points at it.
        if src not in self.non_roots:
            self.roots.add(src)
38 | ||
39 | ||
def getDeps(drv: str) -> ParseNixStoreQueryGraphML:
    """Run ``nix-store --query --graphml`` on ``drv`` and parse the edges.

    The command's standard output is streamed straight into a SAX parser, so
    the whole document is never held in memory.

    Args:
        drv: Passed verbatim as the final argument to ``nix-store``.

    Returns:
        The handler, populated with ``roots``, ``non_roots`` and ``deps``.

    Raises:
        subprocess.CalledProcessError: If ``nix-store`` exits with a non-zero
            status. (Previously an ``assert``, which is skipped entirely
            under ``python -O``.)
    """
    cmd = ["nix-store", "--query", "--graphml", drv]
    with log("Loading dependency tree..."):
        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as process:
            parser = ParseNixStoreQueryGraphML()
            # Cannot be None with stdout=PIPE; the explicit check also
            # narrows the Optional type for static checkers.
            if process.stdout is None:
                raise RuntimeError("nix-store was started without a stdout pipe")
            xml.sax.parse(process.stdout, parser)
            returncode = process.wait()
            if returncode != 0:
                raise subprocess.CalledProcessError(returncode, cmd)
    return parser
49 | ||
50 | ||
def getDrvInfo(drv: str) -> Any:
    """Return the parsed JSON printed by ``nix show-derivation`` for ``drv``.

    Args:
        drv: Store entry name; ``/nix/store/`` is prepended before it is
            handed to ``nix``.

    Returns:
        Whatever ``json.load`` produces from the command's standard output.

    Raises:
        subprocess.CalledProcessError: If ``nix`` exits with a non-zero
            status. (Previously an ``assert``, which is skipped entirely
            under ``python -O``.)
    """
    cmd = ["nix", "--experimental-features", "nix-command",
           "show-derivation", "/nix/store/" + drv]
    with log(f"Loading {drv}..."):
        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as process:
            # Cannot be None with stdout=PIPE; the explicit check also
            # narrows the Optional type for static checkers.
            if process.stdout is None:
                raise RuntimeError("nix was started without a stdout pipe")
            info = json.load(process.stdout)
            returncode = process.wait()
            if returncode != 0:
                raise subprocess.CalledProcessError(returncode, cmd)
    return info
61 | ||
62 | ||
def allBuilt(drv: str) -> bool:
    """Return True when every output path listed for ``drv`` exists on disk.

    The information comes from ``getDrvInfo(drv)``: for each value in the
    returned mapping, every entry of its ``'outputs'`` mapping is checked via
    its ``'path'`` key. The scan stops at the first missing path.
    """
    # TODO: How to pin outputs one at a time?
    # Currently, we only pin when all outputs are available.
    # It would be better to pin the outputs we've got.
    for derivation in getDrvInfo(drv).values():
        for output in derivation['outputs'].values():
            if not os.path.exists(output['path']):
                return False
    return True
69 | ||
70 | ||
def isDrv(thing: str) -> bool:
    """Return True when ``thing`` ends with the ``.drv`` extension."""
    extension = ".drv"
    # Comparing the tail slice is equivalent to str.endswith for strings:
    # a string shorter than the extension yields a shorter, unequal slice.
    return thing[-len(extension):] == extension
73 | ||
74 | ||
def removesuffix(s: str, suf: str) -> str:
    """Return ``s`` without a trailing ``suf``, or ``s`` unchanged.

    Args:
        s: String to trim.
        suf: Suffix to remove if present. An empty suffix removes nothing.

    Returns:
        ``s`` with one trailing occurrence of ``suf`` removed when ``s`` ends
        with it; otherwise ``s`` itself.
    """
    # Guard against the empty suffix: every string "ends with" "", and
    # s[:-0] is s[:0], which would wrongly return the empty string.
    if suf and s.endswith(suf):
        return s[:-len(suf)]
    return s
77 | ||
78 | ||
def pin(drv: str) -> None:
    """Run ``nix-store --realise --add-root`` for ``drv`` unless already done.

    The root path is ``sys.argv[2]`` joined with ``drv`` minus any ``.drv``
    suffix. If that path already exists nothing is executed; otherwise
    ``nix-store`` is invoked on ``/nix/store/<drv>`` and a non-zero exit
    raises ``subprocess.CalledProcessError`` (``check=True``).

    Args:
        drv: Store entry name, without the ``/nix/store/`` prefix.
    """
    root_dir = sys.argv[2]
    root_link = os.path.join(root_dir, removesuffix(drv, ".drv"))
    if os.path.exists(root_link):
        # Root already present from an earlier run or an earlier visit.
        return

    command = ["nix-store", "--realise", "--add-root",
               root_link, "/nix/store/" + drv]
    subprocess.run(command, check=True)
1e40791b SW |
84 | |
85 | ||
58265043 SW |
def pinBuiltThings(thing: str,
                   done: Set[str],
                   deps: Dict[str, List[str]]) -> None:
    """Pin ``thing`` if possible, otherwise descend into its dependencies.

    A node is pinned when it is not a ``.drv`` or when ``allBuilt`` reports
    all of its outputs present; its dependencies are then not visited. For
    any other node the function recurses into ``deps[thing]`` in list order.

    Args:
        thing: Node to process.
        done: Nodes already visited; updated in place so each node is
            processed at most once per set.
        deps: Mapping from a node to the list of nodes it depends on.
    """
    if thing not in done:
        done.add(thing)
        # isDrv is checked first so allBuilt is only called for .drv nodes.
        pinnable = (not isDrv(thing)) or allBuilt(thing)
        if pinnable:
            pin(thing)
        else:
            for child in deps.get(thing, []):
                pinBuiltThings(child, done, deps)
97 | ||
98 | ||
# Script entry: sys.argv[1] is handed to ``nix-store --query --graphml``;
# sys.argv[2] is read by pin() as the directory in which roots are created.
dep_graph = getDeps(sys.argv[1])
# Share one visited set across all roots so a node reachable from several
# roots is examined (and its derivation info loaded) only once.
visited: Set[str] = set()
for root in dep_graph.roots:
    pinBuiltThings(root, visited, dep_graph.deps)