User Guide¶
Introduction¶
This document describes a suite of Python packages that facilitate the implementation and deployment of dynamic filters and feedback controllers.
The package is entirely written in Python. We only support Python Version 3 and have no intent to support Version 2.
Python is widely deployed and supported, with large amounts of tutorials and documentation available on the web. A good resource to start learning Python is the Beginner’s guide to Python.
All code should run on multiple desktop platforms, including Linux and MacOSX. It has also been tested and deployed in the Beaglebobe Black and the Raspberry Pi. In particular we support the Robotics Cape and the Beaglebone Blue. See Section Installation for details.
Mauricio de Oliveira
Acknowledgments¶
The following people contributed significantly to the developement of parts of this package:
- Gabriel Fernandes
- Zhu Zhuo
Installation¶
The Python source code is available from:
Robotics Cape support¶
If you would like to run software on a Beaglebobe Black equipped with the Robotics Cape or the Beaglebone Blue you might need to download and install the Robotics Cape C library from:
Once you have the library installed and the cape running you should also install our Python bindings available as:
Raspberry Pi support¶
TODO
Tutorial¶
In this tutorial you will learn how to use Controllers, work with
signals and the various blocks available with the package
pyctrl
. You will also learn how to implement controllers that interact
with hardware devices. You can run the code in this tutorial
interactively using the python interpreter or by running them as
scripts. All code is available in the Section Examples.
Hello World!¶
Start with the following simple Hello World! example:
# import Python's standard time module
import time
# import Controller and other blocks from modules
from pyctrl import Controller
from pyctrl.block import Printer
from pyctrl.block.clock import TimerClock
# initialize controller
hello = Controller()
# add the signal myclock
hello.add_signal('myclock')
# add a TimerClock as a source
hello.add_source('myclock',
TimerClock(period = 1),
['myclock'],
enable = True)
# add a Printer as a sink
hello.add_sink('message',
Printer(message = 'Hello World!'),
['myclock'],
enable = True)
try:
# run the controller
with hello:
# do nothing for 5 seconds
time.sleep(5)
except KeyboardInterrupt:
pass
finally:
print('Done')
Depending on the platform you’re running this program will print the message Hello World! on the screen 4 or 5 times. The complete program is in hello_world.py.
What’s going on?¶
Let’s analyze each part of the above code to make sense of what is
going on. The first couple lines import the modules to be used from
the standard Python’s time
and various pyctrl
libraries:
import time
from pyctrl import Controller
from pyctrl.block import Printer
from pyctrl.block.clock import TimerClock
After importing Controller
you can initialize the Python
variable hello
as being a Controller
, more specifically an instance of the class pyctrl.Controller
:
hello = Controller()
A pyctrl.Controller
, by itself, does nothing useful, so
let’s add some signals and blocks that you can interact with. The
line:
hello.add_signal('myclock')
adds the signal myclock
.
A signal holds a numeric scalar or vector and is used to communicate between blocks. The next lines:
hello.add_source('myclock',
TimerClock(period = 1),
['myclock'],
enable = True)
add a TimerClock
as a source. A source is a type of
block that produces at least one output and has no inputs.
The mandatory parameters to pyctrl.Controller.add_source()
are a label, in this case myclock
, a
pyctrl.block
object, in this case
pyctrl.block.clock.TimerClock
, and a list of signal
outputs, in this case the list containg a single signal
['myclock']
. The keyword parameter enable is optional and
means that the source myclock
will be enabled when the
controller starts and will be disabled when the controller stops.
An instance of the class pyctrl.block.clock.TimerClock
implements a clock based on Python’s threading.Timer
class. It’s performance and accuracy can vary depending on the
particular implementation for your platform. The parameter
period = 1
passed to TimerClock
means that the
source myclock
will write to the signal
myclock
a time stamp every 1 second.
The following line:
hello.add_sink('message',
Printer(message = 'Hello World!'),
['myclock'],
enable = True)
adds a pyctrl.block.Printer
as a sink. A sink is a type
of block that takes at least one input but produces no output.
The parameters to pyctrl.Controller.add_sink()
are a label,
in this case 'message'
, a pyctrl.block
object, in
this case pyctrl.block.Printer
, and a list of inputs, in
this case ['myclock']
. The keyword parameter enable means
that the sink message
will be enable when the controller
starts and will be disabled when the controller stops.
An instance of the class pyctrl.block.Printer
implements a
sink that prints messages and the signals appearing at its input. In
this case, the attribute message = 'Hello World!'
is the
message to be printed.
Having created a source and a sink you are ready to run the controller:
with hello:
# do nothing for 5 seconds
time.sleep(5)
Python’s with
statement automatically starts and stops
the controller. Inside the with
, the statement
time.sleep(5)
pauses the program for 5 seconds to let the
controller run its loop and print Hello World! about 5 times. The
actual number of times depends on the accuracy of the timer in your
platform. Pause for 5.1 seconds instead if you would like to make sure
it is printed exactly 5 times.
Secretly behind the statement with hello
is a call to the
pair of methods pyctrl.Controller.start()
and
pyctrl.Controller.stop()
. In fact, alternatively, one could have
written the not so clean:
hello.start()
# do nothing for 5 seconds
time.sleep(5)
hello.stop()
You should always enclose the controller action inside a Python
try
block as in:
try:
# run the controller
with hello:
# do something
pass
except KeyboardInterrupt:
pass
finally:
# do something at the end
pass
This construction allows the controller to be stopped in a predictable
way. Under the hood, the controller is run using multiple threads, which have a
life of their own and can be tricky to stop. The except
statement is called in case an exception occur. In this particular
case if a KeyboardInterrupt
occurs, for example by a user
pressing the <CTRL-C>
key, the code under the
except
is executing without the usual accompanying error
message. The finally
statement should come always after all
except
statements and to makes sure that certain
instructions are always executed, even if an exception occur. The
Python statement pass
is used to signify that no instruction
is to be executed.
When adding blocks to a controller the keyword argument enable is by
default set to False, which means that blocks would remain enabled
even after a controller is stopped. In the case of a
pyctrl.block.TimerClock
object, the clock would continue
to run even as the program terminates, most likely locking your
terminal, which is not the desired behavior you’re after in your first
example. Alternatively you could have disabled the clock “manually” by
issuing the command:
hello.set_source('myclock', enabled = False)
for example in the finally
statement.
The method pyctrl.Controller.set_source()
allows you to set up
attributes of your source, in the case the enabled
attribute that effectively stops the clock. Likewise,
pyctrl.Controller.set_sink()
and
pyctrl.Controller.set_filter()
allow you to set up attributes in sinks and filters.
The controller loop¶
In order to understand what is going on on behind the scenes you can
probe the contents of the variable hello
. For
example, after running the code in Hello World! a call to:
print(hello)
or simply hello
if you are using the interactive Python shell,
produces the output:
<class 'pyctrl.Controller'> with:
0 timer(s), 4 signal(s),
2 source(s), 0 filter(s), and 1 sink(s)
For more information you can use the method
pyctrl.Controller.info()
. For example:
print(hello.info('all'))
produces the output:
<class 'pyctrl.Controller'> with:
0 timer(s), 4 signal(s),
2 source(s), 0 filter(s), and 1 sink(s)
> timers
> signals
1. clock
2. duty
3. is_running
4. myclock
> sources
1. clock[Clock, disabled] >> clock
2. myclock[TimerClock, disabled] >> myclock
> filters
> sinks
1. myclock >> message[Printer, disabled]
which details the signals, sources, filters, sinks, and
timers present in the controller hello
. Of course the
signals, sources and sinks correspond to the ones you have added
earlier. Three additional signals, clock
, duty
and is_running
and the additional device clock
show up. Those are always present and will be described later.
Note also that the relationship between sources and sinks with
signals is indicated by a double arrow >>
. In this case, the
source myclock
outputs to the signal myclock
and
the sink message
has as input the same signal
myclock
.
Starting the controller hello
with the statement
with
or pyctrl.Controller.start()
fires up the
following sequence of events:
- The state of every source, filter, sink, or timer that was
installed with the flag enable set to True is raised to
enabled
. - Every source is read and its outputs are copied to the
signals connected to the output of the source. This process
is repeated sequentially for every source which is in the state
enabled
until all sources have run once. - For each filter, the input signals are written to the filter
that is then read and its outputs are copied to the signals
connected to the output of the filter. This process is repeated
sequentially for every filter which is in the state
enabled
until all filters have run once. - The input signals of every sink are written to the sink. This
process is repeated sequentially for every sink which is in the
state
enabled
until all sinks have run once. - If the signal
is_running
is still True go back to step 2, otherwise stop. - The state of every source, filter, sink, or timer that was
installed with the flag enable set to True is lowered to
disabled
.
The signal is_running
can be set to False by calling
pyctrl.Controller.stop()
or exiting the with
statement. In the Hello World! example this is done after doing
nothing for 5 seconds inside the with
statement.
The flow of signals is established by adding sources, filters,
and sinks, which are processed according to the above loop. The
content of the input signals is made available to the filters and
sinks as they are processed. For instance, replace the sink
message
by:
hello.add_sink('message',
Printer(message = 'Hello World @ {:3.1f} s'),
['myclock'])
and run the controller to see a message that now prints the Hello
World message followed by the value of the signal
myclock
. The format {:3.1f} is used as in Python’s
format()
method. More
than one signal can be printed by specifying multiple placeholders
in the attribute message
and more input signals.
Devices¶
As you might suspect after having gone through the Hello World!
example, it is useful to have a controller with a clock. In fact, as
you will learn later in Timers, every
pyctrl.Controller
comes equipped with some kind of
clock. The method pyctrl.Controller.add_device()
automates the process of adding blocks to a controller and is
typically used when adding blocks that should behave as hardware
devices, like a clock. For example, the following code:
from pyctrl import Controller
hello = Controller()
hello.add_device('clock',
'pyctrl.block.clock', 'TimerClock',
outputs = ['clock'],
enable = True,
kwargs = {'period': 1})
automatically creates a pyctrl.block.clock.TimerClock
which is added to controller
as the source labeled
clock
with output signal clock
. As with
regular sources, filters, and sinks, setting the attribute
enable
equal to True makes sure that the device is
enabled at every call to pyctrl.Controller.start()
and
disabled at every call to pyctrl.Controller.stop()
. The
dictionary kwargs contains parameters that are passed when
instantiating the device, in this case a period of 1 second.
The main difference between
pyctrl.Controller.add_device()
and, for instance,
pyctrl.Controller.add_source()
is that
pyctrl.Controller.add_device()
takes as arguments
strings with the package and class name of the source, filter, or
sink, whereas pyctrl.Controller.add_source()
takes in an instance of an object.
The notion of device is much more than a simple convenience
though. By having the controller dynamically initialize a block by
providing the module and class names as strings to
pyctrl.Controller.add_device()
, i.e. the arguments
'pyctrl.block.clock'
and 'TimerClock'
above, it
will be possible to remotely initialize blocks that rely on the
presence of specific hardware using our Client-Server Application Architecture, as you will learn later. Note that you do
not have to directly import any modules when using
pyctrl.Controller.add_device()
.
A controller with a timer based clock is so common that the above
construction is provided as a module in pyctrl.timer
. Using
pyctrl.timer
the Hello World! example can be simplified
to:
# import Python's standard time module
import time
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Printer
# initialize controller
hello = Controller(period = 1)
# add a Printer as a sink
hello.add_sink('message',
Printer(message = 'Hello World @ {:3.1f} s'),
['clock'],
enable = True)
try:
# run the controller
with hello:
# do nothing for 5 seconds
time.sleep(5)
except KeyboardInterrupt:
pass
The complete code is in hello_timer_1.py.
A call to print(hello.info('all'))
produces:
<class 'pyctrl.timer.Controller'> with:
0 timer(s), 3 signal(s),
1 source(s), 0 filter(s), and 1 sink(s)
> timers
> signals
1. clock
2. duty
3. is_running
> sources
1. clock[TimerClock, disabled] >> clock
> filters
> sinks
1. clock >> message[Printer, disabled]
which reveals the presence of the signal clock
and the
device pyctrl.block.clock.TimerClock
as a source.
In some situations it might be helpful to be able to reset a
controller to its original configuration. This can be done using the
method pyctrl.Controller.reset()
. For example, after
initialization or after calling:
hello.reset()
print(hello.info('all'))
returns:
<class 'pyctrl.timer.Controller'> with:
0 timer(s), 3 signal(s),
1 source(s), 0 filter(s), and 0 sink(s)
> timers
> signals
1. clock
2. duty
3. is_running
> sources
1. clock[TimerClock, disabled] >> clock
> filters
> sinks
which shows the presence of the source clock
and the
signal clock
but no other source, filter, sink, or
timer.
Timers¶
As you have learned so far, all sources, filters, and sinks are
continually processed in a loop. In the above example you have
equipped the controller hello
with a
pyctrl.block.clock.TimerClock
, either explicitly, as in
Hello World!, or implicitly, by loading
pyctrl.timer.Controller
. Note that the controller itself has
no notion of time and that events happen periodically simply because
of the presence of a pyctrl.block.clock.TimerClock
, which
will stop processing until the set period has elapsed. In fact, the
base class pyctrl.timer.Controller
is also equipped with a
clock source except that this clock does not attempt to interrupt
processing, but simply writes the current time into the signal
clock
every time the controller loop is restarted. A
controller with such clock runs as fast as possible.
For example, the code:
# import Python's standard time module
import time
# import Controller and other blocks from modules
from pyctrl import Controller
from pyctrl.block import Printer
# initialize controller
hello = Controller()
# add a Printer as a sink
hello.add_sink('message',
Printer(message = 'Current time {:5.3f} s',
endln = '\r'),
['clock'])
try:
# run the controller
with hello:
# do nothing for 5 seconds
time.sleep(5)
except KeyboardInterrupt:
pass
will print the current time with 3 decimals as fast as possible on the
screen. The additional attribute endl = '\r'
introduces a
carriage return without a line-feed so that the printing happens in a
single terminal line. Now suppose that you still want to print the
Hello World! message every second. You can achieve this using
timers. Simply add the following snippet before running the
controller:
# add a Printer as a timer
hello.add_timer('message',
Printer(message = 'Hello World @ {:3.1f} s '),
['clock'], None,
period = 1, repeat = True)
to see the Hello World message printed every second as the main loop
prints the Current time message as fast as possible. The parameters
of the method pyctrl.Controller.add_timer()
are the label
and block, in the case 'message'
and the
pyctrl.block.Printer
object, followed by a list of signal
inputs, in this case ['clock']
, and a list of signal
outputs, in this case None
, then the timer period in
seconds, and a flag to tell whether the execution of the block
should repeat periodically, as opposed to just once.
An example of a useful timer event to be run only once is the following:
from pyctrl.block import Constant
# Add a timer to stop the controller
hello.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 5, repeat = False)
which will stop the controller after 5 seconds by setting the signal
is_running
to zero. In fact, after adding the above timer
one could run the controller loop by simply waiting for the controller
to terminate using pyctrl.Controller.join()
as in:
with hello:
hello.join()
Note that your program will not terminate until all blocks and
timers terminate, so it is still important that you always call
pyctrl.Controller.stop()
or use the with
statement to
exit cleanly.
A complete example with all the ideas discussed above can be found in hello_timer_2.py.
Filters¶
So far you have used only sources, like
pyctrl.block.clock.TimerClock
, and sinks, like
pyctrl.block.Printer
. Sources produce outputs and take no
input and sinks take inputs but produce no output. Filters take
inputs and produce outputs. Your first filter will be used to
construct a signal which you will later apply to a motor. Consider the
following code:
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Interp, Constant, Printer
# initialize controller
Ts = 0.1
hello = Controller(period = Ts)
# add motor signals
hello.add_signal('pwm')
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 100, 100, -50, -50, 0, 0]
# add filter to interpolate data
hello.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['pwm'])
# add logger
hello.add_sink('printer',
Printer(message = 'time = {:3.1f} s, motor = {:+6.1f} %',
endln = '\r'),
['clock','pwm'])
# Add a timer to stop the controller
hello.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
try:
# run the controller
with hello:
hello.join()
except KeyboardInterrupt:
pass
As you learned before, the sink printer
will print the
time signal clock
and the value of the signal
pwm
on the screen, and the timer stop
will
shutdown the controller after 6 seconds. The new block here is the
filter input
, which uses the block
pyctrl.block.Interp
. This block will take as input the time
given by the signal clock
and produce as a result a value
that interpolates the values given in the arrays ts
and
us
. Internally it uses numpy.interp()
function. See the numpy documentation
for details. The reason for the name pwm
will be explained
later in Section Simulated motor example. The block
pyctrl.block.Interp
will always consider its x argument to
be relative to the first time the filter is written to. That’s why
ts
starts at 0. If you need to run it again just reset
the block calling:
hello.set_filter('input', reset = True)
as will be explained in Section Modifying Blocks.
The key aspect in this example is how filters process
signals. This can be visualized by calling
print(hello.info('all'))
:
<class 'pyctrl.timer.Controller'> with:
1 timer(s), 4 signal(s),
1 source(s), 1 filter(s), and 1 sink(s)
> timers
1. stop[Constant, period = 6, enabled] >> is_running
> signals
1. clock
2. duty
3. is_running
4. pwm
> sources
1. clock[TimerClock, disabled] >> clock
> filters
1. clock >> input[Interp, enabled] >> pwm
> sinks
1. clock, pwm >> printer[Printer, enabled]
where you can see the relationship between the inputs and outputs
signals indicated by a pair of arrows >>
coming in and out
of the the filter input
. The complete code can be found
in hello_filter_1.py.
Because of the way the controller loop proceeds (see The controller loop), you can use the same signal as both input and output of a filter. For example, the filter:
from pyctrl.block.system import Gain
hello.add_filter('gain',
Gain(gain = .5),
['pwm'],
['pwm'])
scales the signal pwm
by a factor of 0.5 and has the
pwm
as both an input as well as an output signal.
Modifying Blocks¶
Instances of pyctrl.block.Block
can have its attributes
retrieved and modified using the methods
pyctrl.block.Block.get()
and
pyctrl.block.Block.set()
. There is also the special methods
pyctrl.block.Block.reset()
,
pyctrl.block.Block.is_enabled()
and
pyctrl.block.Block.set_enabled()
.
However, once you install an instance of pyctrl.block.Block
in a controller as a source, filter, sink, or timer, you
should no longer directly call those methods. Instead you should
retrieve and set block attributes using the family of methods
pyctrl.Controller.get_source()
,
pyctrl.Controller.get_filter()
,
pyctrl.Controller.get_sink()
,
pyctrl.Controller.get_timer()
,
pyctrl.Controller.set_source()
,
pyctrl.Controller.set_filter()
,
pyctrl.Controller.set_sink()
, and
pyctrl.Controller.set_timer()
. The reason for this is that,
depending on the context, the block instance owned by the controller
might be different than the one you originally installed. This is the
case, for example, when you run a controller remotely using the
Client-Server Application Architecture.
For example, for the same controller you implemented in Section Filters:
hello.get_source('clock')
would produce something like:
{'average_period': 0,
'count': 33,
'enabled': True,
'period': 0.1,
'time': 491901.67835816,
'time_origin': 491898.26005493104}
which is a dictionary with all public properties of the source
clock
, an instance of
pyctrl.block.clock.TimerClock
. If only one attribute is
sought, as in:
hello.get_source('clock', 'count')
then pyctrl.Controller.get_source()
returns simply
33
.
Likewise:
hello.set_sink('printer', endln = '\n')
changes the attribute endln
in the sink
printer
. More than one atribute can be changed at a time by
passing multiple keyword arguments, as in:
hello.set_sink('printer', endln = '\n', message = 'New message')
There are two special attributes that can be invoked for any block:
reset
and enabled
. Calling:
hello.set_source('clock', reset = True)
will internally call pyctrl.block.Block.reset()
and:
hello.set_source('clock', enabled = False)
will call pyctrl.block.Block.set_enabled()
.
Sources, filters, sinks, and timers can also be removed using
pyctrl.Controller.remove_source()
,
pyctrl.Controller.remove_filter()
,
pyctrl.Controller.remove_sink()
, and
pyctrl.Controller.remove_timer()
. For example:
hello.remove_sink('printer')
removes the sink printer
from the controller loop.
Finally, one can also read and write to blocks using
pyctrl.Controller.read_source()
,
pyctrl.Controller.read_filter()
,
pyctrl.Controller.write_filter()
, and
pyctrl.Controller.write_sink()
. These will be used in the
next section to read values from a block that logs running data.
Working with data¶
So far you have been running blocks and displaying the results on your
screen using pyctrl.block.Printer
. If you would want to
store the generated data for further processing you should instead use
the block pyctrl.block.Logger
. Let us revisit the example
from Section Filters, this time adding also a
pyctrl.block.Logger
. The only difference is the introduction
of the additional sink:
from pyctrl.block import Logger
# add logger
hello.add_sink('logger',
Logger(),
['clock','pwm'])
A complete example can be found in hello_filter_2.py. Once the
controller has run, you can then retrieve all generated data by
reading from the sink logger
using the method
pyctrl.block.Logger.get()
to retrieve the property log as
in:
# retrieve data from logger
data = hello.get_sink('logger', 'log')
retrieves the data stored in logger
and copy it to the
dictionary data
. Data is stored by row, with one key per
signals used as inputs to the pyctrl.block.Logger
. One can
conveniently access the data by using the signal label:
clock = data['clock']
pwm = data['pwm']
Since this is Python, you can now do whatever you please with the data. For example you can use matplotlib to plot the data:
# import matplotlib
import matplotlib.pyplot as plt
# start plot
plt.figure()
# plot input
plt.plot(clock, pwm, 'b')
plt.ylabel('pwm (%)')
plt.xlabel('time (s)')
plt.ylim((-120,120))
plt.xlim(0,6)
plt.grid()
# show plots
plt.show()
After running the controller hello
, the above snippet
should produce a plot like the one below:

