from datetime import datetime
import os
import subprocess
+import sys
from typing import Any, Callable
import gi
@dataclass
-class Recording:
- filename: str
+class Stream:
process: subprocess.Popen[bytes]
+    def stop(self) -> None:
+        """Ask the child process to quit and block until it has exited.
+
+        Writes a single ``q`` byte to the process's stdin, closes the
+        pipe, and then waits for the process to terminate.  If the pipe
+        is already broken, a note is printed to stderr instead of
+        raising.
+
+        Raises:
+            AssertionError: if the process was started without a stdin
+                pipe.
+        """
+        stdin = self.process.stdin
+        assert stdin is not None
+        try:
+            stdin.write(b'q')
+            stdin.flush()
+        except BrokenPipeError:
+            print("Stream already stopped?", file=sys.stderr)
+        finally:
+            # Close the pipe explicitly so the descriptor is released now
+            # rather than at garbage collection.  After a failed flush the
+            # unsent byte is still buffered, so close() retries the flush
+            # and raises BrokenPipeError again; that was already reported
+            # above, so it is deliberately ignored here.
+            try:
+                stdin.close()
+            except BrokenPipeError:
+                pass
+        self.process.wait()
+
-recording: Recording | None = None
+recording: Stream | None = None
def make_filename() -> str:
command = (['ffmpeg', '-framerate', '25'] + video_source(stack) +
['-f', 'pulse', '-ac', '2', '-i', 'default', filename])
# pylint: disable=consider-using-with
- recording = Recording(
- filename=filename,
+ recording = Stream(
process=subprocess.Popen(command, stdin=subprocess.PIPE))
stack.set_visible_child_name("recording")
def on_stop_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
global recording # pylint: disable=global-statement
assert recording is not None
- stdin = recording.process.stdin
- assert stdin is not None
- stdin.write(b'q')
- stdin.flush()
- recording.process.wait()
+ recording.stop()
recording = None
stack.set_visible_child_name("not_recording")
def make_share_control() -> Gtk.CheckButton:
- can_share = os.path.exists('/sys/module/v4l2looback')
+ can_share = os.path.exists('/sys/module/v4l2loopback')
control = Gtk.CheckButton(
label='Share Webcam', sensitive=can_share, active=can_share)
control.set_margin_start(20)