git.scottworley.com Git - srec/blob - srec.py
Tee webcam video to two virtual cameras
[srec] / srec.py
1 # srec: A simple GUI for screen recording
2 #
3 # This program is free software: you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation, version 3.
6
7 from __future__ import annotations
8
9 from dataclasses import dataclass
10 from datetime import datetime
11 import os
12 import subprocess
13 import sys
14 import time
15 from typing import Any, Callable
16
17 import gi
18 gi.require_version("Gtk", "4.0")
19 gi.require_version("GLib", "2.0")
20
21 from gi.repository import Gtk # nopep8 pylint: disable=wrong-import-position
22 from gi.repository import GLib # nopep8 pylint: disable=wrong-import-position
23
24
@dataclass
class Stream:
    """A running child process that is stopped by sending it 'q' on stdin."""

    # The child process; its stdin must be a pipe (see start()).
    process: subprocess.Popen[bytes]

    @staticmethod
    def start(command: list[str]) -> Stream:
        """Launch *command* with a piped stdin and wrap it in a Stream.

        Raises OSError (e.g. FileNotFoundError) if the command cannot be
        executed.
        """
        # pylint: disable=consider-using-with
        return Stream(process=subprocess.Popen(command, stdin=subprocess.PIPE))

    def stop(self) -> None:
        """Ask the child to quit, release its stdin pipe, and wait for exit.

        Sends b'q' on stdin, then closes the pipe so the child also sees
        end-of-file and the descriptor is not leaked.  Calling stop() on a
        stream whose child has already gone away (or that was already
        stopped) only prints a diagnostic to stderr.
        """
        stdin = self.process.stdin
        assert stdin is not None
        already_stopped = stdin.closed
        if not already_stopped:
            try:
                stdin.write(b'q')
                stdin.flush()
            except BrokenPipeError:
                already_stopped = True
            try:
                # A failed flush leaves the byte buffered, so close() can
                # hit the same broken pipe again; the pipe is still closed.
                stdin.close()
            except BrokenPipeError:
                pass
        if already_stopped:
            print("Stream already stopped?", file=sys.stderr)
        self.process.wait()
43
44
# The process writing the recording file; None while not recording.
# Set by on_start_recording and cleared by on_stop_recording.
recording: Stream | None = None
# The process copying the webcam to the loopback devices; None when the
# webcam is not being shared.
sharing: Stream | None = None
47
48
def make_filename() -> str:
    """Return a timestamped .mkv path for a new recording.

    The directory is taken from the XDG_VIDEOS_DIR environment variable,
    falling back to ~/Videos/SRec when the variable is unset or empty, and
    is created if it does not exist yet.

    Raises OSError if the directory cannot be created.
    """
    # `or` (rather than a .get() default) so that an empty variable does not
    # reach os.makedirs(''), which raises FileNotFoundError.
    directory = (os.environ.get('XDG_VIDEOS_DIR')
                 or os.path.expanduser('~/Videos/SRec'))
    os.makedirs(directory, exist_ok=True)
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return os.path.join(directory, f'srec {timestamp}.mkv')
56
57
def video_source(stack: Gtk.Stack) -> list[str]:
    """Return the ffmpeg input options for the selected video source.

    The first child of the 'not_recording' page is the "Screen" option; when
    it is active the X11 display is grabbed.  Otherwise the webcam is read,
    through /dev/video9 when sharing is enabled and /dev/video0 when not.
    """
    screen_option = stack.get_child_by_name('not_recording').get_first_child()
    if screen_option.get_active():
        input_format, device = 'x11grab', ':0.0+0,0'
    elif is_sharing_enabled(stack):
        input_format, device = 'v4l2', '/dev/video9'
    else:
        input_format, device = 'v4l2', '/dev/video0'
    return ['-f', input_format, '-i', device]
64
65
def is_sharing_enabled(stack: Gtk.Stack) -> bool:
    """Return whether the third widget on the 'not_recording' page is active.

    on_activate places the "Share Webcam" check button in that position.
    """
    first_option = stack.get_child_by_name('not_recording').get_first_child()
    second_option = first_option.get_next_sibling()
    share_option = second_option.get_next_sibling()
    enabled = share_option.get_active()
    assert isinstance(enabled, bool)
    return enabled
71
72
def find_size_display(stack: Gtk.Stack) -> Gtk.Label:
    """Return the second widget on the 'recording' page.

    on_activate places the file-size label in that position.
    """
    recording_page = stack.get_child_by_name('recording')
    first_widget = recording_page.get_first_child()
    return first_widget.get_next_sibling()
76
77
def summarize_size(n: int) -> str:
    """Return a short human-readable form of a byte count.

    Values above 100,000,000 are shown in whole mebibytes with an 'M'
    suffix, values above 100,000 in whole kibibytes with a 'K' suffix, and
    anything smaller as the plain number.
    """
    # Floor division keeps the arithmetic in exact integers; going through
    # float division loses precision for very large counts.
    if n > 100_000_000:
        return f'{n // (1024 * 1024)}M'
    if n > 100_000:
        return f'{n // 1024}K'
    return str(n)
