]>
Commit | Line | Data |
---|---|---|
1 | from contextlib import contextmanager | |
2 | import json | |
3 | import os | |
4 | import subprocess | |
5 | import sys | |
6 | import xml.sax | |
7 | import xml.sax.handler | |
8 | ||
9 | ||
10 | @contextmanager | |
11 | def log(msg): | |
12 | print(msg, file=sys.stderr, end='', flush=True) | |
13 | try: | |
14 | yield | |
15 | finally: | |
16 | print('\r', file=sys.stderr, end='', flush=True) | |
17 | ||
18 | ||
19 | class ParseNixStoreQueryGraphML(xml.sax.handler.ContentHandler): | |
20 | def __init__(self): | |
21 | self.roots = set() | |
22 | self.non_roots = set() | |
23 | self.deps = {} | |
24 | ||
25 | def startElement(self, name, attrs): | |
26 | if name == "edge": | |
27 | source = attrs.getValue("source") | |
28 | target = attrs.getValue("target") | |
29 | self.non_roots.add(target) | |
30 | self.deps.setdefault(source, []).append(target) | |
31 | if target in self.roots: | |
32 | self.roots.remove(target) | |
33 | if source not in self.non_roots: | |
34 | self.roots.add(source) | |
35 | ||
36 | ||
37 | def getDeps(drv): | |
38 | with log("Loading dependency tree..."): | |
39 | process = subprocess.Popen( | |
40 | ["nix-store", "--query", "--graphml", drv], stdout=subprocess.PIPE) | |
41 | parser = ParseNixStoreQueryGraphML() | |
42 | xml.sax.parse(process.stdout, parser) | |
43 | assert process.wait() == 0 | |
44 | return parser | |
45 | ||
46 | ||
47 | def getDrvInfo(drv): | |
48 | with log("Loading %s..." % drv): | |
49 | process = subprocess.Popen(["nix", | |
50 | "--experimental-features", | |
51 | "nix-command", | |
52 | "show-derivation", | |
53 | "/nix/store/" + drv], | |
54 | stdout=subprocess.PIPE) | |
55 | info = json.load(process.stdout) | |
56 | assert process.wait() == 0 | |
57 | return info | |
58 | ||
59 | ||
60 | def allBuilt(drv): | |
61 | # TODO: How to pin outputs one at a time? | |
62 | # Currently, we only pin when all outputs are available. | |
63 | # It would be better to pin the outputs we've got. | |
64 | return all(os.path.exists(o['path']) for d in getDrvInfo( | |
65 | drv).values() for o in d['outputs'].values()) | |
66 | ||
67 | ||
68 | def isDrv(thing): | |
69 | return thing.endswith(".drv") | |
70 | ||
71 | ||
72 | def removesuffix(s, suf): | |
73 | return s[:-len(suf)] if s.endswith(suf) else s | |
74 | ||
75 | ||
76 | def pin(drv): | |
77 | outPath = os.path.join(sys.argv[2], removesuffix(drv, ".drv")) | |
78 | if not os.path.exists(outPath): | |
79 | process = subprocess.run(["nix-store", | |
80 | "--realise", | |
81 | "--add-root", | |
82 | outPath, | |
83 | "/nix/store/" + drv], | |
84 | check=True) | |
85 | ||
86 | ||
87 | def pinBuiltThings(thing, done, deps): | |
88 | if thing in done: | |
89 | return | |
90 | done.add(thing) | |
91 | if not isDrv(thing) or allBuilt(thing): | |
92 | pin(thing) | |
93 | return | |
94 | for dep in deps.get(thing, []): | |
95 | pinBuiltThings(dep, done, deps) | |
96 | ||
97 | ||
98 | dep_graph = getDeps(sys.argv[1]) | |
99 | for root in dep_graph.roots: | |
100 | pinBuiltThings(root, set(), dep_graph.deps) |