You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

311 lines
11 KiB

# WIP GStreamer Recorder
import asyncio
import os
import errno
import shutil
import signal
import subprocess
import socket
import logging
import gi
import time
# gi.require_version('Gtk', '3.0') # for ui
gi.require_version('GIRepository', '2.0')
from gi.repository import GIRepository
# GIRepository.Repository.prepend_library_path(os.path.join(os.path.dirname(__file__), "out"))
gi.require_version('Gst', '1.0')
gi.require_version('GstVideo', '1.0')
import asyncio, gbulb
gbulb.install(gtk=False) # turn off gtk to remove ui
from gi.repository import Gst, GstVideo, GLib, GObject
# os.environ['HOME'] = "/home/deck"
# os.environ['XDG_RUNTIME_DIR'] = "/run/user/1000"
# Directory that Plugin.get_videos lists when looking for saved clips.
dirpath = "/home/deck/Videos/ReplaySorcery"
# Configure the root logger. force=True removes any handlers that were
# attached to the root logger before this call.
# NOTE(review): `filename` is commented out below, so `filemode` presumably
# has no effect and records go to the default stream handler — confirm.
logging.basicConfig(
#filename="/tmp/recorder.log",
format='%(asctime)s %(levelname)s %(message)s',
filemode='w',
force=True)
# Module-wide logger (the root logger), emitting everything from DEBUG up.
logger=logging.getLogger()
logger.setLevel(logging.DEBUG)
# Initialise GStreamer before any element or pipeline is created.
# NOTE(review): Gst.init is documented as taking an argv-style list of
# strings (or None); a bare string is passed here — confirm this is intended.
Gst.init("--verbose")
# Turn on GStreamer's own debug output.
# NOTE(review): threshold 4 presumably corresponds to the INFO level — confirm.
Gst.debug_set_active(True)
Gst.debug_set_default_threshold(4)
# if "replay-sorcery.conf" not in os.listdir("/home/deck/.config"):
class Plugin:
    """Recorder plugin: legacy process controls plus a WIP GStreamer pipeline.

    The GStreamer side builds a pipeline that ends in a ``tee`` (see
    ``_main``) and attaches / detaches an encode-and-write-to-file branch on
    demand (``start_muxing`` / ``stop_muxing``).

    NOTE(review): several coroutines call their siblings as
    ``self.method(self)``. That form only works when ``self`` is the
    ``Plugin`` class itself (methods invoked unbound), which is presumably how
    the plugin loader calls them — confirm before changing it. The standalone
    entry point (``_standalone``) uses a regular instance instead.
    """

    # Defaults live on the class so they exist no matter whether a method is
    # run on an instance or on the class itself. Without them the first call
    # to e.g. get_status() raised AttributeError.
    recording = False   # True while a recording session is considered active
    paused = False      # True between suspend_pause() and suspend_resume()
    saving = None       # future created by save(); see the note there
    process = None      # external recorder process, if one was ever started
    pipeline = None     # Gst pipeline created by _main()
    in_valve = None     # "in-flow" valve element of the pipeline
    queue = None        # "buf" queue element of the pipeline
    tee = None          # "t" tee element the recording branch hangs off
    enc = None          # recording-branch elements, set by start_muxing()
    parse = None
    mp4mux = None
    file = None

    def __del__(self):
        """No cleanup is performed yet."""
        pass

    def __init__(self):
        """Remember the current asyncio event loop for standalone use."""
        self.loop = asyncio.get_event_loop()

    @staticmethod
    def _interrupt(process):
        """Send SIGINT to *process*, tolerating a missing or exited process."""
        if process is None:
            # Nothing in this class assigns `process` yet, so this is the
            # normal case rather than an error worth crashing on.
            logger.warning("No recorder process to signal")
            return
        try:
            os.kill(process.pid, signal.SIGINT)
        except ProcessLookupError:
            logger.warning("Recorder process %s has already exited", process.pid)

    @staticmethod
    def _make_element(factory_name, name):
        """Create a GStreamer element or raise RuntimeError if unavailable."""
        element = Gst.ElementFactory.make(factory_name, name)
        if element is None:
            raise RuntimeError(
                f"GStreamer element '{factory_name}' could not be created "
                "(is the plugin installed?)")
        return element

    async def _start_recorder(self):
        """Placeholder for the actual recorder start-up; does nothing yet."""
        pass

    async def start_recorder(self):
        """Mark the recorder as running and schedule ``_start_recorder``."""
        if self.recording:
            return
        self.recording = True
        logger.info("Starting")
        asyncio.ensure_future(self._start_recorder(self))

    async def stop_recorder(self):
        """Mark the recorder as stopped and interrupt the recorder process."""
        if not self.recording:
            return
        self.recording = False
        logger.info("Stopping")
        self._interrupt(self.process)

    async def toggle_recorder(self):
        """Stop the recorder when it is running, otherwise start it."""
        if self.recording:
            return await self.stop_recorder(self)
        return await self.start_recorder(self)

    async def save(self):
        """Ask the external ``replay-sorcery`` binary to save a clip.

        Returns:
            ``False`` immediately when not recording or while paused;
            otherwise the result of awaiting the ``saving`` future.

        NOTE(review): nothing visible in this class ever resolves
        ``self.saving``, so the await presumably never completes unless other
        code sets its result — confirm. "Done saving" is logged right after
        the child process is spawned, not when it finishes.
        """
        if not self.recording or self.paused:
            return False
        self.saving = asyncio.get_event_loop().create_future()
        logger.info("Saving")
        with open('/tmp/recorder.log', "a") as out:
            subprocess.Popen(
                [os.path.dirname(__file__) + "/out/replay-sorcery", "save"],
                stdin=subprocess.PIPE, stdout=out, stderr=out)
        logger.info("Done saving")
        return await self.saving

    async def suspend_pause(self):
        """Flag the recorder as paused and interrupt the recorder process."""
        if not self.recording:
            return
        logger.info("Suspending")
        self.paused = True
        self._interrupt(self.process)

    async def suspend_resume(self):
        """Clear the paused flag and schedule ``_start_recorder`` again."""
        if not (self.recording and self.paused):
            return
        logger.info("Resuming")
        self.paused = False
        asyncio.ensure_future(self._start_recorder(self))

    async def get_status(self):
        """Return whether the recorder is currently flagged as recording."""
        return self.recording

    async def get_videos_len(self):
        """Return the number of entries in the clip directory (``dirpath``)."""
        return len(os.listdir(dirpath))

    async def get_videos(self):
        """Return the regular files in ``dirpath``, newest modification first."""
        names = [
            name for name in os.listdir(dirpath)
            if os.path.isfile(os.path.join(dirpath, name))
        ]
        names.sort(key=lambda name: os.path.getmtime(os.path.join(dirpath, name)))
        names.reverse()
        return names

    def _standalone(self):
        """Run the pipeline and the demo recording outside the plugin loader."""
        self.loop.create_task(self._main())
        self.loop.create_task(self._rec())
        self.loop.run_forever()

    async def _rec(self):
        """Demo driver: wait 12 s, record for 10 s, then stop the branch."""
        await asyncio.sleep(12)
        print("start")
        self.start_muxing()
        await asyncio.sleep(10)
        print("stopping")
        self.stop_muxing()

    def on_msg(self, bus, message):
        """Bus "message" handler: print every message, detail warnings/errors.

        Returns:
            Always ``True``.
        """
        print("evt", message, message.type)
        if message.type == Gst.MessageType.EOS:
            # Only traced for now; no state changes are made on EOS yet.
            print("start eos")
            print("finish eos")
        if message.type == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            print(err, debug)
        if message.type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(err, debug)
        return True

    def start_muxing(self):
        """Attach an H.264-encode → MP4 → file branch to the tee.

        The output goes to ``./clips/<timestamp>.mp4``. Requires ``_main`` to
        have created the pipeline and tee first.

        Raises:
            RuntimeError: if one of the required GStreamer elements cannot be
                created.

        NOTE(review): ``stop_muxing`` leaves these elements in the pipeline,
        so a second call would presumably fail to add elements with the same
        names — confirm.
        """
        enc = self._make_element("vaapih264enc", "enc")
        parse = self._make_element("h264parse", "parse")
        mp4mux = self._make_element("mp4mux", "sink")
        file = self._make_element("filesink", "file")

        # The filesink cannot create missing directories itself.
        clips_dir = "./clips"
        os.makedirs(clips_dir, exist_ok=True)
        file.set_property("sync", False)
        file.set_property(
            "location", f"{clips_dir}/{time.strftime('%Y-%m-%d-%H-%M-%S')}.mp4")

        branch = (enc, parse, mp4mux, file)
        for element in branch:
            self.pipeline.add(element)

        self.tee.link(enc)
        enc.link(parse)
        parse.link(mp4mux)
        mp4mux.link(file)

        # The pipeline is already running, so the new elements have to be
        # brought to PLAYING explicitly.
        for element in branch:
            element.set_state(Gst.State.PLAYING)

        self.enc = enc
        self.parse = parse
        self.mp4mux = mp4mux
        self.file = file
        print("started muxer")

    def stop_muxing(self):
        """Detach the recording branch from the tee and send EOS to the muxer.

        Does nothing when no recording branch has been started.
        """
        if self.enc is None:
            return
        self.tee.unlink(self.enc)
        # EOS is sent so the muxer can finish the file.
        # NOTE(review): EOS goes to the muxer only, not the encoder/parser —
        # presumably data still queued in those is dropped; confirm.
        self.mp4mux.send_event(Gst.Event.new_eos())

    async def _main(self):
        """Build and start the capture pipeline.

        The pipeline is: test source → valve → vaapipostproc → time-bounded
        queue → tee, with a fakesink on the tee. The recording branch is added
        to the tee later by ``start_muxing``.

        NOTE(review): an earlier, disabled version of this entry point set
        ``recording``/``paused`` and symlinked the clip directory into
        ``dist/assets``; an audio branch (pulsesrc → audioconvert →
        lamemp3enc) was also sketched but is not wired up.
        """
        pipeline = Gst.parse_launch("""
videotestsrc do-timestamp=true
! valve drop=0 name=in-flow
! vaapipostproc name=proc
! queue name=buf max-size-buffers=0 max-size-bytes=0 max-size-time=7000000000 min-threshold-time=5000000000 leaky=2
! tee name=t
t. ! fakesink sync=false
""")
        # Deliver bus messages to on_msg through the main loop.
        pipeline.bus.add_signal_watch()
        pipeline.bus.connect("message", self.on_msg)

        self.in_valve = pipeline.get_by_name("in-flow")
        self.queue = pipeline.get_by_name("buf")
        self.tee = pipeline.get_by_name("t")
        self.pipeline = pipeline
        pipeline.set_state(Gst.State.PLAYING)
if __name__ == "__main__":
    # Executed directly rather than loaded as a plugin: build a Plugin
    # instance and hand control to its standalone runner.
    standalone_plugin = Plugin()
    standalone_plugin._standalone()