from which you can visualize the input signal pwm
constructed by the pyctrl.block.Interp
block. Note that for
better granularity the sampling period used in
hello_filter_2.py is 0.01 s, whereas the one used in
hello_filter_1.py was only 0.1 s.
Simulated motor example¶
You will now work on a more sophisticated example, in which you will combine various filters to produce a simulated model of a DC-motor. The complete code is in simulated_motor_1.py.
A transfer-function model¶
The beginnig of the code is similar to hello_filter_2.py:
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Interp, Logger, Constant
from pyctrl.system.tf import DTTF, LPF
# initialize controller
Ts = 0.01
simotor = Controller(period = Ts)
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 100, 100, -50, -50, 0, 0]
# add motor signal
simotor.add_signal('pwm')
# add filter to interpolate data
simotor.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['pwm'])
Note that you will be simulating this motor with a sampling period of 0.01 seconds, that is, a sampling frequency of 100 Hz. The model you will use for the DC-motor is based on the diffential equation model:
where \(u\) is the motor input voltage, \(\theta\) is the motor angular displacement, and \(g\) and \(\tau\) are constants related to the motor physical parameters. The constant \(g\) is the gain of the motor, which relates the steady-state velocity achieved by the motor in response to a constant input voltage, and the constant \(\tau\) is the time constant of the motor, which is a measure of how fast the motor respond to changes in its inputs. If you have no idea of what’s going on here, keep calm and read on! You do not need to understand all the details to be able to use this model.
Without getting into details, in order to simulate this differential equation you will first convert the above model in the following discrete-time difference equation:
where \(T_s\) is the sampling period. It is this equation that you will simulate by creating the following filter:
# import math and numpy
import math, numpy
from pyctrl.block.system import System
from pyctrl.system.tf import DTTF
# Motor model parameters
tau = 1/55 # time constant (s)
g = 0.092 # gain (cycles/sec duty)
c = math.exp(-Ts/tau)
d = (g*Ts)*(1-c)/2
# add motor signals
simotor.add_signal('encoder')
# add motor filter
simotor.add_filter('motor',
System(model = DTTF(
numpy.array((0, d, d)),
numpy.array((1, -(1 + c), c)))),
['pwm'],
['encoder'])
The input signal to the filter motor
is the signal
pwm
, which is the signal that receives the interpolated
input data you created earlier. The ouput of the filter
motor
is the signal encoder
, which corresponds
to the motor angular position \(\theta\).
The block used in the filter motor
is of the class
pyctrl.block.system.System
, which allows one to incorporate
a variety of system models into filters. See Module pyctrl.system for other types of system models available. The
particular model you are using is a pyctrl.system.DTTF
, in
which DTTF stands for Discrete-Time Transfer-Function. This model
corresponds to the difference equation discussed above. Note
dependency on Python’s math library and numpy.
To wrap it up you will add a sink pyctrl.block.Logger
to
collect the data generated during the simulation and a timer to stop
the controller:
# add logger
simotor.add_sink('logger',
Logger(),
['clock','pwm','encoder'])
# Add a timer to stop the controller
simotor.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
As usual, the simulation is run with:
# run the controller
with simotor:
simotor.join()
Collecting and plotting the results¶
After running the simulation you can read the data collected by the logger:
# read logger
data = simotor.get_sink('logger', 'log')
clock = data['clock']
pwm = data['pwm']
encoder = data['encoder']
and plot the results using matplotlib:
# import matplotlib
import matplotlib.pyplot as plt
# start plot
plt.figure()
# plot input
plt.subplot(2,1,1)
plt.plot(clock, pwm, 'b')
plt.ylabel('pwm (%)')
plt.ylim((-120,120))
plt.grid()
# plot position
plt.subplot(2,1,2)
plt.plot(clock, encoder,'b')
plt.ylabel('position (cycles)')
plt.ylim((0,25))
plt.grid()
# show plots
plt.show()
to obtain a plot similar to the one below:

