]>
Commit | Line | Data |
---|---|---|
1e40791b SW |
1 | from contextlib import contextmanager |
2 | import json | |
3 | import os | |
4 | import subprocess | |
5 | import sys | |
6 | import xml.sax | |
7 | import xml.sax.handler | |
8 | ||
58265043 SW |
9 | from typing import Any, Dict, Iterator, List, Set |
10 | ||
1e40791b SW |
11 | |
@contextmanager
def log(msg: str) -> Iterator[None]:
    """Show a transient status message on stderr around a ``with`` body.

    *msg* is written without a trailing newline before the body runs. When
    the body finishes, whether normally or by raising, a carriage return is
    written so that later output starts at the beginning of the same line.
    """
    def emit(text: str) -> None:
        # sys.stderr is looked up on every call, so a redirection that
        # happens while the body runs is honoured.
        print(text, file=sys.stderr, end='', flush=True)

    emit(msg)
    try:
        yield
    finally:
        emit('\r')
19 | ||
20 | ||
class ParseNixStoreQueryGraphML(xml.sax.handler.ContentHandler):
    """SAX handler that collects the edges of a GraphML document.

    Only ``edge`` elements are inspected; every other element is ignored.
    After parsing:

    * ``deps`` maps each edge source to the list of its targets, in
      document order;
    * ``non_roots`` holds every id that appeared as an edge target;
    * ``roots`` holds every edge source that never appeared as a target.
    """

    def __init__(self) -> None:
        # Run the base-class initialiser as well, so the state it sets up
        # exists even before a parser is attached to this handler.
        super().__init__()
        self.roots: Set[str] = set()
        self.non_roots: Set[str] = set()
        self.deps: Dict[str, List[str]] = {}

    def startElement(self, name: str, attrs: Any) -> None:
        """Record a single ``edge`` element; skip anything else."""
        if name != "edge":
            return
        source = attrs.getValue("source")
        target = attrs.getValue("target")
        self.deps.setdefault(source, []).append(target)
        # The target has an incoming edge, so it cannot be a root. This is
        # recorded before the source check so a self-edge is not a root.
        self.non_roots.add(target)
        self.roots.discard(target)
        if source not in self.non_roots:
            self.roots.add(source)
37 | ||
38 | ||
def getDeps(drv: str) -> ParseNixStoreQueryGraphML:
    """Run ``nix-store --query --graphml`` on *drv* and parse its output.

    Args:
        drv: Passed verbatim as the last argument to ``nix-store``.

    Returns:
        The SAX handler, populated with the edges found in the output.

    Raises:
        subprocess.CalledProcessError: If ``nix-store`` exits non-zero.
    """
    command = ["nix-store", "--query", "--graphml", drv]
    with log("Loading dependency tree..."):
        parser = ParseNixStoreQueryGraphML()
        # The context manager closes the pipe and reaps the child even when
        # parsing raises part-way through the output.
        with subprocess.Popen(command, stdout=subprocess.PIPE) as process:
            # Only narrows the Optional type; stdout is set with PIPE.
            assert process.stdout
            xml.sax.parse(process.stdout, parser)
            # Checked explicitly rather than via ``assert`` so the wait and
            # the status check still happen under ``python -O``.
            returncode = process.wait()
        if returncode != 0:
            raise subprocess.CalledProcessError(returncode, command)
        return parser
48 | ||
49 | ||
def getDrvInfo(drv: str) -> Any:
    """Return the decoded JSON printed by ``nix show-derivation`` for *drv*.

    Args:
        drv: Name of the derivation file; ``/nix/store/`` is prepended.

    Returns:
        Whatever ``json.load`` produces from the command's standard output.

    Raises:
        subprocess.CalledProcessError: If ``nix`` exits non-zero.
    """
    command = ["nix",
               "--experimental-features",
               "nix-command",
               "show-derivation",
               "/nix/store/" + drv]
    with log("Loading %s..." % drv):
        # The context manager closes the pipe and reaps the child even when
        # JSON decoding raises.
        with subprocess.Popen(command, stdout=subprocess.PIPE) as process:
            # Only narrows the Optional type; stdout is set with PIPE.
            assert process.stdout
            info = json.load(process.stdout)
            # Checked explicitly rather than via ``assert`` so the wait and
            # the status check still happen under ``python -O``.
            returncode = process.wait()
        if returncode != 0:
            raise subprocess.CalledProcessError(returncode, command)
        return info
