]>
Commit | Line | Data |
---|---|---|
1 | # nix-pin-deps: gc-pin dependencies of a partially-built derivation | |
2 | # | |
3 | # This program is free software: you can redistribute it and/or modify it | |
4 | # under the terms of the GNU General Public License as published by the | |
5 | # Free Software Foundation, version 3. | |
6 | ||
7 | ||
8 | from contextlib import contextmanager | |
9 | import json | |
10 | import os | |
11 | import subprocess | |
12 | import sys | |
13 | import xml.sax | |
14 | import xml.sax.handler | |
15 | ||
16 | from typing import Any, Dict, Iterator, List, Set | |
17 | ||
18 | ||
@contextmanager
def log(msg: str) -> Iterator[None]:
    """Show *msg* on stderr while the ``with`` body runs.

    The message is written without a trailing newline and flushed
    immediately.  When the body finishes, whether normally or by raising,
    a carriage return is written so that subsequent output starts at the
    beginning of the same terminal line.
    """
    def _emit(text: str) -> None:
        # Look up sys.stderr on every call so that a stream swapped in
        # while the body runs is honoured on exit.
        print(text, file=sys.stderr, end='', flush=True)

    _emit(msg)
    try:
        yield
    finally:
        _emit('\r')
26 | ||
27 | ||
class ParseNixStoreQueryGraphML(xml.sax.handler.ContentHandler):
    """SAX handler that records the ``<edge>`` elements of a GraphML graph.

    After parsing:

    * ``roots`` holds every node that appeared as an edge source but never
      as an edge target.
    * ``non_roots`` holds every node that appeared as an edge target.
    * ``deps`` maps each edge source to the list of its targets, in the
      order the edges were encountered.

    Nodes that take part in no edge at all are not recorded anywhere.
    """

    def __init__(self) -> None:
        super().__init__()
        self.roots: Set[str] = set()
        self.non_roots: Set[str] = set()
        self.deps: Dict[str, List[str]] = {}

    def startElement(self, name: str, attrs: Any) -> None:
        """Update the graph bookkeeping for one ``<edge>``; ignore the rest."""
        if name != "edge":
            return
        parent = attrs.getValue("source")
        child = attrs.getValue("target")
        self.deps.setdefault(parent, []).append(child)
        # Anything that is pointed at can no longer be a root.
        self.non_roots.add(child)
        self.roots.discard(child)
        # A source counts as a root only until some edge targets it.
        if parent not in self.non_roots:
            self.roots.add(parent)
45 | ||
46 | ||
def getDeps(drv: str) -> ParseNixStoreQueryGraphML:
    """Load the dependency graph of *drv* from ``nix-store --query --graphml``.

    The GraphML output is streamed from the subprocess directly into the
    SAX parser.

    Args:
        drv: Passed verbatim to ``nix-store`` as the path to query.

    Returns:
        The populated handler; see its ``roots``, ``non_roots`` and ``deps``
        attributes.

    Raises:
        subprocess.CalledProcessError: ``nix-store`` exited with a non-zero
            status.
        RuntimeError: The subprocess was started without a stdout pipe.
    """
    command = ["nix-store", "--query", "--graphml", drv]
    with log("Loading dependency tree..."):
        with subprocess.Popen(command, stdout=subprocess.PIPE) as process:
            parser = ParseNixStoreQueryGraphML()
            if process.stdout is None:
                raise RuntimeError("nix-store was started without a stdout pipe")
            xml.sax.parse(process.stdout, parser)
            returncode = process.wait()
        # Checked explicitly rather than with ``assert``: assertions are
        # stripped under ``python -O``, which would silently skip the check.
        if returncode != 0:
            raise subprocess.CalledProcessError(returncode, command)
        return parser
56 | ||
57 | ||
def getDrvInfo(drv: str) -> Any:
    """Return the decoded JSON printed by ``nix derivation show`` for *drv*.

    Args:
        drv: Store item name; it is queried as ``/nix/store/<drv>^*``.

    Returns:
        Whatever JSON value the command prints, decoded with ``json``.

    Raises:
        subprocess.CalledProcessError: ``nix`` exited with a non-zero status.
        json.JSONDecodeError: The command succeeded but printed invalid JSON.
    """
    command = ["nix", "--experimental-features", "nix-command",
               "derivation", "show", f"/nix/store/{drv}^*"]
    with log(f"Loading {drv}..."):
        # check=True reports a failing command as such, before any attempt
        # to decode its (likely empty) output, and unlike ``assert`` it is
        # not stripped under ``python -O``.
        completed = subprocess.run(
            command, stdout=subprocess.PIPE, check=True)
        return json.loads(completed.stdout)