where you can visualize both the motor input signal pwm
and the motor output signal encoder
, which predicts that
the motor will stop at about 13 cycles (revolutions) from its original
position if the input signal pwm
were applied at its
input.
Calculating velocity and low-pass filtering¶
The above setup is one that corresponds to a typical microcontroller interface to a DC-motor, in which the motor voltage is controlled through a PWM (Pulse-Width-Modulation) signal ranging from 0-100% of the pulse duty-cycle (with negative values indicating a reversal in voltage polarity), and the motor position is read using an encoder. In this situation, one might need to calculate the motor velocity from the measured position. You will do that now by adding a couple more filters to the simulated motor model. The complete code can be found in simulated_motor_2.py.
After introducing filters to produce the signals pwm
and encoder
, you will add another filter to calculate the
speed by differentiating the encoder
signal:
from pyctrl.block.system import Differentiator
# add motor speed signal
simotor.add_signal('speed')
# add motor speed filter
simotor.add_filter('speed',
Differentiator(),
['clock','encoder'],
['speed'])
The filter speed
uses a block
pyctrl.block.system.Differentiator
that takes as input both
the clock
signal and the signal encoder
, which
is the one being differentiated, and produces the output signal
speed
.
Differentiating a signal is always a risky proposition, and should
be avoided whenever possible. Even in this simulated environment,
small variations in the clock period and in the underlying
floating-point calculations will give rise to noise in the signal
speed
. In some cases one can get around by filtering the
signal. For example, by introducing a low-pass filter as in:
from pyctrl.system.tf import LPF
# add low-pass signal
simotor.add_signal('fspeed')
# add low-pass filter
simotor.add_filter('LPF',
System(model = LPF(fc = 5, period = Ts)),
['speed'],
['fspeed'])
The filter LPF
uses a block
pyctrl.block.system.System
that takes as input the
speeed
signal and produces the output signal
fspeed
, which is the filtered version of the input
speeed
. The model used in
pyctrl.block.system.System
is the low-pass filter
pyctrl.system.tf.LPF
with cutoff frequency fc
equal
to 5 Hz.
Finally collect all the data in the logger:
# add logger
simotor.add_sink('logger',
Logger(),
['clock','pwm','encoder','speed','fspeed'])
After all that you should have a controller with the following blocks:
<class 'pyctrl.timer.Controller'> with:
1 timer(s), 7 signal(s),
1 source(s), 4 filter(s), and 1 sink(s)
> timers
1. stop[Constant, period = 6, enabled] >> is_running
> signals
1. clock
2. duty
3. encoder
4. fspeed
5. is_running
6. pwm
7. speed
> sources
1. clock[TimerClock, disabled] >> clock
> filters
1. clock >> input[Interp, enabled] >> pwm
2. pwm >> motor[System, enabled] >> encoder
3. clock, encoder >> speed[Differentiator, enabled] >> speed
4. speed >> LPF[System, enabled] >> fspeed
> sinks
1. clock, pwm, encoder, speed, fspeed >> logger[Logger, enabled]
Running simulated_motor_2.py produces a plot similar to the one shown below:

where you can simultaneously visualize the signal pwm
,
the signal speed
as calculated by the
differentiator, and the filtered speed signal fspeed
.
Note how the order of the filters is important. Output that is
needed as input for other filters must be computed first if they are
to be applied in the same iteration of the controller
loop. Otherwise, their update values will only be applied on the next
iteration. That would be the case, for example, if you had inverted
the order of the filters motor
and speed
as
in:
> filters
1. clock >> input[Interp, enabled] >> pwm
2. clock, encoder >> speed[Differentiator, enabled] >> speed
3. pwm >> motor[System, enabled] >> encoder
4. speed >> LPF[System, enabled] >> fspeed
which would make the filter speed
always see the input
signal encoder
as calculated in the previous loop
iteration. Note how this would also affect the input to the filter
LPF
!
Interfacing with hardware¶
In this section you will learn how to interface with real hardware. Of course you can only run the examples in this section if you have the appropriate hardware equipment.
Before you begin¶
For demonstration purposes it will be assumed that you have an Educational MIP (Mobile Inverted Pendulum) kit with a Beaglebone Black equipped with a Robotics Cape or a Beaglebone Blue. You may have to download additional libraries and the rcpy package. See Section Installation for details.
Make sure that all required software is installed and working before proceeding. Consult the documentation provided in the links above and the Section Installation for more details.
Installing devices¶
Before you can interact with hardware you have to install the appropriate devices. The following code will initialize a controller that can interface with the Robotics Cape:
# import Controller and other blocks from modules
from pyctrl.rc import Controller
# initialize controller
Ts = 0.01
bbb = Controller(period = Ts)
Note that the code is virtually the same as used before except that
you are importing Controller from pyctrl.rc
rather than
from pyctrl
or pyctrl.timer
. This controller
automatically adds a clock based on the MPU9250 IMU. You can check its
presence by typing:
print(bbb.info('sources'))
which produces the output:
> sources
1. clock[MPU9250, enabled] >> clock
It is now time to install the devices you will be using. For this demonstration you will use one of the MIP’s motor and the corresponding encoder. First load the encoder:
# add encoder as source
bbb.add_device('encoder1',
'pyctrl.rc.encoder', 'Encoder',
outputs = ['encoder'],
kwargs = {'encoder': 3,
'ratio': 60 * 35.557})
which will appear as a source labeled encoder1
connected
to the output signal encoder
.
You install devices using the same method
pyctrl.Controller.add_device()
you already worked with
before. Besides the mandatory parameters label, device_module, and
device_class, you should pass the corresponding list of inputs and
outputs signals as well as any initialization parameters in the
dictionary kwargs.
The parameters in kwargs are specific to the device and are passed
to the device_module and device_class constructor. Each device has
its own specific set of parameters. In the above example, the
attribute encoder
is set to 3, which selects the 3rd (out
of a total of 4 available) hardware encoder counter in the Beaglebone
Black, and ratio
is set to 60 * 35.557 to reflect the
presence of a gear box connected between the encoder and the wheel
shaft, which is the movement that you would like the encoder to
measure. Using the above ratio, the unit of the signal
encoder
will be cycles, that is, one complete turn of the
wheel will add or substract one to the signal encoder
.
You load the motor as:
# add motor as sink
bbb.add_device('motor1',
'pyctrl.rc.motor', 'Motor',
inputs = ['pwm'],
kwargs = {'motor': 3},
enable = True)
which will appear as the sink motor1
connected to the
input signal pwm
. Note that the above code makes use of
the optional parameter enable
, which controls whether the
device should be enabled at pyctrl.Controller.start()
and
disabled at pyctrl.Controller.stop()
. In the case of motors
or other devices that can present danger if left in some unknown
state, this is done for safety: terminating or aborting your code will
automatically turn off the physical motor. Note that the source
encoder1
will remain enabled all the time, since there is
no danger in keeping counting your encoder pulses even when the
controller is off.
As with the encoder, the motor constructor takes the additional
parameter motor
provided in the dictionary kwargs. In
this case you have selected the 3rd (out of a total of 4 available)
hardware motor drivers (H-bridges) in the Robotics Cape or Beaglebone
Blue. Those are driven by the Beaglebone Black or Blue PWM hardware
generators, which in this case is controlled by the input signal
pwm
taking values between -100 and 100. Negative values
reverse the polarity of the voltage applied to the motor causing a
reversal in the motor direction. Note that the value of the actual
voltage applied to the motor will depend on the voltage source
connected to the Robotics Cape. In the case of the Educational MIP kit
this voltage will be approximately 7.4 V when the battery is fully
charged.
The current configuration of the controller after installing the
devices is shown in the output of print(bbb.info('all'))
:
<class 'pyctrl.rc.Controller'> with:
0 timer(s), 5 signal(s),
2 source(s), 0 filter(s), and 1 sink(s)
> timers
> signals
1. clock
2. duty
3. encoder
4. is_running
5. pwm
> sources
1. clock[MPU9250, enabled] >> clock
2. encoder1[Encoder, enabled] >> encoder
> filters
> sinks
1. pwm >> motor1[Motor, disabled]
Using hardware devices¶
Once hardware devices are installed as sinks, filters, or sources, you can use them exactly as before. Sensors will usually be installed as sources and actuators typically as sinks.
Because you use the same names for the signals handled by the encoder and motor devices as the ones used in the Section Simulated motor example, you can simply copy parts of that code to repeat the motor experiment, this time using real hardware. For example, the code:
from pyctrl.block import Interp, Logger, Constant
from pyctrl.block.system import System, Differentiator
from pyctrl.system.tf import LPF
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 100, 100, -50, -50, 0, 0]
# add filter to interpolate data
bbb.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['pwm'])
# add motor speed signal
bbb.add_signal('speed')
# add motor speed filter
bbb.add_filter('speed',
Differentiator(),
['clock','encoder'],
['speed'])
# add low-pass signal
bbb.add_signal('fspeed')
# add low-pass filter
bbb.add_filter('LPF',
System(model = LPF(fc = 5, period = Ts)),
['speed'],
['fspeed'])
# add logger
bbb.add_sink('logger',
Logger(),
['clock','pwm','encoder','speed','fspeed'])
# Add a timer to stop the controller
bbb.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
will produce a controller with the following connections:
<class 'pyctrl.rc.Controller'> with:
1 timer(s), 7 signal(s),
2 source(s), 3 filter(s), and 2 sink(s)
> timers
1. stop[Constant, period = 6, enabled] >> is_running
> signals
1. clock
2. duty
3. encoder
4. fspeed
5. is_running
6. pwm
7. speed
> sources
1. clock[MPU9250, enabled] >> clock
2. encoder1[Encoder, enabled] >> encoder
> filters
1. clock >> input[Interp, enabled] >> pwm
2. clock, encoder >> speed[Differentiator, enabled] >> speed
3. speed >> LPF[System, enabled] >> fspeed
> sinks
1. pwm >> motor1[Motor, disabled]
2. clock, pwm, encoder, speed, fspeed >> logger[Logger, enabled]
You run this controller program invoking:
# run the controller
with bbb:
bbb.join()
Upon running the complete code provided in rc_motor.py the following plots are produced using matplotlib:

To the naked eye, the position plot above is virtually identical to the one obtained using the simulated model from Section Simulated motor example. Some subtle differences are visible in the velocity plot below:

