]>
Commit | Line | Data |
---|---|---|
1 | from contextlib import contextmanager | |
2 | import json | |
3 | import os | |
4 | import subprocess | |
5 | import sys | |
6 | import xml.sax | |
7 | import xml.sax.handler | |
8 | ||
9 | from typing import Any, Dict, Iterator, List, Set | |
10 | ||
11 | ||
12 | @contextmanager | |
13 | def log(msg: str) -> Iterator[None]: | |
14 | print(msg, file=sys.stderr, end='', flush=True) | |
15 | try: | |
16 | yield | |
17 | finally: | |
18 | print('\r', file=sys.stderr, end='', flush=True) | |
19 | ||
20 | ||
21 | class ParseNixStoreQueryGraphML(xml.sax.handler.ContentHandler): | |
22 | def __init__(self) -> None: | |
23 | super().__init__() | |
24 | self.roots: Set[str] = set() | |
25 | self.non_roots: Set[str] = set() | |
26 | self.deps: Dict[str, List[str]] = {} | |
27 | ||
28 | def startElement(self, name: str, attrs: Any) -> None: | |
29 | if name == "edge": | |
30 | source = attrs.getValue("source") | |
31 | target = attrs.getValue("target") | |
32 | self.non_roots.add(target) | |
33 | self.deps.setdefault(source, []).append(target) | |
34 | if target in self.roots: | |
35 | self.roots.remove(target) | |
36 | if source not in self.non_roots: | |
37 | self.roots.add(source) | |
38 | ||
39 | ||
40 | def getDeps(drv: str) -> ParseNixStoreQueryGraphML: | |
41 | with log("Loading dependency tree..."): | |
42 | with subprocess.Popen( | |
43 | ["nix-store", "--query", "--graphml", drv], stdout=subprocess.PIPE) as process: | |
44 | parser = ParseNixStoreQueryGraphML() | |
45 | assert process.stdout | |
46 | xml.sax.parse(process.stdout, parser) | |
47 | assert process.wait() == 0 | |
48 | return parser | |
49 | ||
50 | ||
51 | def getDrvInfo(drv: str) -> Any: | |
52 | with log(f"Loading {drv}..."): | |
53 | with subprocess.Popen( | |
54 | ["nix", "--experimental-features", "nix-command", | |
55 | "show-derivation", "/nix/store/" + drv], | |
56 | stdout=subprocess.PIPE) as process: | |
57 | assert process.stdout | |
58 | info = json.load(process.stdout) | |
59 | assert process.wait() == 0 | |
60 | return info | |
61 | ||
62 | ||
63 | def allBuilt(drv: str) -> bool: | |
64 | # TODO: How to pin outputs one at a time? | |
65 | # Currently, we only pin when all outputs are available. | |
66 | # It would be better to pin the outputs we've got. | |
67 | return all(os.path.exists(o['path']) for d in getDrvInfo( | |
68 | drv).values() for o in d['outputs'].values()) | |
69 | ||
70 | ||
71 | def isDrv(thing: str) -> bool: | |
72 | return thing.endswith(".drv") | |
73 | ||
74 | ||
75 | def removesuffix(s: str, suf: str) -> str: | |
76 | return s[:-len(suf)] if s.endswith(suf) else s | |
77 | ||
78 | ||
79 | def pin(drv: str) -> None: | |
80 | outPath = os.path.join(sys.argv[2], removesuffix(drv, ".drv")) | |
81 | if not os.path.exists(outPath): | |
82 | subprocess.run(["nix-store", "--realise", "--add-root", | |
83 | outPath, "/nix/store/" + drv], check=True) | |
84 | ||
85 | ||
86 | def pinBuiltThings(thing: str, | |
87 | done: Set[str], | |
88 | deps: Dict[str, List[str]]) -> None: | |
89 | if thing in done: | |
90 | return | |
91 | done.add(thing) | |
92 | if not isDrv(thing) or allBuilt(thing): | |
93 | pin(thing) | |
94 | return | |
95 | for dep in deps.get(thing, []): | |
96 | pinBuiltThings(dep, done, deps) | |
97 | ||
98 | ||
99 | dep_graph = getDeps(sys.argv[1]) | |
100 | for root in dep_graph.roots: | |
101 | pinBuiltThings(root, set(), dep_graph.deps) |