ADTF  3.18.2
Demo Script Config Tool Create Graph
Description
This scripting example demonstrates how to create a graph using the ADTF Config Tool and Python.
Location
./src/examples/scripts/adtf_config_tool/
This example shows:
  • how to add filters and active runners
  • how to connect components
Remarks
  • To execute the Python script, a Python installation is required.
Script
#!/usr/bin/python
# This example demonstrates how to create a configuration via the adtf configuration tool
# - Add filter and activerunner
# - Connect components
from subprocess import Popen, PIPE
import os
import time
import shutil
import os.path
# this is a helper class to execute adtf_config_tool with a given command
class AdtfConfigTool:
    """Helper that executes adtf_config_tool commands for one session.

    Every attribute that is not defined on the class is turned into a
    callable which runs the command of the same name, e.g. ``tool.objects()``
    executes ``<executable> --session <session> -e objects``.
    """

    def __init__(self, session, executable=""):
        """Store the session and the tool binary to launch.

        session: value handed to the tool's ``--session`` option.
        executable: tool binary to start; an empty value falls back to
            ``"adtf_config_tool"``.
        """
        self.session = session
        self.executable = executable or "adtf_config_tool"

    def exec_control(self, args):
        """Run the tool with ``args`` only (no session) and wait for it.

        Raises RuntimeError if the process exits with a non-zero status.
        """
        call = [self.executable] + [str(arg) for arg in args]
        if Popen(call).wait() != 0:
            raise RuntimeError("adtf_config_tool exited with an error")

    def exec_command(self, args):
        """Run ``args`` as a ``-e`` command against the stored session.

        Returns the list ``[stdout, stderr, exit_code]``; both streams are
        the raw bytes captured from the process.
        """
        call = [self.executable, "--session", self.session, "-e"]
        call += [str(arg) for arg in args]
        process = Popen(call, stdout=PIPE, stderr=PIPE)
        # communicate() already waits for the process to terminate, so the
        # exit status can be read from returncode afterwards.
        stdout, stderr = process.communicate()
        return [stdout, stderr, process.returncode]

    def __getattr__(self, name):
        """Return a callable that runs the tool command called ``name``.

        The callable forwards its positional arguments to the command and
        returns the command's standard output decoded as UTF-8. Keyword
        arguments are accepted but not forwarded. It raises RuntimeError
        (including the tool's error output) on a non-zero exit status.
        """
        # Special-method probes (hasattr, copy, pickle, ...) must see a
        # regular AttributeError instead of a callable that starts the tool.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(
                "'" + type(self).__name__ + "' object has no attribute '" + name + "'"
            )

        def forward_to_control(*args, **kwargs):
            stdout, stderr, exit_code = self.exec_command([name] + list(args))
            if exit_code != 0:
                # Decode so the message shows the text, not a bytes repr.
                raise RuntimeError(
                    "adtf_config_tool returned an error: "
                    + stderr.decode("utf-8", "replace")
                )
            return stdout.decode("utf-8")

        return forward_to_control
# Work on a fresh copy of the empty example session: remove a copy that is
# already present, then duplicate the "empty_session" directory.
adtf_dir = os.environ["ADTF_DIR"]
scripts_dir = adtf_dir + "/src/examples/scripts"
session = scripts_dir + "/empty_session_copy"
if os.path.exists(session):
    shutil.rmtree(session)
shutil.copytree(scripts_dir + "/empty_session", session)

# Create the helper for the copied session file and the tool binary
session_file = session + "/demo_playback.adtfsession"
config_tool_binary = adtf_dir + "/bin/adtf_config_tool"
act = AdtfConfigTool(session_file, config_tool_binary)
# Every component of this example is placed in the same graph.
graph_path = "Playback Sample/graphs/video_visual/"
filter_f1 = graph_path + "filters/F1"
filter_f2 = graph_path + "filters/F2"
runner_r1 = graph_path + "active_runners/R1"
runner_r2 = graph_path + "active_runners/R2"

# Add the two filters
act.addcomponent(graph_path, "filter", "demo_time_trigger.filter.adtf.cid", "F1")
act.addcomponent(graph_path, "filter", "demo_thread_trigger.filter.adtf.cid", "F2")

# Connect pin "out" of F1 with pin "in" of F2
# (the sample stream in between is added automatically)
act.connect(filter_f1, filter_f2, "--output-pin", "out", "--input-pin", "in")

# Add the two active runners
act.addcomponent(graph_path, "activerunner", "default_thread_runner.streaming.adtf.cid", "R1")
act.addcomponent(graph_path, "activerunner", "default_timer_runner.streaming.adtf.cid", "R2")

# Connect each runner to the runner pin of its filter
act.connect(runner_r1, filter_f1, "--runner-pin", "data_generator_function")
act.connect(runner_r2, filter_f2, "--runner-pin", "data_statistics_function")
# Now check the created components and connections: every expected object
# path has to show up in the output of the tool's "objects" command.
objects = str(act.objects())
expected_objects = [
    "Playback Sample/graphs/video_visual/active_runners/R1",
    "Playback Sample/graphs/video_visual/active_runners/R2",
    "Playback Sample/graphs/video_visual/sample_streams/F1.out",
    "Playback Sample/graphs/video_visual/filters/F2",
    "Playback Sample/graphs/video_visual/filters/F1",
    "Playback Sample/graphs/video_visual/connections/F1.out_F1.out.",
    "Playback Sample/graphs/video_visual/connections/F1.out._F2.in",
    "Playback Sample/graphs/video_visual/connections/R1._F1.data_generator_function",
    "Playback Sample/graphs/video_visual/connections/R2._F2.data_statistics_function",
]
# The loop variable is deliberately not called "object", which would shadow
# the builtin for the rest of the script.
for expected in expected_objects:
    if expected in objects:
        print("Found component: " + expected)
    else:
        print("Could not find component: " + expected)