Display-o-Tron: toy music player with scrolling colors

Here’s a “toy” music player written using Python, and GStreamer. On the Display-o-Tron LCD, it will show artist name, title and time. While playing, the music is analyzed ala moodbar to turn the sound into a RGB triplet. The resulting colors are scrolled along the backlight. It’s pretty nice! :)

I’m missing the equipment for a proper video. Feel free to make one!

#!/usr/bin/env python3
#
# dot_mood_player.py:
#     GStreamer-based player that will scroll colors on the Display-o-Tron
#     depending on the music “mood”.
#
#     On Debian:
#         # apt install --no-install-recommends \
#                       python3-gi gir1.2-gstreamer-1.0 gir1.2-gst-plugins-base-1.0 \
#                       gstreamer1.0-plugins-base gstreamer1.0-plugins-good \
#                       gstreamer1.0-alsa
#
#     Usage:
#         ./dot_mood_player.py <audio-file>
#
# Copyright © 2016 Lunar
#
# This work is free. You can redistribute it and/or modify it under the
# terms of the Do What The Fuck You Want To Public License, Version 2,
# as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.

import os
import os.path
import sys
import subprocess
import itertools
import re
import atexit
import signal
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst

from dothat import backlight, lcd

SPECTRUM_INTERVAL = 100000000 # 0.1 second
SPECTRUM_MAGNITUDE_THRESHOLD = -60 # dB
LED_COUNT = 6
MINIMUM_BACKLIGHT = 1
DISPLAY_WIDTH = 16

ELLIPSIS_CHAR = [
      0b00000
    , 0b00000
    , 0b00000
    , 0b00000
    , 0b00000
    , 0b00000
    , 0b10101
    , 0b00000
    ]

class Player(object):
    def __init__(self, media_path):
        self._media_path = media_path

        # Initialize with first lights at the center
        self._leds = [(0, 0, 0)] * LED_COUNT

        self._duration = None
        self._playing = False

        self._label = ''
        self._label_position = None

        self.setup_pipeline()
        bus = self._pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message::tag", self.handle_bus_message_tag_event)
        bus.connect("message::async-done", self.handle_bus_async_done)
        bus.connect("message::duration-changed", self.handle_bus_duration_changed)
        bus.connect("message::element", self.handle_element)

        self.init_display()

    def setup_pipeline(self):
         pipeline = '''filesrc location="{location}" ! decodebin ! audioconvert !
                       spectrum bands=3 post-messages=true message-magnitude=true threshold={threshold} interval={interval} !
                       autoaudiosink'''.format(location=self._media_path, threshold=SPECTRUM_MAGNITUDE_THRESHOLD, interval=SPECTRUM_INTERVAL)
         self._pipeline = Gst.parse_launch(pipeline)

    def scroll_color(self, *color):
        self._leds = self._leds[1:] + [color]
        for led, colors in enumerate(self._leds):
            backlight.single_rgb(led, *colors, auto_update=False)
        backlight.update()

    def update_backlight(self, r, g, b):
        if (r + g + b) / 3 < MINIMUM_BACKLIGHT:
            r, g, b = [MINIMUM_BACKLIGHT] * 3
        self.scroll_color(r, g, b)

    def transition_backlight(self, color_iterator):
        try:
            self.move_leds(*next(color_iterator))
            return True
        except StopIteration:
            return False

    def handle_element(self, bus, message):
        if message.get_structure().get_name() == 'spectrum':
            self.handle_spectrum(bus, message)

    WORKAROUND_RE = re.compile(r'magnitude=\(float\){ ([0-9.-]+), ([0-9.-]+), ([0-9.-]+) };')

    def handle_spectrum(self, bus, message):
        # Ugly workaround, although thanks Christian Leichsenring
        # http://thread.gmane.org/gmane.comp.video.gstreamer.devel/56869
        s = message.get_structure().to_string()
        m = Player.WORKAROUND_RE.search(s)
        if not m:
            return
        magnitudes = list(map(float, m.groups()))
        colors = [ int((SPECTRUM_MAGNITUDE_THRESHOLD - magnitude) * 255 / SPECTRUM_MAGNITUDE_THRESHOLD) for magnitude in magnitudes ]
        self.update_backlight(*colors)

    def init_display(self):
        lcd.clear()
        lcd.create_char(1, ELLIPSIS_CHAR)
        backlight.rgb(64, 64, 64)

    def update_label(self, tags):
        if tags['artist'] and tags['title']:
            self.label = '{artist} - {title}'.format(**tags)
        elif tags['title']:
            self.label = tags['title']
        else:
            self.label = os.path.basename(self._media_path)

    @property
    def label(self):
        return self._label

    @label.setter
    def label(self, value):
        self._label = value
        self._label_position = itertools.cycle(itertools.chain(itertools.repeat(0, 10), range(len(self._label))))

    def update_display(self):
        lcd.set_cursor_position(0, 0)
        if len(self.label) > DISPLAY_WIDTH:
            s = self.label[next(self._label_position):]
            if len(s) > DISPLAY_WIDTH:
                lcd.write('{}\1'.format(s[:DISPLAY_WIDTH - 1]))
            else:
                format_str = '{:%s}' % DISPLAY_WIDTH
                lcd.write(format_str.format(s))
        else:
            lcd.write(self.label)
        success, position = self._pipeline.query_position(Gst.Format.TIME)
        if success:
            position /= (1000 * 1000 * 1000) # get seconds
            lcd.set_cursor_position(10, 2)
            lcd.write('{:3d}:{:02d}'.format(int(position // 60), int(position % 60)))
        return self._playing

    def handle_bus_message_tag_event(self, bus, message):
        taglist = message.parse_tag()
        tags = { tag: taglist.get_string(tag)[1] for tag in ['artist', 'title'] }
        self.update_label(tags)

    def handle_bus_async_done(self, bus, message):
        if not self._playing:
            self._playing = True
            self.update_duration()
            GLib.timeout_add(250, self.update_display)
        else:
            self._playing = False

    def handle_bus_duration_changed(self, bus, message):
        self.update_duration()

    def play(self):
        self._pipeline.set_state(Gst.State.PLAYING)

    def update_duration(self):
        success, duration = self._pipeline.query_duration(Gst.Format.TIME)
        if success:
            self._duration = duration

def cleanup():
    lcd.clear()
    backlight.off()

def main():
    args = Gst.init(sys.argv)
    player = Player(args[1])
    atexit.register(cleanup)
    GLib.idle_add(player.play)
    main_loop = GLib.MainLoop()
    try:
        main_loop.run()
    except KeyboardInterrupt:
        main_loop.quit()

if __name__ == '__main__':
    main()
1 Like

Thanks for sharing, I’m an old school C/C++ guy from the 80s and I’m learning Python for my degree. Examples like this makes it much more interesting to play with Python and learn it than the data set processing that I am having to do. :)

Ok, here’s one, terribly blurry and without sound… but you’ll get the idea.

Running on Pi Zero + pHAT DAC + Display-o-Tron.