Raspberry Pi Unicorn Hat Tutorial | BlogJawn

Posted by William 'jawn-smith' Wilson and Dave 'waveform' Jones on Fri 24 June 2022
An Ubuntu logo displayed on a Unicorn Hat HD.

As of Ubuntu 22.04, the Pimoroni Unicorn hats are supported on Ubuntu out of the box. This includes the standard Unicorn Hat, Unicorn pHat, Unicorn Hat Mini, and Unicorn Hat HD.

Note

To install the libraries to use these hats run the following commands:

Unicorn Hat and Unicorn pHat: sudo apt install python3-unicornhat

Unicorn Hat HD: sudo apt install python3-unicornhathd

Unicorn Hat Mini: sudo apt install python3-unicornhatmini

Below are some examples of how to use the various hats!

Unicorn pHat

Note

sudo is required for all pHat scripts

The pHat is the perfect size to use on a Raspberry Pi Zero 2 and uses the same library as the Unicorn Hat.

The following script will display the Ukrainian flag:

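import time

import unicornhat as uh

uh.set_layout(uh.AUTO)
uh.brightness(0.5)

width, height = uh.get_shape()

for y in range(height):
    for x in range(width):
        if x < 2:
            # blue
            uh.set_pixel(x, y, 0, 87, 183)
        else:
            # yellow
            uh.set_pixel(x, y, 255, 221, 0)

uh.show()

# keep the script alive so the LEDs stay lit; sleeping avoids
# pinning a CPU core the way a bare "while True: pass" would
while True:
    time.sleep(1)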
A Unicorn pHat mounted to a Raspberry Pi Zero displaying the colors of the Ukrainian flag

This next example periodically checks my internet speed and uses color on the pHat to indicate whether the speed is good or bad. It requires the speedtest Python module, which isn't packaged natively in Ubuntu. To install it, run sudo pip3 install speedtest-cli.

import unicornhat as uh
import speedtest
import time

st = speedtest.Speedtest()
uh.set_layout(uh.AUTO)
uh.rotation(0)
uh.brightness(0.5)
width,height=uh.get_shape()

while True:
    # run a speed test for download speed
    dl = st.download()

    # run a speed test for upload speed
    ul = st.upload()

    # Set the Unicorn pHat LEDs accordingly
    if dl > 30000000: # 30 Mb/s
        # set the LEDs to green!
        dleds = (0, 255, 0)
    elif dl > 15000000: # 15 Mb/s
        # set the LEDs to yellow
        dleds = (255, 255, 0)
    else: # below 15 Mb/s
        # set the LEDs to red
        dleds = (255, 0, 0)

    if ul > 30000000: # 30 Mb/s
        # set the LEDs to green!
        uleds = (0, 255, 0)
    elif ul > 15000000: # 15 Mb/s
        # set the LEDs to yellow
        uleds = (255, 255, 0)
    else: # below 15 Mb/s
        # set the LEDs to red
        uleds = (255, 0, 0)

    for y in range(height):
        for x in range(width):
            if x < 2:
                uh.set_pixel(x,y,uleds)
            else:
                uh.set_pixel(x,y,dleds)

    uh.show()

    # sleep 10 minutes
    time.sleep(600)
A Unicorn pHat mounted to a Raspberry Pi Zero displaying red and green sections corresponding to upload and download speeds from a recent speed test

As you can see in the image above, my download speed was very good but my upload speed was not. Pimoroni has many more examples in their Unicorn Hat GitHub repository.
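
The upload and download branches in the script above apply the same thresholds twice. As a minimal sketch (the helper name speed_to_color and the constant names are my own, not part of the original script or of the speedtest module), the mapping could be factored into one function that needs no hardware to try out:

```python
# Thresholds in bits per second, matching the comments in the script above
GOOD_BPS = 30_000_000  # 30 Mb/s
FAIR_BPS = 15_000_000  # 15 Mb/s

GREEN = (0, 255, 0)
YELLOW = (255, 255, 0)
RED = (255, 0, 0)


def speed_to_color(bps):
    """Map a speed in bits per second to an (r, g, b) tuple."""
    if bps > GOOD_BPS:
        return GREEN
    if bps > FAIR_BPS:
        return YELLOW
    return RED


# Made-up sample readings, not real measurements
for sample in (45_000_000, 20_000_000, 4_000_000):
    print(sample, speed_to_color(sample))
```

With a helper like this, the main loop would only need dleds = speed_to_color(dl) and uleds = speed_to_color(ul).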

Unicorn Hat

Section written by Dave Jones

Note

sudo is required for all Unicorn Hat scripts

Anybody that's been on a video call with me has generally noticed some neopixely thingy pulsing away quietly behind me. This is the Sense HAT based piwheels monitor that lives on my desk (and occasionally travels with me).

Dave's piwheels monitor; a Sense HAT mounted on a PiJuice on a Raspberry Pi 3A+ in a rainbow colored PiBow coupé case. The device is sat just behind a soldering iron on a blue-painted worktop. Several of the Sense HAT's neo-pixels are lit in varying colors which, presumably, mean something to Dave.

For those unfamiliar with piwheels, the piwheels project is designed to automate building of wheels from packages on PyPI for a set of pre-configured ABIs. In English, this means that piwheels contains pre-built packages rather than source packages that need to be built locally as part of the installation. Piwheels currently only builds wheels for RaspiOS. We are currently exploring Ubuntu support in piwheels as well.

Why?!

While a monitor comprised of 64 colored dots may seem … minimal bordering on useless, I've found it quite the opposite for several reasons:

  • It's always visible on my desk; it's always running (even when I've rebooted to Windows for some gaming), it's never in a background window, it's not an email alert that gets lost in spam, or a text message that I don't notice because my phone's on silent.
  • It's only visible on my desk; piwheels is a volunteer project, so I'm happy to keep things running when I'm at my desk but if the builders go down at 4 in the morning: I don't care. I'll deal with that when I'm suitably caffeinated and sat at my computer.
  • It's a constant view of the overall system; I can rig up alerts to fire when certain things fail or occur, but it's also useful to have an "at a glance" view of the "health" of the overall system. I've occasionally caught issues in piwheels for which no specific alert existed because the monitor "looked off".

How?!

If anyone wants to follow in my pioneering lo-fi monitoring footsteps, here's a little script to achieve something similar with a Unicorn HAT. We'll go through it piece by piece:

#!/usr/bin/python3

import ssl
import math
import subprocess as sp
from pathlib import Path
from itertools import cycle
from time import sleep, time
from threading import Thread, Event
from urllib.request import urlopen, Request

import unicornhat

We start off with all the imports we'll need. Nothing terribly remarkable here other than to note the only external dependency is the Unicorn HAT library. Next onto the main monitor function:

def monitor(layout):
    unicornhat.set_layout(unicornhat.AUTO)
    unicornhat.rotation(0)
    unicornhat.brightness(1.0)

    width, height = unicornhat.get_shape()
    assert len(layout) <= height
    assert all(len(row) <= width for row in layout)

    pulse = cycle(math.sin(math.pi * i / 30) for i in range(30))
    updates = UpdateThread(layout)
    updates.start()
    try:
        for p in pulse:
            colors = {
                None: (0, 0, 0),
                True: (0, 127, 0),
                False: (int(255 * p), 0, 0),
            }
            for y, row in enumerate(layout):
                for x, check in enumerate(row):
                    value = check.value if isinstance(check, Check) else check
                    unicornhat.set_pixel(x, y, colors.get(value, value))
            unicornhat.show()
            sleep(1/30)
    finally:
        unicornhat.clear()
        updates.stop()
        updates.join()

This accepts a single parameter, layout, which is a list of lists of checks. Each check corresponds to a single pixel on the HAT, so you can't define more than 8 per row, and no more than 64 in total.

The function sets up:

  • unicornhat -- the Unicorn HAT, including rotation and brightness, and asserting that the layout will fit the "shape" of the HAT
  • pulse -- an infinite cycle of numbers derived from the first half of a sine wave which we'll use to pulse the "failure" color nicely so it'll draw some attention to itself
  • updates -- some sort of UpdateThread which will be used to run the checks in the background so long running checks won't get in the way of us pulsing things smoothly

Then it goes into an infinite loop (for p in pulse -- remember that pulse is an infinite generator) constantly updating the HAT with the values of each check.
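
To get a feel for the pulse without a HAT attached, here is a small sketch that builds the same half sine wave and prints the red intensities it would produce. The names STEPS, levels and reds are only for illustration:

```python
import math
from itertools import cycle, islice

STEPS = 30  # values per pulse, as in the monitor function

# The first half of a sine wave: rises from 0 towards 1, then falls back
levels = [math.sin(math.pi * i / STEPS) for i in range(STEPS)]
pulse = cycle(levels)

# Take two full pulses' worth of red intensities (0..255)
reds = [int(255 * p) for p in islice(pulse, 2 * STEPS)]

print(reds[:STEPS])
```

Because cycle repeats the 30 values forever, the second half of reds is identical to the first. With the 1/30 second sleep in the main loop, one full pulse takes roughly a second.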

Note

You may note that check.value is only used when our check is actually a Check, and further that if the value isn't found in the colors lookup table we just use the value directly. This allows us to specify literal False, True, or None values instead of checks (in case we want to space things out a bit), or have checks directly return color tuples instead of bools.
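
The lookup described in this note can be tried in isolation. This is a sketch only: the Check class here is a bare stand-in that just carries a value, and resolve is a hypothetical helper name that does not appear in the script:

```python
class Check:
    """Stand-in for the real Check class; only `value` matters here."""
    def __init__(self, value):
        self.value = value


def resolve(cell, colors):
    """Return the (r, g, b) tuple the monitor would draw for one layout cell."""
    value = cell.value if isinstance(cell, Check) else cell
    # Known states map through the table; anything else is used as-is
    return colors.get(value, value)


colors = {None: (0, 0, 0), True: (0, 127, 0), False: (255, 0, 0)}

print(resolve(None, colors))             # a literal spacer
print(resolve(Check(True), colors))      # a passing check
print(resolve((255, 127, 0), colors))    # a literal color tuple
print(resolve(Check((0, 0, 255)), colors))  # a check returning a color
```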

Now an important question: what is a check? Let's define some:

def page(url, *, timeout=10, status=200, method='HEAD', **kwargs):
    context = ssl.create_default_context()
    req = Request(url, method=method, **kwargs)
    try:
        print(f'Requesting {url}')
        with urlopen(req, timeout=timeout, context=context) as resp:
            return resp.status == status
    except OSError:
        return False

def cmd(cmdline, shell=True):
    try:
        print(f'Running {cmdline}')
        sp.check_call(
            cmdline, stdout=sp.DEVNULL, stderr=sp.DEVNULL, shell=shell)
    except sp.CalledProcessError:
        return False
    else:
        return True

def file(filename, min_size=1):
    try:
        print(f'Checking {filename}')
        return Path(filename).stat().st_size >= min_size
    except OSError:
        return False

We define three check functions:

  • page -- checks that accessing a particular url (with the "HEAD" method by default) returns status code 200 (OK in HTTP parlance)
  • cmd -- checks that executing a particular shell command is successful (exits with code 0)
  • file -- checks that the specified file exists and has a particular minimum size (defaults to 1 so this effectively checks the file is not empty)

Next we define a class which we'll use to define individual checks. It will wrap one of the functions above, the parameters we want to pass to it, and how long we should cache results for before allowing the check to be run again:

class Check:
    def __init__(self, func, *args, every=60, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.every = every
        self.last_run = None
        self.value = None

    def update(self):
        now = time()
        if self.last_run is None or self.last_run + self.every < now:
            self.last_run = now
            self.value = self.func(*self.args, **self.kwargs)
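
To see the caching behave as described, here is a sketch that repeats the class so it runs on its own and wraps a made-up check function (always_ok is hypothetical) that records every call. Rather than waiting a minute, it moves last_run into the past to simulate time passing:

```python
from time import time


class Check:
    # Repeated from above so this sketch is self-contained
    def __init__(self, func, *args, every=60, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.every = every
        self.last_run = None
        self.value = None

    def update(self):
        now = time()
        if self.last_run is None or self.last_run + self.every < now:
            self.last_run = now
            self.value = self.func(*self.args, **self.kwargs)


calls = []


def always_ok(name):
    """Hypothetical check function that records each invocation."""
    calls.append(name)
    return True


check = Check(always_ok, 'demo', every=60)
check.update()  # first call always runs the function
check.update()  # still within 60 seconds, so the cached value is kept
print(len(calls), check.value)

# Pretend 61 seconds have passed by moving last_run into the past
check.last_run -= 61
check.update()
print(len(calls))
```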

Next, we need the background thread that will loop round running the update method of all the checks in the layout:

class UpdateThread(Thread):
    def __init__(self, layout):
        super().__init__(target=self.update, args=(layout,), daemon=True)
        self._done = Event()

    def stop(self):
        self._done.set()

    def update(self, layout):
        while not self._done.wait(1):
            for row in layout:
                for check in row:
                    if isinstance(check, Check):
                        check.update()
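
The loop condition, while not self._done.wait(1), doubles as an interruptible sleep. The following sketch shows the same pattern on its own with a much shorter timeout. The worker function and ticks list are illustrative names, not part of the monitor script:

```python
from threading import Thread, Event
from time import sleep

done = Event()
ticks = []  # one entry per pass through the worker loop


def worker():
    # wait() returns False when the timeout expires (keep looping)
    # and True as soon as the event is set (leave the loop)
    while not done.wait(0.01):
        ticks.append(len(ticks))


thread = Thread(target=worker, daemon=True)
thread.start()

# Let the worker complete at least one pass, then ask it to stop
while not ticks:
    sleep(0.01)
done.set()
thread.join()

print('worker stopped:', not thread.is_alive())
```

Compared with calling sleep inside the loop and polling a flag, this lets stop() take effect immediately instead of after the current sleep finishes.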

Finally, we need to run the main monitor function and define all the checks we want to execute. I've included some examples which check some common / important pages on the piwheels site, some pages on my blog server, some basic connectivity checks (can ping the local gateway, can ping Google's DNS, can ping a DNS name), and some example file checks.

if __name__ == '__main__':
    monitor([
        [ # some connectivity tests, centered
            None,
            None,
            None,
            Check(cmd, 'ping -c 1 -W 1 192.168.0.1', every=5),
            Check(cmd, 'ping -c 1 -W 1 8.8.8.8', every=30),
            Check(cmd, 'ping -c 1 -W 1 ubuntu.com', every=30),
        ],
        [ # a blank row
        ],
        [ # check some piwheels pages
            Check(page, 'https://www.piwheels.org/'),
            Check(page, 'https://www.piwheels.org/packages.html'),
            Check(page, 'https://www.piwheels.org/simple/index.html'),
            Check(page, 'https://www.piwheels.org/simple/numpy/index.html'),
        ],
        [ # make sure Dave's little pi blog is running
            Check(page, 'https://waldorf.waveform.org.uk/'),
            Check(page, 'https://waldorf.waveform.org.uk/pages/about.html'),
            Check(page, 'https://waldorf.waveform.org.uk/archives.html'),
            Check(page, 'https://waldorf.waveform.org.uk/tags.html'),
            Check(page, 'https://waldorf.waveform.org.uk/2020/package-configuration.html'),
        ],
        [ # a coloured line
         (255, 127, 0)
        ] * 8,
        [ # are our backups working?
            Check(file, '/var/backups/dpkg.status.0'),
            Check(file, '/var/backups/apt.extended_states.0'),
            Check(file, '/tmp/foo', every=5),
        ],
    ])

You can run the full script like so:

$ sudo ./monitor.py
A shot of the Unicorn HAT monitor running in its default configuration. The Unicorn HAT itself is mounted on a Black HAT Hacker board attached to a Raspberry Pi 400. Several micro SD cards, and their adapters, litter the blue painted worktop surrounding the Pi. The neopixels on the Unicorn HAT are all lit in the default configuration of the script; the last one is red indicating that the check for the file /tmp/foo has failed.

Press Ctrl+C to exit the script.

The last file check is for /tmp/foo which probably doesn't exist so when you run this script you should see at least one blinking red "failure". Try running echo foo > /tmp/foo and watch the failure turn green after 5 seconds. Then rm /tmp/foo and watch it turn back to blinking red.

If you wish to run the script automatically on boot, place this service definition in /etc/systemd/system/unicorn-monitor.service (this assumes you've saved the script under /usr/local/bin/monitor.py):

[Unit]
Description=Unicorn HAT based monitor
After=local-fs.target network.target

[Service]
Type=simple
Restart=on-failure
ExecStart=/usr/bin/python3 /usr/local/bin/monitor.py

[Install]
WantedBy=multi-user.target

Then run the following and you should find that the monitor will start automatically on the next reboot:

$ sudo systemctl daemon-reload
$ sudo systemctl enable unicorn-monitor

Enjoy!

Unicorn Hat HD

Section written by Dave Jones

Moar Pixels!

The Unicorn HAT HD has an almost obscene number of neopixels, just waiting to spew rainbows at unsuspecting eyeballs. So much so, it seems a shame to waste them all on something as mundane as a two-tone monitoring service. Let's try for something more … colorful!

Old computers had huge arrays of lights now often referred to (slightly tongue in cheek) as blinkenlights. These often visualized actual machine registers, bus addresses and the like. These days memory is too large and too fast to make this useful. However, it's fairly typical (on an Ubuntu for Pi desktop image running a few things like a browser, terminal, etc.) to have a number of userland processes that would fit quite nicely within the 256 neopixels on the Unicorn HAT HD.

Let's build a modern blinkenlights variation that visualizes the process table. We'll use red to represent CPU usage, green to represent memory usage, and blue to represent the inverted age of a process (so brand new processes will show up bright blue, then fade as they age).

We can use ps to query the process table. We'll exclude PID 2 which on Linux represents the kernel, and any process under PID 2 (the kernel's various "threads" which are largely static and thus not very visually interesting). We'll ask ps for cp (per-mill CPU usage), rss (resident set size, a measure of the amount of actual RAM a process is using), and etimes (age of the process in seconds). Finally, we'll turn off the headers because we don't need them:

$ ps --pid 2 --ppid 2 --deselect -o cp,rss,etimes --no-headers
  1 10760   91980
  0 35128   91975
  0 23432   91975
  0  2916   91975
  0  3332   91971
  3  3364   91971
  0  7368   91970
  0  3868   91970
  0  3428   91969
  ....

This looks reasonable, so let's get coding. First, some imports:

#!/usr/bin/python3

import shlex
import subprocess as sp
from time import sleep
from threading import Thread, Event
from itertools import zip_longest

import unicornhathd

Next, our main monitor function:

def monitor():
    unicornhathd.rotation(0)
    unicornhathd.brightness(1.0)
    width, height = unicornhathd.get_shape()

    processes = {
        # coord  CPU   mem   age
        (x, y): (None, None, None)
        for y in range(height)
        for x in range(width)
    }
    limits = {'max-cpu': 0, 'max-mem': 0}
    updates = UpdateThread(processes, limits)
    updates.start()
    try:
        while True:
            if limits['max-cpu']:
                for (x, y), (cpu, mem, age) in processes.items():
                    if cpu is not None:
                        r = min(255, int(255 * cpu / limits['max-cpu']))
                        g = min(255, int(255 * mem / limits['max-mem']))
                        b = max(0, min(255, int(255 * (10 - age) / 10)))
                        unicornhathd.set_pixel(y, x, r, g, b)
                    else:
                        unicornhathd.set_pixel(y, x, 0, 0, 0)
            unicornhathd.show()
            sleep(1/30)
    finally:
        unicornhathd.off()
        updates.stop()
        updates.join()

This is fairly simple but let's go through it piece by piece. We start off by initializing:

  • The Unicorn HAT HD itself, with the unicornhathd module. We also grab its shape (which should always be 16x16).
  • processes -- a dictionary mapping coordinates on the display to a three-tuple representing CPU usage, RAM usage, and process age. These will be None by default (for any pixel not currently representing a process).
  • limits -- a simple dictionary acting as a way to pass a couple of other values (the current maximum CPU and memory usage) between the update thread and the main thread.
  • updates -- an instance of UpdateThread (which we'll define a bit later on), which we start before entering the main loop.

The main loop simply checks that we have a valid max-cpu value then updates the display from the values in the processes dictionary with some simple maths. All the values are scaled to the maximum CPU and memory usage so we get a nice distribution of color.
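
The "simple maths" can be pulled out into a function and tried with numbers from the ps sample shown earlier. This is only a sketch: process_color and its fade parameter are names I've introduced, while the script itself does the calculation inline with the 10 seconds hard-coded:

```python
def process_color(cpu, mem, age, max_cpu, max_mem, fade=10):
    """Return the (r, g, b) tuple for one process, mirroring the main loop.

    `fade` is the number of seconds over which the blue channel fades out.
    """
    r = min(255, int(255 * cpu / max_cpu))
    g = min(255, int(255 * mem / max_mem))
    b = max(0, min(255, int(255 * (fade - age) / fade)))
    return r, g, b


# An old process using the most CPU but little memory
print(process_color(3, 3364, 91971, max_cpu=3, max_mem=35128))

# A brand new, idle process holding the most memory
print(process_color(0, 35128, 0, max_cpu=3, max_mem=35128))

# Stale limits: usage above the recorded maximum is clamped to 255
print(process_color(6, 100, 5, max_cpu=3, max_mem=100))
```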

Now the UpdateThread itself:

class UpdateThread(Thread):
    def __init__(self, processes, limits):
        super().__init__(target=self.update, args=(processes, limits),
                         daemon=True)
        self._done = Event()

    def stop(self):
        self._done.set()

    def update(self, processes, limits):
        cmdline = shlex.split(
            'ps --pid 2 --ppid 2 --deselect -o cp,rss,etimes --no-headers')
        while not self._done.wait(1/30):
            proc = sp.run(cmdline, capture_output=True, text=True)
            output = proc.stdout.splitlines()
            max_cpu = max_mem = 0
            for coord, line in zip_longest(processes, output):
                if not coord:
                    break
                elif line:
                    cpu, mem, age = (int(i) for i in line.split())
                    max_cpu = max(cpu, max_cpu)
                    max_mem = max(mem, max_mem)
                    processes[coord] = (cpu, mem, age)
                else:
                    processes[coord] = (None, None, None)
            # set max-mem first: the main thread only divides by it once
            # it has seen a non-zero max-cpu
            limits['max-mem'] = max_mem
            limits['max-cpu'] = max_cpu

This is also fairly simple; the update method just calls ps and loops over the lines of output sticking values into the processes dictionary and updating the limits dictionary with the biggest CPU and memory values found at the end.
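
The pairing of pixels with lines of ps output can be tried without a HAT. This sketch assumes a tiny 2x2 "display" and two lines of sample output in place of a real ps call. One pixel starts with a stale entry to show how leftover pixels get blanked:

```python
from itertools import zip_longest

# A tiny 2x2 stand-in for the display; one pixel holds a stale entry
processes = {(x, y): (None, None, None) for y in range(2) for x in range(2)}
processes[(0, 1)] = (5, 500, 2)

# Two lines of sample ps output: cp, rss, etimes
output = ['  1 10760   91980', '  3  3364   91971']

max_cpu = max_mem = 0
for coord, line in zip_longest(processes, output):
    if not coord:
        # more processes than pixels: ignore the overflow
        break
    elif line:
        cpu, mem, age = (int(i) for i in line.split())
        max_cpu = max(cpu, max_cpu)
        max_mem = max(mem, max_mem)
        processes[coord] = (cpu, mem, age)
    else:
        # fewer processes than pixels: blank the leftover pixel
        processes[coord] = (None, None, None)

for coord, values in processes.items():
    print(coord, values)
print(max_cpu, max_mem)
```

Note that the loop only ever assigns to keys that already exist, so the dictionary never changes size while it is being iterated.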

Hang on a minute … how is this safe? Surely both the "main" thread, and our background update thread are going to be bashing away at these structures simultaneously?! In most languages this would indeed be a no-no. In Python it's safe, provided that the dictionary doesn't grow or shrink while we're iterating over it.

Note

The limitation on not inserting into / deleting from dictionaries while iterating over them is the primary reason for the structure of both these demo scripts. The fixed size of the dictionaries in both cases avoids the need for any explicit locking, keeping the scripts both simple and reasonably efficient.

For those curious about what goes on under the covers, there is still some locking going on: the Python GIL is ensuring that both threads leave the structures in a consistent state between Python statements (actually between bytecodes, but let's not split hairs!).

It's also worth noting that, as there's no locking, we can't guarantee that the limits are actually accurate at the time the main display thread is iterating over the processes dictionary. This is why we're using min and max to clamp the calculation results in the main thread.

Finally, we just need to kick things off:

if __name__ == '__main__':
    monitor()

And we're done! Run the full script (you don't need sudo for this one as the Unicorn HAT HD has a very different control mechanism) and bask in the glow of das blinkenlights!

Try stopping and starting your web browser while it's running and see just how many processes a modern browser encapsulates.

You may be surprised to see processes appearing and disappearing constantly at the "end" of the table. This is actually quite normal for a modern system. You may also, when your system is otherwise idle, see that the end of the table has one persistently red (CPU load) and one persistently blue ("new") process. Have a think about which processes these might be! In particular, why is one process always "new" …

Another shot of Dave's blue painted worktop. The Raspberry Pi 400 again has the Black HAT Hacker attached to it, but this time it's adorned with the Unicorn HAT HD. The blinkenlights script is running and most of the neopixels on the HAT are glowing a dim green. A couple are lit brightly yellow (indicating high CPU and RAM usage), one is bright red (high CPU alone) and one at the bottom is bright blue (a "new" process). The usual clutter of cards and cables surrounds the board.

Press Ctrl+C to exit the script.

If you wish to run the script automatically on boot, place this service definition in /etc/systemd/system/das-blinkenlights.service (this assumes you've saved the script as /usr/local/bin/blinkenlights.py):

[Unit]
Description=Unicorn HAT HD based process table monitor
After=local-fs.target

[Service]
Type=simple
User=ubuntu
Restart=on-failure
ExecStart=/usr/bin/python3 /usr/local/bin/blinkenlights.py

[Install]
WantedBy=multi-user.target

Then run the following and you should find that the monitor will start automatically on the next reboot:

$ sudo systemctl daemon-reload
$ sudo systemctl enable das-blinkenlights

Unicorn Hat Mini

While small, the Unicorn Hat Mini has more pixels than the pHat so it can display much more. It also has four buttons, making things like simple games possible. This example is a very simple game of pong that I've created. Every time the ball hits a paddle the color of the ball changes and the game speeds up a little.

import time
import random

from gpiozero import Button
from unicornhatmini import UnicornHATMini

unicornhatmini = UnicornHATMini()
unicornhatmini.set_brightness(0.5)
width, height = unicornhatmini.get_shape()

class Pong():
    def __init__(self):
        # start the paddles roughly halfway vertically
        self.l_paddle_y = 3
        self.r_paddle_y = 3
        self.delay = 0.3
        self.ball_x = 1
        self.ball_y = 1
        self.ball_horiz = 1
        self.ball_vert = 1
        self.game_on = True
        self.paddle_height = 3
        self.colors = (
            (255, 0, 0),
            (0, 255, 0),
            (0, 0, 255),
            (255, 255, 0),
            (255, 0, 255),
            (0, 255, 255),
            (255, 255, 255)
        )
        self.color = random.choice(self.colors)

    def l_paddle_down(self):
        if self.l_paddle_y < height - self.paddle_height:
            self.l_paddle_y += 1

    def l_paddle_up(self):
        if self.l_paddle_y > 0:
            self.l_paddle_y -= 1

    def r_paddle_down(self):
        if self.r_paddle_y < height - self.paddle_height:
            self.r_paddle_y += 1

    def r_paddle_up(self):
        if self.r_paddle_y > 0:
            self.r_paddle_y -= 1

    def update(self):
        # check if the game is over
        if self.ball_x in (0, width - 1):
            self.game_on = False
            return

        # clear the board state
        unicornhatmini.clear()

        # set the position of the paddles
        for i in range(3):
            unicornhatmini.set_pixel(0,
                    self.l_paddle_y + i,
                    255, 255, 255)
            unicornhatmini.set_pixel(width - 1,
                    self.r_paddle_y + i,
                    255, 255, 255)

        # calculate the next position of the ball
        ball_x_next = self.ball_x + self.ball_horiz
        ball_y_next = self.ball_y + self.ball_vert

        # check if the ball needs to bounce off of a paddle
        if (
                (ball_x_next == 0 and ball_y_next in
                    (self.l_paddle_y + i for i in range(3))) or
                (ball_x_next == width - 1 and ball_y_next in
                    (self.r_paddle_y + i for i in range(3)))
            ):
            # the paddle has hit the ball, so change direction
            self.ball_horiz = -self.ball_horiz
            # recalculate ball_x_next
            ball_x_next = self.ball_x + self.ball_horiz

            # since the ball hit a paddle
            # reduce the delay to speed up the game, but keep it
            # positive (time.sleep raises on a negative delay)
            self.delay = max(0.05, self.delay - 0.01)

            # change the color of the ball every time
            # the ball hits a paddle
            self.color = random.choice(self.colors)

        # check if the ball needs to bounce off of an edge
        if (
                (self.ball_y == height - 1 and self.ball_vert == 1) or
                (self.ball_y == 0 and self.ball_vert == -1)
            ):
            self.ball_vert = -self.ball_vert
            ball_y_next = self.ball_y + self.ball_vert

        self.ball_x = ball_x_next
        self.ball_y = ball_y_next
        unicornhatmini.set_pixel(self.ball_x,
                self.ball_y,
                *self.color)

        # show the game state
        unicornhatmini.show()

pong_game = Pong()

button_a = Button(5)   # left paddle up
button_b = Button(6)   # left paddle down
button_x = Button(16)  # right paddle up
button_y = Button(24)  # right paddle down

button_a.when_pressed = pong_game.l_paddle_up
button_b.when_pressed = pong_game.l_paddle_down
button_x.when_pressed = pong_game.r_paddle_up
button_y.when_pressed = pong_game.r_paddle_down

while pong_game.game_on:
    pong_game.update()
    time.sleep(pong_game.delay)
A Unicorn Hat Mini with a game of Pong being played on it. The player on the left wins the point!
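
The edge-bounce rule from the update method can be tested without the HAT or any buttons. As a sketch (bounce_vertical is a hypothetical helper, not part of the game above), the same test written against an arbitrary display height looks like this:

```python
def bounce_vertical(y, dy, height):
    """Return (next_y, next_dy) for a ball moving one row per frame."""
    # Reverse direction if the ball is on an edge and heading off the display
    if (y == height - 1 and dy == 1) or (y == 0 and dy == -1):
        dy = -dy
    return y + dy, dy


# Follow the ball for a few frames on a display 7 rows high
y, dy = 5, 1
for frame in range(4):
    y, dy = bounce_vertical(y, dy, height=7)
    print(frame, y, dy)
```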

Pimoroni has many more examples on their GitHub repo!