where you can see that the motor has some difficulties overcoming stiction, that is the static friction force that dominates when the velocities become small: it takes a bit longer to start around 1 s and it gets stuck again around 3.7 s when the velocity becomes zero. Note also the more pronounced noise which is amplified by the differentiator and then attenuated by the low-pass filter.
You might want to take the additional step:
# reset the clock
bbb.set_source('clock', reset = True)
of resetting the clock before starting the controller if you want your clock to start at 0.
Closed-loop control¶
The initial motivation to write this package was to be able to easily implement and deploy feedback controllers. The subject of feedback control is extensive and will not be covered in any detail here. A completely unbiased and awesome reference is [deO16]. The treatment is suitable to undergraduates students with an engineering or science background.
Do not let yourself be intimidated by the language here, you do not need to understand all the details to implement or, better yet, to benefit from using a feedback controller!
Proportional-Integral motor speed control¶
You will now turn to the implementation of a closed-loop Proportional-Integral controller, or PI controller for short, on the same hardware used in the Section Interfacing with hardware. Start by installing the same devices as before, one motor and one encoder:
# import Controller and other blocks from modules
from pyctrl.rc import Controller
# initialize controller
Ts = 0.01
bbb = Controller(period = Ts)
# add encoder as source
bbb.add_device('encoder1',
'pyctrl.rc.encoder', 'Encoder',
outputs = ['encoder'],
kwargs = {'encoder': 3,
'ratio': 60 * 35.557})
# add motor as sink
bbb.add_device('motor1',
'pyctrl.rc.motor', 'Motor',
inputs = ['pwm'],
kwargs = {'motor': 3},
enable = True)
Because you will be controlling the motor speed, add also a differentiator:
from pyctrl.block.system import Differentiator
# add motor speed signal
bbb.add_signal('speed')
# add motor speed filter
bbb.add_filter('speed',
Differentiator(),
['clock','encoder'],
['speed'])
According to the dynamic model introduced earlier in Section Simulated motor example, the transfer-function from the PWM input, \(u\), to the motor velocity, \(\omega = \dot{\theta}\), is:
You will implement a PI (Proportional-Integral) controller with transfer-function:
The way by which you will connect this controller to the motor is given in the feedback block-diagram:
Feedback here means that a measurement of the motor speed, \(\omega\), will be compared with a reference speed, \(\bar{\omega}\), to create an error signal, \(e\), that will then be fed back to the Motor by the Controller. When \(\omega\) matches \(\bar{\omega}\) exactly then the error signal, \(e\), is zero. It is the controller’s job to produce a suitable PWM input, \(u\), so that this is possible. The PI controller does that by integrating the error signal. Indeed, the transfer-function of the PI controller corresponds to:
In a way, the integrator estimates the necessary level of motor PWM input, \(u\), so that the error can be made small, in other words, so that the motor can track a desired reference speed, \(\bar{\omega}\). Indeed, if the controller succeeds in its task to keep the error signal small, that is \(e = 0\), then the contribution from the proportional term, \(K_{\mathrm{p}} e(t)\), will also be zero.
There’s lot to be said about how to design suitable gains \(K_{\mathrm{p}}\) and \(K_{\mathrm{i}}\) [deO16]. Here you will choose
so that the closed-loop transfer-function from \(\bar{\omega}\) to \(\omega\) becomes
This will make the motor respond with the same time-constant as if it were in open-loop but this time with the ability to track a constant reference velocity signal \(\bar{\omega}\).
Taking advantage of the blocks pyctrl.block.system.System
and pyctrl.block.system.Feedback
, and of the PID control
algoritm provided in pyctrl.system.tf.PID
you can calculate
and implement this PI controller in only a few lines of code:
from pyctrl.block.system import Feedback, System
from pyctrl.system.tf import PID
# calculate PI controller gains
tau = 1/55 # time constant (s)
g = 0.092 # gain (cycles/sec duty)
Kp = 1/g
Ki = Kp/tau
# build controller block
pid = System(model = PID(Kp = Kp, Ki = Ki, period = Ts))
# add motor speed signal
bbb.add_signal('speed_reference')
# add controller to the loop
bbb.add_filter('PIcontrol',
Feedback(block = pid),
['speed','speed_reference'],
['pwm'])
The block pyctrl.block.system.Feedback
implements the
operations inside the dashed box in the feedback diagram. That is, it calculates the error signal,
\(e\), and evaluates the block given as the attribute
block
, in this case the
pyctrl.block.system.System
containing as the attribute
model
the controller pyctrl.system.tf.PID
.
The complete code, including a reference speed that looks like the PWM input used before to drive the motor in Sections Simulated motor example and Interfacing with hardware, is in the example rc_motor_control.py. Results obtained with the MIP kit should look like the following plot:

Note how the motor speed tracks the reference signal in closed-loop, effectively calculating the required PWM input necessary for acomplishing that. Compare this behaviour with the previous open-loop graphs in which a curve similar to the reference speed was instead applied directly to the motor PWM input. Look also for some interesting side-effects of feedback control, such as the somewhat smoother behavior near the points where the motor reaches zero speed. Look for [deO16] for much more in depth discussions.
State-space MIP balance controller¶
Your second feedback controller will be more sophisticated. You will use two measurements to balance the MIP kit in its upright position. More details on the modeling and design of the controller you will implement here can be found in [Zhuo16]. The final controller corresponds to the following feedback diagram:
in which you can see that the feedback controller makes use of two measurements, the vertical angle velocity, \(\dot{\theta}\), and the wheel angular velocity, \(\dot{\phi}\). It also takes in a reference wheel angular velicity, \(\bar{\dot{\phi}}\), that can be used to drive the MIP backward and forward.
As described in detail in [Zhuo16], the discrete-time controller, corresponding to the block inside the dashed box in the feedback diagram, is given by a discrete-time state-space model of the form:
where \(y_k\) represents the controller input, consisting of the measurement and error signals
and \(u_k\) is the PWM input to be applied to both left and right motors.
Implementing this controller is very simple. First initialize the controller as:
# import blocks and controller
from pyctrl.rc.mip import Controller
from pyctrl.block.system import System, Subtract, Differentiator, Sum, Gain
from pyctrl.block.nl import ControlledCombination
from pyctrl.block import Logger, ShortCircuit
from pyctrl.system.ss import DTSS
# create mip
mip = Controller()
Note that you have imported the special
pyctrl.rc.mip.Controller
class that already initializes all
devices needed for controlling the MIP. A look at mip.info('all')
:
<class 'pyctrl.rc.mip.Controller'> with:
0 timer(s), 9 signal(s),
4 source(s), 0 filter(s), and 2 sink(s)
> timers
> signals
1. clock
2. duty
3. encoder1
4. encoder2
5. is_running
6. pwm1
7. pwm2
8. theta
9. theta_dot
> sources
1. clock[MPU9250, enabled] >> clock
2. inclinometer[Inclinometer, enabled] >> theta, theta_dot
3. encoder1[Encoder, enabled] >> encoder1
4. encoder2[Encoder, enabled] >> encoder2
> filters
> sinks
1. pwm1 >> motor1[Motor, disabled]
2. pwm2 >> motor2[Motor, disabled]
reveals that pyctrl.rc.mip.Controller
already installed the
following devices:
- a clock;
- one inclinometer, which is based on a built in giroscope and will be used to measure \(\dot{\theta}\); the inclinometer also produces a measurement of \(\theta\) that is only accurate under small velocities and accelerations;
- two motors, which give access to the two PWM signals driving the left and right motors of the MIP;
- two encoders, which measure the relative angular displacement between the body of MIP and the axis of the left and right motors, from which you will measure \(\dot{\phi}\).
The angular velocity \(\dot{\phi}\) can be obtained after averaging the two wheel encoders and differentiating the resulting angle \(\phi\):
# phi is the average of the encoders
mip.add_signal('phi')
mip.add_filter('phi',
Sum(gain=0.5),
['encoder1','encoder2'],
['phi'])
# phi dot
mip.add_signal('phi_dot')
mip.add_filter('phi_dot',
Differentiator(),
['clock','phi'],
['phi_dot'])
Also add the reference signal \(\bar{\dot{\phi}}\):
# phi dot reference
mip.add_signal('phi_dot_reference')
Having all signals necessary for feedback, construct and implemented the feedack controller as follows:
import numpy as np
# state-space matrices
A = np.array([[0.913134, 0.0363383],[-0.0692862, 0.994003]])
B = np.array([[0.00284353, -0.000539063], [0.00162443, -0.00128745]])
C = np.array([[-383.009, 303.07]])
D = np.array([[-1.22015, 0]])
B = 2*np.pi*(100/7.4)*np.hstack((-B, B[:,1:]))
D = 2*np.pi*(100/7.4)*np.hstack((-D, D[:,1:]))
ssctrl = DTSS(A,B,C,D)
mip.add_signal('pwm')
mip.add_filter('controller',
System(model = ssctrl),
['theta_dot','phi_dot','phi_dot_reference'],
['pwm'])
As a final step connect the signal pwm
to both motors
using a pyctrl.block.ShortCircuit
:
# connect to motors
mip.add_filter('cl1',
ShortCircuit(),
['pwm'],
['pwm1'])
mip.add_filter('cl2',
ShortCircuit(),
['pwm'],
['pwm2'])
The code for a complete controller with some added bells and whitles to let you drive the MIP while balancing upright is given in rc_mip_balance.py. A video of the resulting balancing controller is available here.
More advanced usage¶
The next sections describe tasks that are better suited to advanced users, such as working with the provided Client-Server architecture, extending Controllers, or writing your own Blocks. Make sure you have gone through the Tutorial and have a good understanding of the concepts discussed there before reading this chapter.
Qualified names and containers¶
Instances of the class pyctrl.block.container.Container
can hold and execute signals, sources, filters, sinks, and
timers, just like an instance of
pyctrl.Controller
. Indeed pyctrl.Controller
inherits most of its functionality from
pyctrl.block.container.Container
. An instance of
pyctrl.block.container.Container
works as a filter,
which can be installed using pyctrl.Controller.add_filter
or pyctrl.Controller.add_device
or as a timer using
pyctrl.Controller.add_timer
.
As the name suggests, a pyctrl.block.container.Container
can contain other blocks, just like a pyctrl.Controller
does. In order to access elements inside a container one uses a
qualified name involving the special character forward slash
(/). For example, consider the following code:
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block.system import Gain
from pyctrl.block.container import Container, Input, Output
# initialize controller
controller = Controller(period = 1)
controller.add_signals('s1','s2','s3')
controller.add_filter('gain',
Gain(gain = 2),
['s1'], ['s2'])
# add container
controller.add_filter('container',
Container(),
['s1'], ['s3'])
# add elements inside the container
controller.add_signals('container/s1', 'container/s2')
controller.add_source('container/input',
Input(),
['s1'])
controller.add_filter('container/gain',
Gain(gain = 3),
['s1'],['s2'])
controller.add_sink('container/output1',
Output(),
['s2'])
The command:
controller.add_filter('container',
Container(),
['s1'], ['s3'])
adds a filter called container
with input s1
and
output s3
. Once an instance of
pyctrl.block.container.Container
has been added to a
controller, its elements can be accessed by using a qualified name
which is preceded by the name of the container separated by a forward
slash (/). For example, the code:
controller.add_signals('container/s1', 'container/s2')
adds two signals, s1
and s2
, to the container
container
. Note that names inside containers are local,
s1
and container/s1
refer to different signals!
Likewise, the command:
controller.add_filter('container/gain',
Gain(gain = 3),
['s1'],['s2'])
adds a filter called gain
to the container
container
. Note that the inputs and outputs above refer to
signals which are local to container
, that is the signals
container/s1
and container/s2
. In fact, the parameters
inputs and outputs in pyctrl.Controller.add_filter
as
well as pyctrl.Controller.add_source
,
pyctrl.Controller.add_sink
, and
pyctrl.Controller.add_device
, must all be local
symbols. In order to connect the inputs and output of the container
with signals of the controller we use two special blocks:
pyctrl.block.container.Input
and
pyctrl.block.container.Output
. For example:
controller.add_source('container/input',
Input(),
['s1'])
connects the single input of the container, the signal s1
to
the local container signal container/s1
, and:
controller.add_sink('container/output1',
Output(),
['s2'])
connects the local container signal container/s2
to the single
output of the container, the signal s3
.
The above controller corresponds to the following configuration:
<class 'pyctrl.timer.Controller'> with:
0 timer(s), 6 signal(s),
1 source(s), 2 filter(s), and 0 sink(s)
> timers
> signals
1. clock
2. duty
3. is_running
4. s1
5. s2
6. s3
> sources
1. clock[TimerClock, disabled] >> clock
> filters
1. s1 >> gain[Gain, enabled] >> s2
2. s1 >> container[Container, disabled] >> s3
<class 'pyctrl.block.container.Container'> with:
0 timer(s), 2 signal(s),
1 source(s), 1 filter(s), and 1 sink(s)
> timers
> signals
1. s1
2. s2
> sources
1. input[Input, enabled] >> s1
> filters
1. s1 >> gain[Gain, enabled] >> s2
> sinks
1. s2 >> output1[Output, enabled]
> sinks
Note how the contents of the container are shown indented. Executing:
import time
controller.set_signal('s1', 1)
with controller:
time.sleep(1.1)
print('s1 = {}'.format(controller.get_signal('s2')))
print('s2 = {}'.format(controller.get_signal('s3')))
will produce:
s1 = 2
s2 = 3
For a practical example of containers used to synchronize activities on a timer see rc_mip_balance.py.
Multiplexing and demultiplexing¶
Blocks that are instances of pyctrl.block.BufferBlock
support multiplexing of inputs and demultiplexing of
outputs.
Multiplexing means that all the inputs of a
pyctrl.block.BufferBlock
are collected into a single numpy
1D-array before the block is evaluated.
Demultiplexing means that the outputs of a
pyctrl.block.BufferBlock
are split into multiple outputs
after the block is evaluated.
For example, the blocks pyctrl.block.system.System
and
pyctrl.block.system.TimeVaryingSystem
always multiplexes
their input. This means that instances of
pyctrl.system.System
can seamlessly handle systems with
multiple inputs.
The attributes mux
and demux
can be also used to
modify the behavior of existing blocks. For this reason, you will
rarely need a special block for multiplexing and demultiplexing. If
you do, just use pyctrl.block.BufferBlock
. For example, a
mux-type block can be created by setting demux = True
in a
pyctrl.block.BufferBlock
as in:
from pyctrl.block import BufferBlock
controller.add_filter('mux',
BufferBlock(mux = True),
['input1','input2'],
['muxout'])
Likewise, you could modify an existing block, such as
pyctrl.block.system.Gain
to demultiplex its outputs as in:
from pyctrl.block.system import Gain
controller.add_filter('gain',
Gain(demux = True),
['muxout'],
['output1','output2'])
Because blocks can arbitrarily manipulate signals, it is not possible to detect inconsistencies in the sizes of inputs and output until execution time. Even then some blocks might simply ignore discrepancies without generating any errors! For example, a block like:
controller.add_filter('gain',
Gain(),
['input1'],
['output1','output2'])
is not only valid but also does not generate any runtime
error. However, only the output output1
contains a multiple
of output2
. Since output2
does not match any
input it is simply ignored. Likewise, in:
controller.add_filter('gain',
Gain(),
['input1','input2'],
['output1'])
only the input intput1
gets passed on to the output
output1
. Again, no runtime errors are ever generated.
Finally, the block:
controller.add_filter('gain',
Gain(gain = numpy.array([-1,2]), demux = True),
['input1'],
['output1','output2'])
leverages demultiplexing and the use of a numpy array as a gain to
produce a signal output1
which is input1
multiplied by -1 and a signal output2
which is
input1
multiplied by 2.
Client-Server Application Architecture¶
Since the beginnings of the development of this package one goal was
to be able to deploy and run controllers on embedded systems. With
that goal in mind we provide two special classes of controllers:
pyctrl.server.Controller
and
pyctrl.client.Controller
, and two scripts:
pyctrl_start_server
and pyctrl_stop_server
to start and
stop a controller server. Those scripts and classes can be combined to
run applications remotely.
Starting the server¶
Start by using the script pyctrl_start_server
to create a server
for you. In this tutorial, you will create a server on the same
machine you will be running the client. The process of initializing a
server on a remote machine is virtually identical. Type:
pyctrl_start_server
which start the server and produces the following output:
pyctrl_start_server (version 1.0)
> Options:
Hostname[port]: localhost[9999]
Sampling period: ---
Verbose level: 1
Type 'pyctrl_start_server -h' for more options
<class 'pyctrl.Controller'> with:
0 timer(s), 3 signal(s),
1 source(s), 0 filter(s), and 0 sink(s)
> timers
> signals
1. clock
2. duty
3. is_running
> sources
1. clock[Clock, enabled] >> clock
> filters
> sinks
> Starting server... done
> Hit Ctrl-C or use 'pyctrl_stop_server' to exit the server
showing that a server has been started at the localhost
at
the port 9999
. Those are the default values for host and
port. It also shows that the server is running a controller which is
an instance of the basic pyctrl.Controller
class.
The attribute host
is qualified name or valid IP address of
the machine you’re connecting to and port
is the port you
would like to connect. The connection is established using a TCP
network socket. See
Options available with pyctrl_start_server for how to set these
options.
Connecting your client¶
Start a new console and a new python shell. Proceed as in Section Hello World! and create a controller:
from pyctrl.client import Controller
hello = Controller()
The only difference is that you imported Controller
from the
class pyctrl.client.Controller
, as opposed to from
pyctrl.Controller
. Once you have initialized a controller as
a client and you have a controller running as a server,the flow is
very much like before. For example, we can query the controller using
print(hello.info('all'))
, which in this case should reproduce
the exact same configuration of the controller running on the server:
pyctrl_start_server (version 1.0)
> Options:
Hostname[port]: localhost[9999]
Sampling period: ---
Verbose level: 1
<class 'pyctrl.Controller'> with:
0 timer(s), 3 signal(s),
1 source(s), 0 filter(s), and 0 sink(s)
> timers
> signals
1. clock
2. duty
3. is_running
> sources
1. clock[Clock, enabled] >> clock
> filters
> sinks
Programming a client controller is, for the most part, just like
programming a local controller. However, there are some important caveats you should be aware of. For instance, attempting to install an instance pyctrl.block.clock.TimerClock
as a source by typing:
from pyctrl.block.clock import TimerClock
hello.add_source('myclock',
TimerClock(period = 1),
['myclock'])
will fail. The reason for the failure is because an instance of the
class pyctrl.block.clock.TimerClock
cannot be safely
transferred from your hardware to another, that is from the client
to the server. Here is where the notion of a device comes in
handy. Instead of instantiating
pyctrl.block.clock.TimerClock
on the client, you can use
pyctrl.Controller.add_device()
to have the remote controller
instantiate it directly on the server hardware! As in Section
Devices type:
hello.add_device('myclock',
'pyctrl.block.clock', 'TimerClock',
outputs = ['myclock'],
kwargs = {'period': 1},
enable = True)
to add a device pyctrl.block.clock.TimerClock
by letting
the remote server instantiate the object.
Alternatively, you can provide the optional parameters module and kwargs:
# initialize controller
hello = Controller(host = 'localhost', port = 9999,
module = 'pyctrl.timer',
kwargs = {'period': 1})
which will install the controller of class
pyctrl.timer.Controller
, which already contains a
pyctrl.block.clock.TimerClock
clock, directly on the
server.
From this point on, just proceed as in Hello World! to add a
pyctrl.block.Printer
:
from pyctrl.block import Printer
hello.add_sink('message',
Printer(message = 'Hello World!'),
['myclock'],
enable = True)
and run the controller:
import time
with hello:
# do nothing for 5 seconds
time.sleep(5)
If you can’t see anything happening for five seconds, look again. This
time not on the console running the client, but on the console
running the server. You should see the message Hello World!
printed there a couple of times. What you have accomplished is running
a task on the remote server controller by programming it on the client
controller. Effectively, and appart from some subtleties concerning
devices, the only difference was importing from
pyctrl.client.Controller
rather than from
pyctrl.Controller
.
What’s under the hood?¶
Before moving forward, a bit of a technical note. You might be
wondering why pyctrl.block.clock.TimerClock
could not be
added as a source but it is fine to add
pyctrl.block.Printer
as a sink. The difference has to do
with the ability of the client controller to transfer a block to the
remote controller. In order for that to process to happen, the block
pyctrl.block.Printer
has to be safely deconstructed, packed,
transmitted over the network socket, unpacked and then reconstructed
at the server controller.
As mentioned earlier, the transmission part is done using a TCP network socket. The packing and unpacking bit is done using a technique called serialization. We rely on Python’s pickle module to handle the dirtiest parts of this job. In a nutshell, if an object cannot be serialized by pickle, that is it cannot be pickled, then it cannot be installed remotely as a source, filter, sink, or timer. In this case, it needs to be installed as a device.
If you are curious why pyctrl.block.clock.TimerClock
could
not be serialized, it is because
pyctrl.block.clock.TimerClock
runs on a separate process
thread, and there is no way to simply transfer the thread information
over the network. As for pyctrl.block.Printer
, it is
possible to use its attributes to reconstruct it on the server
side.
Note that what decides if an object can be pickled or not is not its
base class but the contents of its attributes at the time one
attempts to pickle it. For example, a
pyctrl.block.Printer
in which you have setup the attribute
file
to redirect its output to a local file will fail to
install as a sink. You could instead install it as a device, but
in this case, the output would be redirected to a file that lives in
the remote server rather than the local client.
A final note about serialization and pickle is that this process is inherently unsafe from a security standpoint. Code that is embedded in a serialized object can be used to take control of or damage the server host by running malicious code. If security is a concern, it must be addressed at the network level, before a client is allowed to connect to a server, for example by setting up a firewall that restricts connection to a know number of potential client addresses combined with some strong form of authentication.
Options available with pyctrl_start_server
¶
Starting pyctrl_start_server
with the -h
flag displays
the options available:
usage: pyctrl_start_server [-h] [-m MODULE] [-c CONTROLLER] [-H HOST] [-p PORT]
[-v VERBOSE] [-t PERIOD]
pyctrl_start_server (version 1.0)
optional arguments:
-h, --help show this help message and exit (default: False)
-m MODULE, --module MODULE
controller module (default: pyctrl)
-c CONTROLLER, --controller CONTROLLER
controller class (default: Controller)
-H HOST, --host HOST host name or IP address (default: localhost)
-p PORT, --port PORT port number (default: 9999)
-v VERBOSE, --verbose VERBOSE
level of verbosity (default: 1)
-t PERIOD, --period PERIOD
sampling period in seconds (default: 0.01)
Besides getting help one can initialize a server with any arbitrary
controller using the -m
, --module
and
-c
, --controller
options as in:
pyctrl_start_server -m pyctrl.timer
which initializes the server controller to be an instance of
pyctrl.timer.Controller
instead of the default
pyctrl.Controller
.
Another useful pair of options is -H
, --host
and
-p
, --port
, which can be used to change the
current host name or IP address and port. For example:
pyctrl_start_server -m pyctrl.rc.mip -H 192.168.0.132 -p 9090
would initialize the server using an instance of
pyctrl.rc.mip.Controller
at the local network IP address
192.168.0.132 at the port 9090.
Finally -t
, --period
lets one set the controller
sampling period and -v
, --verbose
control how
much messages you would see coming out of the server
controller. Setting verbose to a number higher than 2 produces an
enormous amount of information that could be useful for debugging.
Out of all these options, -v
, --verbose
,
-H
, --host
and -p
, --port
,
are the ones that cannot be changed by a client connected to the
controller server after it’s been initialized.
Working with pyctrl.client.Controller
¶
As shown above, working with an instance of
pyctrl.client.Controller
is for the most part identical to
working with any other instance of pyctrl.Controller
. In
this section you will learn a couple of useful practices when using a
client controller.
A first issue is setting the client to talk to the server at the right
address and port. This can be done by initializing the client with
attributes host
and port
. For example:
from pyctrl.client import Controller
client = Controller(host = '192.168.0.132', port = 9090)
would connect the client to a local network server at address 192.168.0.132 and port 9090.
Once connected, an usual mistake is to make assumptions about the
current state of a server controller. Since another client could have
connected to the server earlier and changed settings in unpredictable
ways, it might be useful to call
pyctrl.client.Controller.reset()
to reset the remote
controller at the server before doing anything:
client.reset()
pyctrl.client.Controller.reset()
can also be used to install a
completely new controller on the server, as if using the -m
,
--module
and -c
, --controller
options
in pyctrl_start_server
. For example:
client.reset(module = 'pyctrl.timer')
install a new instance of pyctrl.timer.Controller
in the
remote server. You can query the server about its controller class by
using pyctrl.Controller.info()
as in:
client.info('class')
which should then return the string "<class
'pyctrl.timer.Controller'>"
. You can also pass arguments to the
controller constructor. For example:
client.reset(module = 'pyctrl.timer', kwargs = {'period': 0.1})
will install a new instance of pyctrl.timer.Controller
running at 10 Hz on the remote server.
For convenience, all these operations can be performed by the
pyctrl.client.Controller
constructor. For example:
from pyctrl.client import Controller
client = Controller(host = '192.168.0.132', port = 9090,
module = 'pyctrl.timer',
kwargs = {'period': 0.1})
initializes the client and resets the remote controller by installing
a new instance of pyctrl.timer.Controller
running at 10 Hz
on the remote server.
SSH and port forwarding¶
A common setup is that of a server running on an embedded system, such as a Beaglebone Black or a Raspberry Pi, controlled remotely by a computer. In most cases, connections to the server will be established using ssh.
The following is a typical session: the user on the client computer, user@client, establishes a connection to the remote server, in this case a Beaglebone Black as root@192.168.0.68, using ssh:
user@client:~$ ssh -L9999:localhost:9999 root@192.168.0.68
Debian GNU/Linux 8
BeagleBoard.org Debian Image 2016-11-06
Support/FAQ: http://elinux.org/Beagleboard:BeagleBoneBlack_Debian
default username:password is [debian:temppwd]
Last login: Sat Apr 1 17:06:08 2017 from 192.168.0.1
root@beaglebone:~#
The important detail here is the argument
-L9999:localhost:9999
, which tells ssh to establish a
tunnel, that is to forward the port 9999 from the server to the
client.
Because of that, the user can initiate the server using
localhost
as its host name:
root@beaglebone:~# pyctrl_start_server
pyctrl_start_server (version 1.0)
Type 'pyctrl_start_server -h' for more options
> Options:
Hostname[port]: localhost[9999]
Sampling period: ---
Verbose level: 1
<class 'pyctrl.Controller'> with:
0 timer(s), 3 signal(s),
1 source(s), 0 filter(s), and 0 sink(s)
> timers
> signals
1. clock
2. duty
3. is_running
> sources
1. clock[Clock, enabled] >> clock
> filters
> sinks
> Starting server... done
> Hit Ctrl-C or use 'pyctrl_stop_server' to exit the server
After starting the server, on another terminal, the user runs his
application as a client, also connected to localhost
. For
example, using the interactive shell:
user@client:~$ python
Python 3.4.5 |Anaconda 2.3.0 (x86_64)| (default, Jul 2 2016, 17:47:57)
[GCC 4.2.1 Compatible Apple LLVM 4.2 (clang-425.0.28)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from pyctrl.client import Controller
>>> hello = Controller()
>>> print(hello.info('all'))
<class 'pyctrl.Controller'> with:
0 timer(s), 3 signal(s),
1 source(s), 0 filter(s), and 0 sink(s)
> timers
> signals
1. clock
2. duty
3. is_running
> sources
1. clock[Clock, enabled] >> clock
> filters
> sinks
>>> quit()
user@client:~$
In this way, all network traffic is managed by ssh. An additional
advantage is that this traffic is also encrypted and potentially
compressed, which adds
efficiency and security to the entire session. With ssh you can also
forward your X graphics terminal using the -X
flag. For
example:
ssh -X -L9999:localhost:9999 root@192.168.0.68
would forward both the 9999 port and your X terminal session.
Performance considerations¶
Error Handling¶
Since you’re using Python, error testing and handling can be kept at a
minimum. Thanks to duck typing and other Python
features, anything that could go wrong will be dealt with at execution
time, with errors being propagated using the standard Python
exception
mechanism. As discussed in Section What’s going on?, users can
handle errors by enclosing segments of code using the try
statement:
try:
# do something
...
except:
# do something if exception was raised
...
finally:
# always do this
...
Most of the time, error checking is limited to operations that could
invalidate your controller. In particular, very few error checking
tests are performed in the methods pyctrl.block.Block.read()
and pyctrl.block.Block.write()
. Such tests would be repeated in
the main controller loop and could potentially impact performance. If
you need to perform tests in those methods consider using assertions. This
means that those tests could be completely turned off if they are
impacting performance by invoking Python with the -O
flag.
Extending Controllers¶
One can take advantage of python’s object oriented features to extend
the functionality of the basic pyctrl.Controller
. All that
is necessary is to inherit from
pyctrl.Controller
.
Inheritance is an easy way to equip controllers with special hardware
capabilities. That was the case, for example, with the class
pyctrl.timer.Controller
described in Devices. In
fact, this new class is so simple that its entire code easily fits
here:
class Controller(pyctrl.Controller):
"""
:py:class:`pyctrl.timer.Controller` implements a controller
with a :py:class:`pyctrl.block.clock.TimerClock`.
The clock is enabled and disabled automatically when calling
`start()` and `stop()`.
:param period: the clock period (default 0.01)
"""
def __init__(self, **kwargs):
# period
self.period = kwargs.pop('period', 0.01)
# discard argument 'noclock'
kwargs.pop('noclock', None)
# Initialize controller
super().__init__(noclock = True, **kwargs)
def _reset(self):
# call super
super()._reset()
# add signal clock
self.add_signal('clock')
# add device clock
self.add_source('clock',
('pyctrl.block.clock', 'TimerClock'),
['clock'],
enable = True,
kwargs = {'period': self.period})
# reset clock
self.set_source('clock', reset=True)
Virtually all functionality is provided by the base class
pyctrl.Controller
. The only methods overloaded are
pyctrl.Controller.__init__()
and
pyctrl.Controller._reset()
.
The method pyctrl.timer.Controller.__init__()
is the standard
python constructor, which in this case parses the new attribute
period
before calling the base class
pyctrl.Controller.__init__()
using:
super().__init__(**kwargs)
Note that this is done using pop as in:
self.period = kwargs.pop('period', 0.01)
Using pop()
makes sure the keyword period
is
removed from the dictionary kwargs
. Any remaining keywords
need to be valid attributes of the base class or an exception will be
raised.
Most of the action is in the method
pyctrl.Controller._reset()
. In fact, a closer look at
pyctrl.block.container.Controller.__init__()
, which is the
method called by super().__init__(**kwargs)
in
pyctrl.Controller.__init__()
, reveals:
def __init__(self, **kwargs):
# set enabled as False by default
if 'enabled' not in kwargs:
kwargs['enabled'] = False
# call super
super().__init__(**kwargs)
# call _reset
self._reset()
where a call to the method pyctrl.Controller._reset()
can be
spotted after a couple of definitions.
If you overload pyctrl.Controller._reset()
make sure to call:
super()._reset()
before doing any other task. This will make sure that whatever tasks that need to be performed by the base class have already taken place and won’t undo any of your own initialization.
The method pyctrl.Controller._reset()
is also called by the
method pyctrl.Controller.reset()
. In fact, one rarely needs to
overload any method other than pyctrl.Controller.__init__()
and
pyctrl.Controller._reset()
.
A typical reason for extending pyctrl.Controller
is to
provide the user with a set of devices that continue to exist even
after a call to pyctrl.Controller.reset()
. For example, the
following code is from pyctrl.rc.mip.Controller()
:
class Controller(pyctrl.rc.Controller):
def _reset(self):
# call super
super()._reset()
self.add_signals('theta','theta_dot',
'encoder1','encoder2',
'pwm1','pwm2')
# add source: imu
self.add_source('inclinometer',
('pyctrl.rc.mpu9250', 'Inclinometer'),
['theta','theta_dot'])
# add source: encoder1
self.add_source('encoder1',
('pyctrl.rc.encoder', 'Encoder'),
['encoder1'],
kwargs = {'encoder': 3,
'ratio': 60 * 35.557})
# add source: encoder2
self.add_source('encoder2',
('pyctrl.rc.encoder', 'Encoder'),
['encoder2'],
kwargs = {'encoder': 2,
'ratio': - 60 * 35.557})
# add sink: motor1
self.add_sink('motor1',
('pyctrl.rc.motor', 'Motor'),
['pwm1'],
kwargs = {'motor': 3},
enable = True)
# add sink: motor2
self.add_sink('motor2',
('pyctrl.rc.motor', 'Motor'),
['pwm2'],
kwargs = {'motor': 2,
'ratio': -100},
enable = True)
which adds a number of devices to the base class
pyctrl.rc.Controller()
that can be used with the Robotics Cape
and the Educational MIP as described in Interfacing with hardware.
Writting your own Blocks¶
The package pyctrl
is designed so that you can easily extend
its functionality by writing simple python code for your own
blocks. You can write blocks to support your specific hardware or
implement an algorithm that is currently not available in
Module pyctrl.block.
Your blocks should inherit from pyctrl.block.Block
or one of
its derived class, such as pyctrl.block.BufferBlock
, which
are described next.
Extending pyctrl.block.Block
¶
A pyctrl.block.Block
needs to know how to do two things:
respond to calls to pyctrl.block.Block.read()
and/or
pyctrl.block.Block.write()
. If a block is to be used as a source
then it needs to respond to pyctrl.block.Block.read()
, if it is to be
used as a sink it needs to respond to pyctrl.block.Block.write()
,
and it if is to be used as a filter it needs to respond to both.
For example consider the following code for a simple block:
import pyctrl.block
class MyOneBlock(pyctrl.block.Source, pyctrl.block.Block):
def read(self):
return (1,)
Multiple inheritance is used to make sure this block can only be used
as a source by inheriting from pyctrl.block.Source
and
pyctrl.block.Block
. The order is important! If you try to
use MyOneBlock
as a sink or a filter an exception will
be raised since MyOneBlock
does not overload
pyctrl.block.Block.write()
. Likewise, sinks must inherit
from pyctrl.block.Sink
and filters from
pyctrl.block.Filter
.
All this block does is output a signal which is the constant 1.
Note that the return value of pyctrl.block.Block.read()
must
be a tuple with numbers or numpy 1D-arrays. You could use your block
in a controller like this:
# add a MyOneBlock as a source
controller.add_source('mysource',
MyOneBlock(),
['signal'])
which would write 1 to the signal signal
every time the
controller loop is run.
Consider now the slightest more sophisticated block:
import pyctrl.block
class MySumBlock(pyctrl.block.Filter, pyctrl.block.Block):
def __init__(self, **kwargs):
# you must call super().__init__
super().__init__(**kwargs)
# create local buffer
self.buffer = ()
def write(self, *values):
# copy values to buffer
self.buffer = values
def read(self):
# return sum of all values as first entry
return (sum(self.buffer), )
Because MySumBlock
inherits from
pyctrl.block.Filter
if can be be used a filter. It must
therefore overload both pyctrl.block.Block.write()
and
pyctrl.block.Block.read()
. For instance:
# add a MySumBlock as a filter
controller.add_filter('myfilter',
MySumBlock(),
['signal1','signal2','signal3'],
['sum'])
would set the signal sum
to be equal to the sum of the
three input signals signal1
, signal2
, and
signal3
. When placed in a controller loop, the loop will
first call MySumBlock.write()
then MySumBlock.read()
as if running a code similar to the following:
myfilter.write(signal1, signal2, signal3)
(sum, ) = myfilter.read()
At the end of a loop iteration the variable sum
would
contain the sum of the three variables signal1
,
signal2
, and signal3
. Of course the code run by
pyctrl.Controller
is never explicitly expanded as above.
A couple of important details here. First
MySumBlock.__init__()
calls
pyctrl.block.Block.__init__()
then proceeds to create its own
attribute buffer
. Note that pyctrl.block.Block()
does
not accept positional arguments, only keyword arguments. As you will
learn soon, this facilitates handling errors in the
constructor. Second the method MySumBlock.write()
should
always take a variable number of arguments, represented by the python
construction *values
. Inside MySumBlock.write()
the
variable values
is a tuple. Third,
because pyctrl.block.Block.write()
and
pyctrl.block.Block.read()
are called separately, it is often
the case that one needs an internal variable to store values to be
carried from pyctrl.block.Block.write()
to
pyctrl.block.Block.read()
. This is so common that
pyctrl.block
provides a specialized class
pyctrl.block.BufferBlock
, which you will learn about in the
next section.
Extending pyctrl.block.BufferBlock
¶
The class pyctrl.block.BufferBlock
has several features that
can facilitate the implementation of blocks. First,
py:meth:pyctrl.block.BufferBlock.read and
pyctrl.block.BufferBlock.write()
work with a an internal
attribute buffer
, which can be used to carry values from
pyctrl.block.BufferBlock.write()
to
pyctrl.block.BufferBlock.read()
. Second, it support
multiplexing and demultiplexing of inputs as discussed in Section
Multiplexing and demultiplexing.
Consider as an example the block pyctrl.block.system.Gain
,
which produces an output which correspond to its inputs multiplied by
a fixed gain. Because pyctrl.block.system.Gain
does
nothing to its inputs when it is written it does not overload
pyctrl.block.BufferBlock.write()
. All the action is on the
method pyctrl.block.system.Gain.write()
:
def write(self, *values):
"""
Writes product of :py:attr:`gain` times current input to the
private :py:attr:`buffer`.
:param vararg values: values
"""
# call super
super().write(*values)
self.buffer = tuple(v * self.gain for v in self.buffer)
Note that it starts by calling super().write(*values)
, which
will take care of any multiplexing at the input, followed by the
actual calculation, which in this case is performed using a list
comprehension:
self.buffer = tuple(v * self.gain for v in self.buffer)
that produces the desired output tuple.
For another example consider the block
pyctrl.block.system.Sum
and its method
pyctrl.block.system.Sum.write()
:
def write(self, *values):
"""
Writes product of `gain` times the sum of the current input to the private `buffer`.
:param vararg values: list of values
:return: tuple with scaled input
"""
# call super
super(Gain, self).write(*values)
self.buffer = (self.gain * numpy.sum(self.buffer, axis=0), )
The only new detail here is the use of super(Gain,
self).write(*values)
. This is because
pyctrl.block.system.Sum
inherits from
pyctrl.block.system.Gain
, and you would like to call
pyctrl.block.BufferBlock.write()
instead of
pyctrl.block.system.Gain.write()
.
Examples¶
The following examples can be found in the directory
examples
.
hello_world.py¶
def main():
# import python's standard time module
import time
# import Controller and other blocks from modules
from pyctrl import Controller
from pyctrl.block import Printer
from pyctrl.block.clock import TimerClock
# initialize controller
hello = Controller()
# add the signal myclock
hello.add_signal('myclock')
# add a TimerClock as a source
hello.add_source('myclock',
TimerClock(period = 1),
['myclock'],
enable = True)
# add a Printer as a sink
hello.add_sink('message',
Printer(message = 'Hello World!'),
['myclock'],
enable = True)
try:
# run the controller
with hello:
# do nothing for 5 seconds
time.sleep(5)
except KeyboardInterrupt:
pass
finally:
print('Done')
hello_timer_1.py¶
def main():
# import python's standard time module
import time
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Printer
# initialize controller
hello = Controller(period = 1)
# add a Printer as a sink
hello.add_sink('message',
Printer(message = 'Hello World @ {:3.1f} s'),
['clock'],
enable = True)
# print controller info
print(hello.info('all'))
try:
# run the controller
print('> Run the controller.')
print('> Do nothing for 5 s with the controller on...')
with hello:
# do nothing for 5 seconds
time.sleep(5)
print('> Do nothing for 2 s with the controller off...')
time.sleep(2)
print('> Do nothing for 5 s with the controller on...')
with hello:
# do nothing for 5 seconds
time.sleep(5)
print('> Done with the controller.')
except KeyboardInterrupt:
pass
hello_timer_2.py¶
def main():
# import python's standard time module
import time
# import Controller and other blocks from modules
from pyctrl import Controller
from pyctrl.block import Printer, Constant
# initialize controller
hello = Controller()
# add a Printer as a timer
hello.add_timer('message',
Printer(message = 'Hello World @ {:3.1f} s '),
['clock'], None,
period = 1, repeat = True)
# Add a timer to stop the controller
hello.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 5, repeat = False)
# add a Printer as a sink
hello.add_sink('message',
Printer(message = 'Current time {:5.3f} s', endln = '\r'),
['clock'])
# print controller info
print(hello.info('all'))
try:
# run the controller
print('> Run the controller.')
with hello:
# wait for the controller to finish on its own
hello.join()
print('> Done with the controller.')
except KeyboardInterrupt:
pass
hello_filter_1.py¶
def main():
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Interp, Constant, Printer
# initialize controller
Ts = 0.1
hello = Controller(period = Ts)
# add pwm signal
hello.add_signal('pwm')
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 100, 100, -50, -50, 0, 0]
# add filter to interpolate data
hello.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['pwm'])
# add logger
hello.add_sink('printer',
Printer(message = 'time = {:3.1f} s, motor = {:+6.1f} %',
endln = '\r'),
['clock','pwm'])
# Add a timer to stop the controller
hello.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
# print controller info
print(hello.info('all'))
try:
# run the controller
print('> Run the controller.')
with hello:
# wait for the controller to finish on its own
hello.join()
print('> Done with the controller.')
except KeyboardInterrupt:
pass
hello_filter_2.py¶
def main():
import sys
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Interp, Printer, Constant, Logger
# initialize controller
Ts = 0.01
hello = Controller(period = Ts)
# add pwm signal
hello.add_signal('pwm')
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 100, 100, -50, -50, 0, 0]
# add filter to interpolate data
hello.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['pwm'])
# add logger
hello.add_sink('printer',
Printer(message = 'time = {:3.1f} s, motor = {:+6.1f} %',
endln = '\r'),
['clock','pwm'])
# add logger
hello.add_sink('logger',
Logger(),
['clock','pwm'])
# Add a timer to stop the controller
hello.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
# print controller info
print(hello.info('all'))
try:
# run the controller
print('> Run the controller.')
with hello:
# wait for the controller to finish on its own
hello.join()
print('> Done with the controller.')
except KeyboardInterrupt:
pass
# retrieve data from logger
data = hello.get_sink('logger', 'log')
try:
# import matplotlib
import matplotlib.pyplot as plt
except:
print('! Could not load matplotlib, skipping plots')
sys.exit(0)
print('> Will plot')
try:
# start plot
plt.figure()
except:
print('! Could not plot graphics')
print('> Make sure you have a connection to a windows manager')
sys.exit(0)
# plot input
plt.plot(data['clock'], data['motor'], 'b')
plt.ylabel('pwm (%)')
plt.xlabel('time (s)')
plt.ylim((-120,120))
plt.xlim(0,6)
plt.grid()
# show plots
plt.show()
hello_client.py¶
def main():
# import python's standard time module
import time
# import Controller and other blocks from modules
from pyctrl.client import Controller
from pyctrl.block import Printer
# initialize controller
hello = Controller(host = 'localhost', port = 9999,
module = 'pyctrl.timer',
kwargs = {'period': 1})
# add a Printer as a sink
hello.add_sink('message',
Printer(message = 'Hello World @ {:4.2f}s'),
['clock'],
enable = True)
# print controller information
print(hello.info('all'))
try:
# run the controller
with hello:
# do nothing for 5 seconds
time.sleep(5)
except KeyboardInterrupt:
pass
simulated_motor_1.py¶
def main():
# import python's standard math module and numpy
import math, numpy, sys
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Interp, Logger, Constant
from pyctrl.block.system import System
from pyctrl.system.tf import DTTF
# initialize controller
Ts = 0.01
simotor = Controller(period = Ts)
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 100, 100, -50, -50, 0, 0]
# add pwm signal
simotor.add_signal('pwm')
# add filter to interpolate data
simotor.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['pwm'])
# Motor model parameters
tau = 1/55 # time constant (s)
g = 0.092 # gain (cycles/sec duty)
c = math.exp(-Ts/tau)
d = (g*Ts)*(1-c)/2
# add motor signals
simotor.add_signal('encoder')
# add motor filter
simotor.add_filter('motor',
System(model = DTTF(
numpy.array((0, d, d)),
numpy.array((1, -(1 + c), c)))),
['pwm'],
['encoder'])
# add logger
simotor.add_sink('logger',
Logger(),
['clock','pwm','encoder'])
# Add a timer to stop the controller
simotor.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
# print controller info
print(simotor.info('all'))
try:
# run the controller
print('> Run the controller.')
with simotor:
# wait for the controller to finish on its own
simotor.join()
print('> Done with the controller.')
except KeyboardInterrupt:
pass
finally:
pass
# read logger
data = simotor.get_sink('logger', 'log')
try:
# import matplotlib
import matplotlib.pyplot as plt
except:
print('! Could not load matplotlib, skipping plots')
sys.exit(0)
print('> Will plot')
try:
# start plot
plt.figure()
except:
print('! Could not plot graphics')
print('> Make sure you have a connection to a windows manager')
sys.exit(0)
# plot pwm
plt.subplot(2,1,1)
plt.plot(data['clock'], data['pwm'], 'b')
plt.ylabel('pwm (%)')
plt.ylim((-120,120))
plt.xlim(0,6)
plt.grid()
# plot encoder
plt.subplot(2,1,2)
plt.plot(data['clock'], data['encoder'],'b')
plt.ylabel('encoder (cycles)')
plt.ylim((0,25))
plt.xlim(0,6)
plt.grid()
# show plots
plt.show()
simulated_motor_2.py¶
def main():
# import python's standard math module and numpy
import math, numpy, sys
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Interp, Logger, Constant
from pyctrl.block.system import System, Differentiator
from pyctrl.system.tf import DTTF, LPF
# initialize controller
Ts = 0.01
simotor = Controller(period = Ts)
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 100, 100, -50, -50, 0, 0]
# add pwm signal
simotor.add_signal('pwm')
# add filter to interpolate data
simotor.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['pwm'])
# Motor model parameters
tau = 1/55 # time constant (s)
g = 0.092 # gain (cycles/sec duty)
c = math.exp(-Ts/tau)
d = (g*Ts)*(1-c)/2
# add motor signals
simotor.add_signal('encoder')
# add motor filter
simotor.add_filter('motor',
System(model = DTTF(
numpy.array((0, d, d)),
numpy.array((1, -(1 + c), c)))),
['pwm'],
['encoder'])
# add motor speed signal
simotor.add_signal('speed')
# add motor speed filter
simotor.add_filter('speed',
Differentiator(),
['clock','encoder'],
['speed'])
# add low-pass signal
simotor.add_signal('fspeed')
# add low-pass filter
simotor.add_filter('LPF',
System(model = LPF(fc = 5, period = Ts)),
['speed'],
['fspeed'])
# add logger
simotor.add_sink('logger',
Logger(),
['clock','pwm','encoder','speed','fspeed'])
# Add a timer to stop the controller
simotor.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
# print controller info
print(simotor.info('all'))
try:
# run the controller
print('> Run the controller.')
with simotor:
# wait for the controller to finish on its own
simotor.join()
print('> Done with the controller.')
except KeyboardInterrupt:
pass
finally:
pass
# read logger
data = simotor.get_sink('logger', 'log')
try:
# import matplotlib
import matplotlib.pyplot as plt
except:
print('! Could not load matplotlib, skipping plots')
sys.exit(0)
print('> Will plot')
try:
# start plot
plt.figure()
except:
print('! Could not plot graphics')
print('> Make sure you have a connection to a windows manager')
sys.exit(0)
# plot pwm
ax1 = plt.gca()
ax1.plot(data['clock'], data['pwm'],'g', label='pwm')
ax1.set_ylabel('pwm (%)')
ax1.set_ylim((-60,120))
ax1.grid()
plt.legend(loc = 2)
# plot velocity
ax2 = plt.twinx()
ax2.plot(data['clock'], data['speed'],'b', label='speed')
ax2.plot(data['clock'], data['fspeed'], 'r', label='fspeed')
ax2.set_ylabel('speed (Hz)')
ax2.set_ylim((-6,12))
ax2.set_xlim(0,6)
ax2.grid()
plt.legend(loc = 1)
# show plots
plt.show()
rc_motor.py¶
def main():
# import python's standard math module and numpy
import math, numpy, sys
# import Controller and other blocks from modules
from pyctrl.timer import Controller
from pyctrl.block import Interp, Logger, Constant
from pyctrl.block.system import System, Differentiator
from pyctrl.system.tf import DTTF, LPF
# initialize controller
Ts = 0.01
simotor = Controller(period = Ts)
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 100, 100, -50, -50, 0, 0]
# add pwm signal
simotor.add_signal('pwm')
# add filter to interpolate data
simotor.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['pwm'])
# Motor model parameters
tau = 1/55 # time constant (s)
g = 0.092 # gain (cycles/sec duty)
c = math.exp(-Ts/tau)
d = (g*Ts)*(1-c)/2
# add motor signals
simotor.add_signal('encoder')
# add motor filter
simotor.add_filter('motor',
System(model = DTTF(
numpy.array((0, d, d)),
numpy.array((1, -(1 + c), c)))),
['pwm'],
['encoder'])
# add motor speed signal
simotor.add_signal('speed')
# add motor speed filter
simotor.add_filter('speed',
Differentiator(),
['clock','encoder'],
['speed'])
# add low-pass signal
simotor.add_signal('fspeed')
# add low-pass filter
simotor.add_filter('LPF',
System(model = LPF(fc = 5, period = Ts)),
['speed'],
['fspeed'])
# add logger
simotor.add_sink('logger',
Logger(),
['clock','pwm','encoder','speed','fspeed'])
# Add a timer to stop the controller
simotor.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
# print controller info
print(simotor.info('all'))
try:
# run the controller
print('> Run the controller.')
with simotor:
# wait for the controller to finish on its own
simotor.join()
print('> Done with the controller.')
except KeyboardInterrupt:
pass
finally:
pass
# read logger
data = simotor.get_sink('logger', 'log')
try:
# import matplotlib
import matplotlib.pyplot as plt
except:
print('! Could not load matplotlib, skipping plots')
sys.exit(0)
print('> Will plot')
try:
# start plot
plt.figure()
except:
print('! Could not plot graphics')
print('> Make sure you have a connection to a windows manager')
sys.exit(0)
# plot pwm
ax1 = plt.gca()
ax1.plot(data['clock'], data['pwm'],'g', label='pwm')
ax1.set_ylabel('pwm (%)')
ax1.set_ylim((-60,120))
ax1.grid()
plt.legend(loc = 2)
# plot velocity
ax2 = plt.twinx()
ax2.plot(data['clock'], data['speed'],'b', label='speed')
ax2.plot(data['clock'], data['fspeed'], 'r', label='fspeed')
ax2.set_ylabel('speed (Hz)')
ax2.set_ylim((-6,12))
ax2.set_xlim(0,6)
ax2.grid()
plt.legend(loc = 1)
# show plots
plt.show()
rc_motor_control.py¶
def main():
# import python's standard math module and numpy
import math, numpy, sys
# import Controller and other blocks from modules
from pyctrl.rc import Controller
from pyctrl.block import Interp, Logger, Constant
from pyctrl.block.system import System, Differentiator, Feedback
from pyctrl.system.tf import PID
# initialize controller
Ts = 0.01
bbb = Controller(period = Ts)
# add encoder as source
bbb.add_source('encoder1',
('pyctrl.rc.encoder', 'Encoder'),
['encoder'],
kwargs = {'encoder': 3,
'ratio': 60 * 35.557})
# add motor as sink
bbb.add_sink('motor1',
('pyctrl.rc.motor', 'Motor'),
['pwm'],
kwargs = {'motor': 3},
enable = True)
# add motor speed signal
bbb.add_signal('speed')
# add motor speed filter
bbb.add_filter('speed',
Differentiator(),
['clock','encoder'],
['speed'])
# calculate PI controller gains
tau = 1/55 # time constant (s)
g = 0.092 # gain (cycles/sec duty)
Kp = 1/g
Ki = Kp/tau
print('Controller gains: Kp = {}, Ki = {}'.format(Kp, Ki))
# build controller block
pid = System(model = PID(Kp = Kp, Ki = Ki, period = Ts))
# add motor speed signal
bbb.add_signal('speed_reference')
bbb.add_filter('PIcontrol',
Feedback(block = pid),
['speed','speed_reference'],
['pwm'])
# build interpolated input signal
ts = [0, 1, 2, 3, 4, 5, 5, 6]
us = [0, 0, 8, 8, -4, -4, 0, 0]
# add filter to interpolate data
bbb.add_filter('input',
Interp(xp = us, fp = ts),
['clock'],
['speed_reference'])
# add logger
bbb.add_sink('logger',
Logger(),
['clock','pwm','encoder','speed','speed_reference'])
# Add a timer to stop the controller
bbb.add_timer('stop',
Constant(value = 0),
None, ['is_running'],
period = 6, repeat = False)
# print controller info
print(bbb.info('all'))
try:
# run the controller
print('> Run the controller.')
# set speed_reference
#bbb.set_signal('speed_reference', 5)
# reset clock
bbb.set_source('clock', reset = True)
with bbb:
# wait for the controller to finish on its own
bbb.join()
print('> Done with the controller.')
except KeyboardInterrupt:
pass
finally:
pass
# read logger
data = bbb.get_sink('logger', 'log')
try:
# import matplotlib
import matplotlib.pyplot as plt
except:
print('! Could not load matplotlib, skipping plots')
sys.exit(0)
print('> Will plot')
try:
# start plot
plt.figure()
except:
print('! Could not plot graphics')
print('> Make sure you have a connection to a windows manager')
sys.exit(0)
# plot pwm
plt.subplot(2,1,1)
plt.plot(data['clock'], data['pwm'], 'b')
plt.ylabel('pwm (%)')
plt.ylim((-120,120))
plt.xlim(0,6)
plt.grid()
# plot encoder
plt.subplot(2,1,2)
plt.plot(data['clock'], data['encoder'],'b')
plt.ylabel('position (cycles)')
plt.ylim((0,25))
plt.xlim(0,6)
plt.grid()
# start plot
plt.figure()
# plot pwm
ax1 = plt.gca()
ax1.plot(data['clock'], data['pwm'],'g', label='pwm')
ax1.set_ylabel('pwm (%)')
ax1.set_ylim((-60,120))
ax1.grid()
plt.legend(loc = 2)
# plot velocity
ax2 = plt.twinx()
ax2.plot(data['clock'], data['speed'],'b', label='speed')
ax2.plot(data['clock'], data['speed_reference'], 'r', label='reference')
ax2.set_ylabel('speed (Hz)')
ax2.set_ylim((-6,12))
ax2.set_xlim(0,6)
ax2.grid()
plt.legend(loc = 1)
# show plots
plt.show()
rc_mip_balance.py¶
def main():
# import blocks and controller
from pyctrl.rc.mip import Controller
from pyctrl.block.container import Container, Input, Output
from pyctrl.block.system import System, Subtract, Differentiator, Sum, Gain
from pyctrl.block.nl import ControlledCombination, Product
from pyctrl.block import Fade, Printer
from pyctrl.system.ss import DTSS
from pyctrl.block.logic import CompareAbsWithHysterisis, SetFilter, State
from rcpy.gpio import GRN_LED, PAUSE_BTN
from rcpy.led import red
# export json?
export_json = False
# create mip
mip = Controller()
# phi is the average of the encoders
mip.add_signal('phi')
mip.add_filter('phi',
Sum(gain=0.5),
['encoder1','encoder2'],
['phi'])
# phi dot
mip.add_signal('phi_dot')
mip.add_filter('phi_dot',
Differentiator(),
['clock','phi'],
['phi_dot'])
# phi dot and steer reference
mip.add_signals('phi_dot_reference', 'phi_dot_reference_fade')
mip.add_signals('steer_reference', 'steer_reference_fade')
# add fade in filter
mip.add_filter('fade',
Fade(target = [0, 0.5], period = 5),
['clock','phi_dot_reference','steer_reference'],
['phi_dot_reference_fade','steer_reference_fade'])
# state-space matrices
A = np.array([[0.913134, 0.0363383],[-0.0692862, 0.994003]])
B = np.array([[0.00284353, -0.000539063], [0.00162443, -0.00128745]])
C = np.array([[-383.009, 303.07]])
D = np.array([[-1.22015, 0]])
B = 2*np.pi*(100/7.4)*np.hstack((-B, B[:,1:]))
D = 2*np.pi*(100/7.4)*np.hstack((-D, D[:,1:]))
ssctrl = DTSS(A,B,C,D)
# state-space controller
mip.add_signals('pwm')
mip.add_filter('controller',
System(model = ssctrl),
['theta_dot','phi_dot','phi_dot_reference_fade'],
['pwm'])
# enable pwm only if about small_angle
mip.add_signals('small_angle', 'small_angle_pwm')
mip.add_filter('small_angle_pwm',
Product(),
['small_angle', 'pwm'],
['small_angle_pwm'])
# steering biasing
mip.add_filter('steer',
ControlledCombination(),
['steer_reference_fade',
'small_angle_pwm','small_angle_pwm'],
['pwm1','pwm2'])
# set references
mip.set_signal('phi_dot_reference',0)
mip.set_signal('steer_reference',0.5)
# add supervisor actions on a timer
# actions are inside a container so that they are executed all at once
mip.add_timer('supervisor',
Container(),
['theta'],
['small_angle','is_running'],
period = 0.5, repeat = True)
mip.add_signals('timer/supervisor/theta',
'timer/supervisor/small_angle')
mip.add_source('timer/supervisor/theta',
Input(),
['theta'])
mip.add_sink('timer/supervisor/small_angle',
Output(),
['small_angle'])
mip.add_sink('timer/supervisor/is_running',
Output(),
['is_running'])
# add small angle sensor
mip.add_filter('timer/supervisor/is_angle_small',
CompareAbsWithHysterisis(threshold = 0.11,
hysterisis = 0.09,
offset = -0.07,
state = (State.LOW,)),
['theta'],
['small_angle'])
# reset controller and fade
mip.add_sink('timer/supervisor/reset_controller',
SetFilter(label = ['/controller','/fade'],
on_rise = {'reset': True}),
['small_angle'])
# add green led
mip.add_sink('timer/supervisor/green_led',
('pyctrl.rc.led', 'LED'),
['small_angle'],
kwargs = {'pin': GRN_LED},
enable = True)
# add pause button on a timer
mip.add_source('timer/supervisor/pause_button',
('pyctrl.rc.button', 'Button'),
['is_running'],
kwargs = {'pin': PAUSE_BTN,
'invert': True},
enable = True)
# print controller
print(mip.info('all'))
# export json?
if export_json:
from pyctrl.flask import JSONEncoder
# export controller as json
json = JSONEncoder(sort_keys = True, indent = 4).encode(mip)
with open('rc_mip_balance.json', 'w') as f:
f.write(json)
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
print("""
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* M I P B A L A N C E *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
""")
print("""
Hold your MIP upright to start balancing
Use your keyboard to control the mip:
* UP and DOWN arrows move forward and back
* LEFT and RIGHT arrows steer
* / stops forward motion
* . stops steering
* SPACE resets forward motion and steering
""")
# reset everything
mip.set_source('clock',reset=True)
mip.set_source('encoder1',reset=True)
mip.set_source('encoder2',reset=True)
mip.set_filter('controller',reset=True)
mip.set_source('inclinometer',reset=True)
# turn on red led
red.on()
# start the controller
mip.start()
print("Press Ctrl-C or press the <PAUSE> button to exit")
# fire thread to update velocities
thread = threading.Thread(target = get_arrows,
args = (mip, fd))
thread.daemon = False
thread.start()
# and wait until controller dies
mip.join()
# print message
print("\nDone with balancing")
except KeyboardInterrupt:
print("\nBalancing aborted")
finally:
# turn off red led
red.off()
# make sure it exits
mip.set_state(pyctrl.EXITING)
print("Press any key to exit")
thread.join()
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
References¶
[deO16] | (1, 2, 3) M. C. de Oliveira, Fundamentals of Linear Control: a concise approach, Cambridge University Press, 2016. |
[Zhuo16] | (1, 2) Zhu Zhuo, LQG Controller Design of the Mobile Inverted Pendulum, M.Sc. Thesis, Department of Mechanical and Aerospace Engineering, University of California San Diego, 2016. |