68 | ||
69 | ||
def allBuilt(drv: str) -> bool:
    """Return True when every output path listed for *drv* exists on disk.

    The derivation info is fetched with ``getDrvInfo``; each of its values
    is expected to carry an ``'outputs'`` mapping whose entries have a
    ``'path'`` key.  Checking stops at the first missing path.
    """
    # TODO: How to pin outputs one at a time?
    # Currently, we only pin when all outputs are available.
    # It would be better to pin the outputs we've got.
    for derivation in getDrvInfo(drv).values():
        for output in derivation['outputs'].values():
            if not os.path.exists(output['path']):
                return False
    return True
76 | ||
77 | ||
def isDrv(thing: str) -> bool:
    """Return True if *thing* names a derivation, i.e. ends in ``.drv``."""
    extension = ".drv"
    return thing[-len(extension):] == extension
80 | ||
81 | ||
def removesuffix(s: str, suf: str) -> str:
    """Return *s* without a trailing *suf*, or *s* unchanged if absent.

    Only one occurrence of the suffix is removed.  An empty *suf* leaves
    *s* untouched.
    """
    # The emptiness check matters: with an empty suffix, ``s[:-len(suf)]``
    # would be ``s[:0]`` and wrongly yield the empty string.
    if suf and s.endswith(suf):
        return s[:len(s) - len(suf)]
    return s
84 | ||
85 | ||
def pin(drv: str) -> None:
    """Add a GC root for ``/nix/store/<drv>`` unless one already exists.

    The root is created in the directory named by ``sys.argv[2]`` and is
    called *drv* with a trailing ``.drv`` removed.  If that path already
    exists nothing is done; otherwise ``nix-store --realise --add-root`` is
    run, and a non-zero exit raises ``subprocess.CalledProcessError``.
    """
    root_dir = sys.argv[2]
    link_name = removesuffix(drv, ".drv")
    outPath = os.path.join(root_dir, link_name)
    if os.path.exists(outPath):
        return
    command = ["nix-store", "--realise", "--add-root",
               outPath, "/nix/store/" + drv]
    subprocess.run(command, check=True)
91 | ||
92 | ||
def pinBuiltThings(thing: str,
                   done: Set[str],
                   deps: Dict[str, List[str]]) -> None:
    """Pin *thing*, or failing that, whatever below it can be pinned.

    Walks the graph depth-first from *thing*.  A node that is not a
    derivation, or a derivation whose outputs are all built, is pinned and
    not descended into.  An unbuilt derivation is skipped and its
    dependencies are visited instead.

    Args:
        thing: Node to start from.
        done: Nodes already visited; updated in place so that callers can
            share it between several starting points.
        deps: Maps each node to its direct dependencies.
    """
    # An explicit stack replaces recursion so that a deep dependency chain
    # cannot hit the interpreter's recursion limit.
    pending: List[str] = [thing]
    while pending:
        current = pending.pop()
        if current in done:
            continue
        done.add(current)
        if not isDrv(current) or allBuilt(current):
            pin(current)
            continue
        # Pushed in reverse so that dependencies are popped, and therefore
        # visited, in their listed order.
        pending.extend(reversed(deps.get(current, [])))
104 | ||
105 | ||
def main() -> None:
    """Pin every already-built dependency of the derivation in ``sys.argv[1]``.

    ``sys.argv[2]`` names the directory in which the GC roots are created.
    Exits with a usage message when either argument is missing.
    """
    # Fail fast: without this, a missing argument only surfaces as an
    # IndexError after the dependency graph has already been loaded.
    if len(sys.argv) < 3:
        sys.exit("usage: nix-pin-deps DERIVATION GC_ROOT_DIR")
    dep_graph = getDeps(sys.argv[1])
    # One visited set for all roots, so a dependency reachable from several
    # roots is examined (and its build status queried) only once.
    done: Set[str] = set()
    for root in dep_graph.roots:
        pinBuiltThings(root, done, dep_graph.deps)
110 | ||
111 | ||
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()