]> git.scottworley.com Git - srec/blob - srec.py
Two virtual cameras
[srec] / srec.py
1 # srec: A simple GUI for screen recording
2 #
3 # This program is free software: you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation, version 3.
6
7 from __future__ import annotations
8
9 from dataclasses import dataclass
10 from datetime import datetime
11 import os
12 import subprocess
13 import sys
14 import time
15 from typing import Any, Callable
16
17 import gi
18 gi.require_version("Gtk", "4.0")
19 gi.require_version("GLib", "2.0")
20
21 from gi.repository import Gtk # nopep8 pylint: disable=wrong-import-position
22 from gi.repository import GLib # nopep8 pylint: disable=wrong-import-position
23
24
@dataclass
class Stream:
    """A child process that is asked to quit by writing ``q`` to its stdin."""

    # Handle to the running child; stop() requires that it was created with
    # stdin=subprocess.PIPE (as start() does).
    process: subprocess.Popen[bytes]

    @staticmethod
    def start(command: list[str]) -> Stream:
        """Launch *command* with a piped stdin and wrap it in a Stream.

        Raises whatever ``subprocess.Popen`` raises (for example
        ``FileNotFoundError`` when the executable does not exist).
        """
        # pylint: disable=consider-using-with
        return Stream(process=subprocess.Popen(command, stdin=subprocess.PIPE))

    def stop(self) -> None:
        """Send ``q`` to the child's stdin, close the pipe and wait for exit.

        If the child has already gone away, a note is printed to stderr
        instead of raising.  This call blocks until the child has exited.
        """
        stdin = self.process.stdin
        assert stdin is not None
        try:
            stdin.write(b'q')
            stdin.flush()
        except BrokenPipeError:
            print("Stream already stopped?", file=sys.stderr)
        finally:
            # Always release our end of the pipe; previously it stayed open
            # after the child was reaped.
            try:
                stdin.close()
            except BrokenPipeError:
                # close() flushes again; if the child is gone that flush
                # fails the same way as above, which was already reported.
                pass
        self.process.wait()
43
44
# Module-level state shared by the button handlers and the size monitor.
# `recording` is the ffmpeg process writing the output file, or None while
# no recording is in progress.
recording: Stream | None = None
# `sharing` is the ffmpeg process copying /dev/video0 to /dev/video9, or None
# when webcam sharing is not running.
sharing: Stream | None = None
47
48
def make_filename() -> str:
    """Return a fresh, timestamped ``.mkv`` path for a new recording.

    The target directory is ``$XDG_VIDEOS_DIR`` when that environment
    variable is set, otherwise ``~/Videos/SRec``.  The directory is created
    if it does not exist yet.
    """
    fallback_dir = os.path.expanduser('~/Videos/SRec')
    videos_dir = os.environ.get('XDG_VIDEOS_DIR', fallback_dir)
    os.makedirs(videos_dir, exist_ok=True)
    stamp = f'{datetime.now():%Y-%m-%d %H:%M:%S}'
    return os.path.join(videos_dir, 'srec ' + stamp + '.mkv')
56
57
def video_source(stack: Gtk.Stack) -> list[str]:
    """Return the ffmpeg input arguments for the selected video source.

    The first child of the ``not_recording`` page is the "Screen" toggle.
    When it is active the X11 screen is grabbed; otherwise a V4L2 device is
    used: ``/dev/video9`` when webcam sharing is enabled, else
    ``/dev/video0``.
    """
    setup_page = stack.get_child_by_name('not_recording')
    screen_toggle = setup_page.get_first_child()
    if screen_toggle.get_active():
        return ['-f', 'x11grab', '-i', ':0.0+0,0']

    device = '/dev/video9' if is_sharing_enabled(stack) else '/dev/video0'
    return ['-f', 'v4l2', '-i', device]
64
65
def is_sharing_enabled(stack: Gtk.Stack) -> bool:
    """Report whether the "Share Webcam" toggle is active.

    The toggle is located positionally: it is the third child of the
    ``not_recording`` page (after the "Screen" and "Webcam" toggles).
    """
    setup_page = stack.get_child_by_name('not_recording')
    screen_toggle = setup_page.get_first_child()
    webcam_toggle = screen_toggle.get_next_sibling()
    share_toggle = webcam_toggle.get_next_sibling()
    enabled = share_toggle.get_active()
    assert isinstance(enabled, bool)
    return enabled
71
72
def find_size_display(stack: Gtk.Stack) -> Gtk.Label:
    """Return the file-size label on the ``recording`` page.

    The label is located positionally: it is the second child of that page,
    directly after the "Stop Recording" button.
    """
    recording_page = stack.get_child_by_name('recording')
    stop_button = recording_page.get_first_child()
    return stop_button.get_next_sibling()
76
77
def summarize_size(n: int) -> str:
    """Format a byte count compactly for display.

    Values above 100,000,000 are shown in whole mebibytes with an ``M``
    suffix, values above 100,000 in whole kibibytes with a ``K`` suffix, and
    anything else as the plain number.  Fractions are truncated.

    Integer floor division is used rather than ``int(n / d)`` so that the
    result is exact for arbitrarily large ``n`` instead of passing through a
    rounded float.
    """
    if n > 100_000_000:
        return f'{n // (1024 * 1024)}M'
    if n > 100_000:
        return f'{n // 1024}K'
    return str(n)