62 | ||
63 | ||
def allBuilt(drv: str) -> bool:
    """Return True when every output path reported for *drv* exists on disk.

    The derivation info is fetched once; the scan stops at the first
    missing output path.
    """
    # TODO: How to pin outputs one at a time?
    # Currently, we only pin when all outputs are available.
    # It would be better to pin the outputs we've got.
    for derivation in getDrvInfo(drv).values():
        for output in derivation['outputs'].values():
            if not os.path.exists(output['path']):
                return False
    return True
70 | ||
71 | ||
def isDrv(thing: str) -> bool:
    """Return True when *thing* ends with the ``.drv`` extension."""
    extension = ".drv"
    return thing[-len(extension):] == extension
74 | ||
75 | ||
def removesuffix(s: str, suf: str) -> str:
    """Return *s* with a trailing *suf* removed, or *s* unchanged.

    An empty *suf* leaves *s* untouched. Without the explicit check,
    ``s[:-len(suf)]`` would become ``s[:0]`` and wrongly return ``""``.
    """
    if suf and s.endswith(suf):
        return s[:-len(suf)]
    return s
78 | ||
79 | ||
def pin(drv: str) -> None:
    """Ask ``nix-store`` to realise ``/nix/store/<drv>`` and add a root for it.

    The root path is ``sys.argv[2]`` joined with *drv* stripped of a
    trailing ``.drv``. Nothing is run when that path already exists.

    Raises:
        subprocess.CalledProcessError: If ``nix-store`` exits non-zero.
    """
    outPath = os.path.join(sys.argv[2], removesuffix(drv, ".drv"))
    if os.path.exists(outPath):
        return
    # check=True turns a failing nix-store into an exception, so the
    # returned CompletedProcess carries nothing worth keeping.
    subprocess.run(["nix-store",
                    "--realise",
                    "--add-root",
                    outPath,
                    "/nix/store/" + drv],
                   check=True)
89 | ||
90 | ||
58265043 SW |
def pinBuiltThings(thing: str,
                   done: Set[str],
                   deps: Dict[str, List[str]]) -> None:
    """Pin *thing*, or recurse into its dependencies when it is not built.

    Args:
        thing: The node to process.
        done: Nodes already visited; updated in place so that each node is
            handled at most once per shared set.
        deps: Maps a node to the list of nodes it depends on.

    A node that is not a ``.drv``, or whose outputs all exist, is pinned
    and its dependencies are not visited. Otherwise each dependency is
    processed in turn.
    """
    if thing in done:
        return
    done.add(thing)
    # allBuilt() is only consulted for .drv nodes, as in the short-circuit
    # of the equivalent "not isDrv(thing) or allBuilt(thing)" test.
    unbuilt_drv = isDrv(thing) and not allBuilt(thing)
    if unbuilt_drv:
        for child in deps.get(thing, []):
            pinBuiltThings(child, done, deps)
    else:
        pin(thing)
102 | ||
103 | ||
# Script entry: sys.argv[1] is the derivation whose dependency graph is
# loaded; sys.argv[2] (read by pin) is the directory that receives roots.
dep_graph = getDeps(sys.argv[1])
# One visited set is shared by all roots, so a dependency reachable from
# several roots is queried and pinned only once.
visited: Set[str] = set()
for root in dep_graph.roots:
    pinBuiltThings(root, visited, dep_graph.deps)