from subprocess import Popen, PIPE
import os
import time
class AdtfControl:
def __init__(self, url = "http://localhost:8000", executable = ""):
self.url = url
self.executable = executable
if not self.executable:
adtf_dir = os.environ["ADTF_DIR"]
if adtf_dir:
self.executable = adtf_dir + "/bin/adtf_control"
else:
self.executable = "adtf_control"
def exec_control(self, args):
call = [self.executable] + list(map(str, args))
if Popen(call).wait() != 0:
raise Exception("adtf_control exited with an error")
def exec_command(self, args):
call = [self.executable, "--url", self.url, "-e"] + list(map(str, args))
process = Popen(call, stdout=PIPE, stderr=PIPE)
(stdout, stderr) = process.communicate()
exit_code = process.wait()
return [stdout, stderr, exit_code]
def check_if_adtf_is_alive(self):
try:
if self.isalive() == "yes":
return True
return False
except:
return False
def wait_for_runlevel(self, runlevel = 1):
while True:
try:
if self.runlevel().startswith(str(runlevel)):
return
time.sleep(1)
except:
pass
def wait_for_startup_complete(self, process = None):
while True:
if process != None and process.poll() != None:
raise Exception("Launcher process has exited.")
try:
if self.startupcompleted().startswith("yes"):
return
except:
pass
time.sleep(1)
def __getattr__(self, name):
def forward_to_control(*args, **kwargs):
(stdout, stderr, exit_code) = self.exec_command([name] + list(map(str, args)))
if exit_code != 0:
raise Exception("adtf_control returned an error: " + str(stderr))
return str(stdout.decode("utf-8"))
return forward_to_control
class AdtfLauncher:
def __init__(self, args, executable = ""):
if not executable:
adtf_dir = os.environ["ADTF_DIR"]
if adtf_dir:
executable = adtf_dir + "/bin/adtf_launcher"
else:
executable = "adtf_launcher"
call = [executable] + list(map(str, args))
self.process = Popen(call)
url = "http://localhost:8000"
adtf_dir = os.environ["ADTF_DIR"]
ac = AdtfControl(url)
if ac.check_if_adtf_is_alive():
print("shutting down currently active instance")
ac.shutdown()
time.sleep(3)
launcher = AdtfLauncher(["--session", adtf_dir + "/src/examples/projects/adtf_example_project/adtfsessions/demo_playback_session/demo_playback.adtfsession", "--control-url", url])
ac.wait_for_startup_complete(launcher.process)
ac.runlevel("filtergraph")
ac.setprop("[playback.service.adtf]", "playback_speed", 0)
ac.setprop("default/playback", "start_on_startup", "false")
event_buffer = ac.createeventbuffer("[playback.service.adtf]", "player")
ac.runlevel("running")
ac.play()
ac.waitevent(event_buffer, 1)
playback_files = ac.playbackfiles()
ac.open(playback_files)
ac.play()
ac.waitevent(event_buffer, 1)
ac.shutdown()
launcher.process.wait()