# under the terms of the GNU General Public License as published by the
# Free Software Foundation, version 3.
+from __future__ import annotations
+
from dataclasses import dataclass
from datetime import datetime
import os
import subprocess
-from typing import Callable
+import sys
+import time
+from typing import Any, Callable
import gi
gi.require_version("Gtk", "4.0")
-from gi.repository import Gtk # nopep8 pylint: disable=wrong-import-position
+gi.require_version("GLib", "2.0")
+
+from gi.repository import Gtk # nopep8 pylint: disable=wrong-import-position
+from gi.repository import GLib # nopep8 pylint: disable=wrong-import-position
@dataclass
-class Recording:
- filename: str
+class Stream:
process: subprocess.Popen[bytes]
+ @staticmethod
+ def start(command: list[str]) -> Stream:
+ # pylint: disable=consider-using-with
+ return Stream(process=subprocess.Popen(command, stdin=subprocess.PIPE))
+
+ def stop(self) -> None:
+ stdin = self.process.stdin
+ assert stdin is not None
+ try:
+ stdin.write(b'q')
+ stdin.flush()
+ except BrokenPipeError:
+ print("Stream already stopped?", file=sys.stderr)
+ self.process.wait()
-recording: Recording | None = None
+
+recording: Stream | None = None
+sharing: Stream | None = None
def make_filename() -> str:
def video_source(stack: Gtk.Stack) -> list[str]:
if stack.get_child_by_name('not_recording').get_first_child().get_active():
- return ['-f', 'v4l2', '-i', '/dev/video0']
- return ['-f', 'x11grab', '-i', ':0.0+0,0']
+ return ['-f', 'x11grab', '-i', ':0.0+0,0']
+ if is_sharing_enabled(stack):
+ return ['-f', 'v4l2', '-i', '/dev/video9']
+ return ['-f', 'v4l2', '-i', '/dev/video0']
+
+
+def is_sharing_enabled(stack: Gtk.Stack) -> bool:
+ enabled = stack.get_child_by_name('not_recording').get_first_child(
+ ).get_next_sibling().get_next_sibling().get_active()
+ assert isinstance(enabled, bool)
+ return enabled
+
+
+def find_size_display(stack: Gtk.Stack) -> Gtk.Label:
+ return stack.get_child_by_name(
+ 'recording').get_first_child().get_next_sibling()
+
+
+def summarize_size(n: int) -> str:
+ if n > 100_000_000:
+ m = int(n / (1024 * 1024))
+ return f'{m}M'
+ if n > 100_000:
+ k = int(n / 1024)
+ return f'{k}K'
+ return str(n)
+
+
+def begin_monitoring_file_size(size_display: Gtk.Label, filename: str) -> None:
+ def update_size_display() -> Any:
+ done = recording is None
+ if done:
+ size_display.set_label('')
+ else:
+ try:
+ size = summarize_size(os.stat(filename).st_size)
+ except FileNotFoundError:
+ size = '--'
+ size_display.set_label(f'<big>{size}</big>')
+ return GLib.SOURCE_REMOVE if done else GLib.SOURCE_CONTINUE
+ GLib.timeout_add_seconds(1, update_size_display)
def on_start_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
- global recording # pylint: disable=global-statement
+ global recording, sharing # pylint: disable=global-statement
assert recording is None
+ assert sharing is None
+
filename = make_filename()
- command = (['ffmpeg', '-framerate', '25'] + video_source(stack) +
- ['-f', 'pulse', '-ac', '2', '-i', 'default', filename])
- # pylint: disable=consider-using-with
- recording = Recording(
- filename=filename,
- process=subprocess.Popen(command, stdin=subprocess.PIPE))
+ begin_monitoring_file_size(find_size_display(stack), filename)
+
stack.set_visible_child_name("recording")
+ if is_sharing_enabled(stack):
+ out_opts = [
+ '-f', 'v4l2', '-framerate', '25', '-codec:v', 'rawvideo',
+ '-pix_fmt', 'yuv420p',
+ ]
+ sharing = Stream.start(
+ ['ffmpeg', '-i', '/dev/video0']
+ + out_opts + ['/dev/video8']
+ + out_opts + ['/dev/video9'])
+ time.sleep(3) # Bad!! We should not be sleeping in this thread!
+
+ recording = Stream.start(
+ ['ffmpeg', '-framerate', '25'] + video_source(stack) +
+ ['-f', 'pulse', '-ac', '2', '-i', 'default', filename])
+
def on_stop_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
- global recording # pylint: disable=global-statement
+ global recording, sharing # pylint: disable=global-statement
assert recording is not None
- stdin = recording.process.stdin
- assert stdin is not None
- stdin.write(b'q')
- stdin.flush()
- recording.process.wait()
+ recording.stop()
recording = None
+ if sharing is not None:
+ sharing.stop()
+ sharing = None
stack.set_visible_child_name("not_recording")
return button
+def make_share_control() -> Gtk.CheckButton:
+ can_share = os.path.exists('/sys/module/v4l2loopback')
+ control = Gtk.CheckButton(
+ label='Share Webcam', sensitive=can_share, active=can_share)
+ control.set_margin_start(20)
+ return control
+
+
def on_activate(app: Gtk.Application) -> None:
win = Gtk.ApplicationWindow(application=app)
win.set_title('SRec')
+ win.set_icon_name('srec')
+
stack = Gtk.Stack()
nr_box = Gtk.Box()
nr_box.set_orientation(Gtk.Orientation.VERTICAL)
- webcam = Gtk.CheckButton(label='Webcam', active=True)
- nr_box.append(webcam)
- nr_box.append(Gtk.CheckButton(label='Screen', group=webcam))
+ screen = Gtk.CheckButton(label='Screen')
+ nr_box.append(screen)
+ nr_box.append(Gtk.CheckButton(label='Webcam', active=True, group=screen))
+ nr_box.append(make_share_control())
nr_box.append(make_button("Start Recording", on_start_recording, stack))
stack.add_named(nr_box, "not_recording")
r_box = Gtk.Box()
r_box.set_orientation(Gtk.Orientation.VERTICAL)
r_box.append(make_button("Stop Recording", on_stop_recording, stack))
+ r_box.append(Gtk.Label(use_markup=True, justify=Gtk.Justification.CENTER))
stack.add_named(r_box, "recording")
win.set_child(stack)