86
87
def begin_monitoring_file_size(size_display: Gtk.Label, filename: str) -> None:
    """Show the size of *filename* in *size_display*, refreshed every second.

    The periodic refresh clears the label and removes itself the first time
    it runs while the module-level `recording` is None.
    """
    def refresh() -> Any:
        if recording is None:
            size_display.set_label('')
            return GLib.SOURCE_REMOVE
        try:
            text = summarize_size(os.stat(filename).st_size)
        except FileNotFoundError:
            # The output file does not exist (yet).
            text = '--'
        size_display.set_label(f'<big>{text}</big>')
        return GLib.SOURCE_CONTINUE

    GLib.timeout_add_seconds(1, refresh)
101
102
def on_start_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
    """Handle a click on "Start Recording".

    Switches the UI to the 'recording' page, optionally starts an ffmpeg
    process that copies /dev/video0 to /dev/video8 and /dev/video9, and then
    starts the ffmpeg process that records the selected video source plus
    PulseAudio's default input to a new file.

    If either ffmpeg process cannot be launched, any sharing process that
    was started is stopped, the UI returns to the 'not_recording' page, and
    the OSError is re-raised.
    """
    global recording, sharing  # pylint: disable=global-statement
    assert recording is None
    assert sharing is None

    filename = make_filename()
    begin_monitoring_file_size(find_size_display(stack), filename)

    stack.set_visible_child_name("recording")

    try:
        if is_sharing_enabled(stack):
            out_opts = [
                '-f', 'v4l2', '-framerate', '25', '-codec:v', 'rawvideo',
                '-pix_fmt', 'yuv420p',
            ]
            sharing = Stream.start(
                ['ffmpeg', '-i', '/dev/video0']
                + out_opts + ['/dev/video8']
                + out_opts + ['/dev/video9'])
            # Presumably gives the sharing process time to open the loopback
            # devices before the recorder reads /dev/video9 -- TODO confirm.
            time.sleep(3)  # Bad!! We should not be sleeping in this thread!

        recording = Stream.start(
            ['ffmpeg', '-framerate', '25'] + video_source(stack) +
            ['-f', 'pulse', '-ac', '2', '-i', 'default', filename])
    except OSError:
        # Launch failed: undo the partial start so the next attempt begins
        # from a clean state instead of tripping the assertions above.
        if sharing is not None:
            sharing.stop()
            sharing = None
        stack.set_visible_child_name("not_recording")
        raise
127
128
def on_stop_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
    """Handle a click on "Stop Recording".

    Stops the recording stream, then the sharing stream if there is one,
    and switches the UI back to the 'not_recording' page.
    """
    global recording, sharing  # pylint: disable=global-statement

    recorder = recording
    assert recorder is not None
    recorder.stop()
    recording = None

    sharer = sharing
    if sharer is not None:
        sharer.stop()
        sharing = None

    stack.set_visible_child_name("not_recording")
138
139
def make_button(label: str, action: Callable[[
        Gtk.Button, Gtk.Stack], None], stack: Gtk.Stack) -> Gtk.Button:
    """Create a button with a 10-pixel margin on every side.

    *action* is connected to the button's 'clicked' signal and receives
    *stack* as its extra argument.
    """
    button = Gtk.Button(label=label)
    button.connect('clicked', action, stack)
    margin_setters = (
        button.set_margin_top,
        button.set_margin_start,
        button.set_margin_end,
        button.set_margin_bottom,
    )
    for set_margin in margin_setters:
        set_margin(10)
    return button
149
150
def make_share_control() -> Gtk.CheckButton:
    """Create the "Share Webcam" check button.

    The button is both sensitive and initially checked exactly when
    /sys/module/v4l2loopback exists.
    """
    loopback_present = os.path.exists('/sys/module/v4l2loopback')
    checkbox = Gtk.CheckButton(
        label='Share Webcam',
        sensitive=loopback_present,
        active=loopback_present,
    )
    checkbox.set_margin_start(20)
    return checkbox
157
158
def on_activate(app: Gtk.Application) -> None:
    """Build the main window when the application is activated.

    The window holds a stack with two pages.  Child order matters, because
    other functions in this file locate widgets by their position:
      'not_recording': Screen, Webcam, Share Webcam, Start Recording
      'recording':     Stop Recording, size label
    """
    stack = Gtk.Stack()

    screen_option = Gtk.CheckButton(label='Screen')
    idle_page = Gtk.Box()
    idle_page.set_orientation(Gtk.Orientation.VERTICAL)
    idle_page.append(screen_option)
    # Sharing a group with the Screen button makes the two mutually exclusive.
    idle_page.append(
        Gtk.CheckButton(label='Webcam', active=True, group=screen_option))
    idle_page.append(make_share_control())
    idle_page.append(
        make_button("Start Recording", on_start_recording, stack))
    stack.add_named(idle_page, "not_recording")

    busy_page = Gtk.Box()
    busy_page.set_orientation(Gtk.Orientation.VERTICAL)
    busy_page.append(make_button("Stop Recording", on_stop_recording, stack))
    busy_page.append(
        Gtk.Label(use_markup=True, justify=Gtk.Justification.CENTER))
    stack.add_named(busy_page, "recording")

    window = Gtk.ApplicationWindow(application=app)
    window.set_title('SRec')
    window.set_icon_name('srec')
    window.set_child(stack)
    window.present()
183
184
def main() -> None:
    """Create the GTK application and run it until it exits."""
    application = Gtk.Application(application_id='net.chkno.srec')
    application.connect('activate', on_activate)
    application.run(None)
189
190
# Start the GUI only when this file is run as a script, not when imported.
if __name__ == '__main__':
    main()