86
87
def begin_monitoring_file_size(size_display: Gtk.Label, filename: str) -> None:
    """Refresh *size_display* once per second with the size of *filename*.

    The periodic callback keeps running while the module-level ``recording``
    is set.  Once it finds ``recording`` is ``None`` it blanks the label and
    removes itself.  While the file does not exist yet, ``--`` is shown.
    """
    def refresh() -> Any:
        if recording is None:
            size_display.set_label('')
            return GLib.SOURCE_REMOVE

        try:
            text = summarize_size(os.stat(filename).st_size)
        except FileNotFoundError:
            text = '--'
        size_display.set_label(f'<big>{text}</big>')
        return GLib.SOURCE_CONTINUE

    GLib.timeout_add_seconds(1, refresh)
101
102
def on_start_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
    """Handle a click on "Start Recording".

    Starts the file-size monitor, switches the UI to the ``recording`` page,
    optionally starts the webcam-sharing ffmpeg process, and finally starts
    the recording ffmpeg process.  Both processes are stored in the
    module-level ``sharing`` / ``recording`` variables.
    """
    global recording, sharing  # pylint: disable=global-statement
    assert recording is None
    assert sharing is None

    output_path = make_filename()
    size_label = find_size_display(stack)
    begin_monitoring_file_size(size_label, output_path)

    stack.set_visible_child_name("recording")

    if is_sharing_enabled(stack):
        # Copy the real webcam (/dev/video0) to the /dev/video9 device.
        share_command = [
            'ffmpeg', '-i', '/dev/video0', '-f', 'v4l2', '-framerate', '25',
            '-codec:v', 'rawvideo', '-pix_fmt', 'yuv420p', '/dev/video9',
        ]
        sharing = Stream.start(share_command)
        time.sleep(3)  # Bad!!  We should not be sleeping in this thread!

    audio_and_output = ['-f', 'pulse', '-ac', '2', '-i', 'default',
                        output_path]
    record_command = [
        'ffmpeg', '-framerate', '25',
        *video_source(stack),
        *audio_and_output,
    ]
    recording = Stream.start(record_command)
122
123
def on_stop_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
    """Handle a click on "Stop Recording".

    Stops the recording process first, then the webcam-sharing process if
    one is running, clears the corresponding module-level variables, and
    switches the UI back to the ``not_recording`` page.
    """
    global recording, sharing  # pylint: disable=global-statement

    capture = recording
    assert capture is not None
    capture.stop()
    recording = None

    relay = sharing
    if relay is not None:
        relay.stop()
        sharing = None

    stack.set_visible_child_name("not_recording")
133
134
def make_button(label: str, action: Callable[[
        Gtk.Button, Gtk.Stack], None], stack: Gtk.Stack) -> Gtk.Button:
    """Create a button with a 10-unit margin on every side.

    *action* is connected to the button's ``clicked`` signal and receives
    *stack* as its extra argument.
    """
    button = Gtk.Button(label=label)
    button.connect('clicked', action, stack)
    margin_setters = (
        button.set_margin_top,
        button.set_margin_start,
        button.set_margin_end,
        button.set_margin_bottom,
    )
    for set_margin in margin_setters:
        set_margin(10)
    return button
144
145
def make_share_control() -> Gtk.CheckButton:
    """Create the "Share Webcam" check button.

    The control is both sensitive and initially active exactly when
    ``/sys/module/v4l2loopback`` exists; otherwise it is insensitive and
    inactive.
    """
    loopback_present = os.path.exists('/sys/module/v4l2loopback')
    checkbox = Gtk.CheckButton(
        label='Share Webcam',
        sensitive=loopback_present,
        active=loopback_present,
    )
    checkbox.set_margin_start(20)
    return checkbox
152
153
def on_activate(app: Gtk.Application) -> None:
    """Build and show the main window when the application is activated.

    The window holds a stack with two pages.  ``not_recording`` contains the
    source toggles, the share control and the start button, in that order.
    ``recording`` contains the stop button followed by the size label.
    Other functions in this file locate widgets by these positions, so the
    child order must be kept.
    """
    window = Gtk.ApplicationWindow(application=app)
    window.set_title('SRec')
    window.set_icon_name('srec')

    stack = Gtk.Stack()

    # Page shown while idle.
    idle_page = Gtk.Box()
    idle_page.set_orientation(Gtk.Orientation.VERTICAL)
    screen_toggle = Gtk.CheckButton(label='Screen')
    idle_children = [
        screen_toggle,
        Gtk.CheckButton(label='Webcam', active=True, group=screen_toggle),
        make_share_control(),
        make_button("Start Recording", on_start_recording, stack),
    ]
    for child in idle_children:
        idle_page.append(child)
    stack.add_named(idle_page, "not_recording")

    # Page shown while a recording is in progress.
    active_page = Gtk.Box()
    active_page.set_orientation(Gtk.Orientation.VERTICAL)
    stop_button = make_button("Stop Recording", on_stop_recording, stack)
    size_label = Gtk.Label(use_markup=True, justify=Gtk.Justification.CENTER)
    active_page.append(stop_button)
    active_page.append(size_label)
    stack.add_named(active_page, "recording")

    window.set_child(stack)
    window.present()
178
179
def main() -> None:
    """Create the GTK application, hook up activation and run it."""
    application = Gtk.Application(application_id='net.chkno.srec')
    application.connect('activate', on_activate)
    application.run(None)
184
185
# Start the GUI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()