My code for this is really hacky and not something I'd want to be judged on or tell other people to use But I can throw in the bulk of it as a starting point other people can use to make something better.
I wrote a shell script to install all the packages and put the scripts into /etc/rc.local (to start at boot). The "KEYPAD" listed there is the USB ID of the keypad I used, which wasn't recognized as a USB keyboard natively.
#!/bin/sh
AUTOSTART="python `pwd`/run.sh"
KEYPAD="hid-generic vendor=0x05a4 product=0x9759 maxSize=2048"
opkg update
opkg install python-light python3-light pyOnionI2C libmosquitto mosquitto-client python3-codecs
# Set up the keypad
grep -q "$KEYPAD" /etc/modules.d/hid-generic || sed -i \$i"$KEYPAD" /etc/modules.d/hid-generic
# Cause the script to start at boot:
grep -q "$AUTOSTART" /etc/rc.local || sed -i \$i"$AUTOSTART" /etc/rc.local
That will get you all the packages needed to run things as well.
The most useful part is probably the library for working with the accelerometer. There was Raspberry Pi-specific code for working with it on GitHub, which I forked and modified slightly to work with the Onion: https://github.com/mccollam/mpu6050
I built a little service to poll the sensor and make the data available over a socket, which is here:
#!/usr/bin/env python
from mpu6050 import mpu6050
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', 6000))
sock.listen(5)
mpu = mpu6050(0x68)
# Prime readings so we have something to compare against
accel = mpu.get_accel_data()
temp = mpu.get_temp()
while True:
connection, client_address = sock.accept()
try:
while True:
data = connection.recv(1024)
if data != '':
# TODO: Might eventually want to actually look at the incoming request
# but for now just return everything
accel = mpu.get_accel_data()
temp = mpu.get_temp()
# Looks like the json libs for python2 are busticated on the Onion :(
result = '{ "temp": ' + str(temp) + ', "x": ' + format(accel['x'], '.12f') + ', "y": ' + format(accel['y'], '.12f') + ', "z": ' + format(accel['z'], '.12f') + '}'
connection.sendall(result)
else:
break
finally:
connection.close()
(Note that I don't do a lot of error or bounds checking here, so it would be pretty easy to cause this script to crash by feeding it weird inputs over the socket it opens. So don't do that. )
I also ended up having to write my own JSON encoder as I was having issues with the built-in one on the Onion. By the time I got there I was getting a bit frustrated so I just generated it myself rather than digging into the errors too far.
Next up is the code that polls the keypad. I had written something really elegant that polled without blocking (using select
) and was lightweight and just generally great. It worked fine on my laptop but not on the Onion, so I ended up doing a huge hack and having a separate script listen for keyboard events and write the results to a file that gets read by the main program. I'm not proud of this:
import os
from subprocess import call
devicefile = '/dev/input/event0'
scratchfile = '/tmp/notify_0'
notifyfile = '/tmp/notify'
# MQTT path:
mqttPath = 'XXXXX/basement/dryer/'
# The location of the MQTT server:
mqttServer = 'mqtt.XXXXX.com'
# The MQTT server port (default 1883):
mqttPort = 1883
byte = []
i = open(devicefile, 'rb')
notifyPerson = ''
notifyMethod = ''
def mqttPub(path, value):
call(["mosquitto_pub", \
"-r", \
"-h", str(mqttServer), \
"-p", str(mqttPort), \
"-t", str(mqttPath) + str(path), "-m", str(value)])
while True:
for bit in i.read(1):
byte.append(bit)
if len(byte) == 8:
if byte[1] == 1 and byte[7] == 1:
if byte[3] == 55:
notifyPerson = 'Alpha'
notifyMethod = 'email'
elif byte[3] == 98:
notifyPerson = 'Alpha'
notifyMethod = 'sms'
elif byte[3] == 73:
notifyPerson = 'Bravo'
notifyMethod = 'email'
elif byte[3] == 72:
notifyPerson = 'Bravo'
notifyMethod = 'sms'
elif byte[3] == 77:
notifyPerson = 'Charlie'
notifyMethod = 'email'
elif byte[3] == 76:
notifyPerson = 'Charlie'
notifyMethod = 'sms'
elif byte[3] == 81:
notifyPerson = 'Delta'
notifyMethod = 'email'
elif byte[3] == 80:
notifyPerson = 'Delta'
notifyMethod = 'sms'
elif byte[3] == 83:
notifyPerson = 'Echo'
notifyMethod = 'email'
elif byte[3] == 82:
notifyPerson = 'Echo'
notifyMethod = 'sms'
elif byte[3] == 28:
notifyPerson = ''
notifyMethod = ''
o = open('/tmp/notify_0', 'w+')
o.write('{"notifyPerson": "' + notifyPerson + '", "notifyMethod": "' + notifyMethod + '"}')
o.close()
os.rename(scratchfile, notifyfile)
mqttPub('notifyPerson', notifyPerson)
mqttPub('notifyMethod', notifyMethod)
byte = []
The main chunk of code that pulls everything together and updates the UI is here:
#!/usr/bin/env python
# This script would be way shorter and cleaner if this worked:
#import mosquitto.publish as publish
import time
from subprocess import call
import select
import socket, json
# Name of this sensor (the MQTT path):
mqttPath = 'XXXXX/basement/dryer/'
# Whether or not to attempt to use an attached OLED display:
enableOLED = True
# The location of the MQTT server:
mqttServer = 'mqtt.XXXXX.com'
# The MQTT server port (default 1883):
mqttPort = 1883
# Frequency of MQTT messages (in seconds):
mqttFreq = 5
# Measurement frequency (in ms):
measureFreq = 100
# Absolute change of accelerometer readings to count as 'running':
jitter = 0.6
# Percentage of 'running' readings to change status to 'running':
runChange = .45
# Percentage of 'stopped' readings to change status to 'stopped':
stoppedChange = .65
# File to find the person to notify
notifyFile = '/tmp/notify'
#####
def mqttPub(path, value):
# If the MQTT libs worked, this would be a single line:
#publish.single(mqttPath + "path", value, hostname=mqttServer, port=mqttPort)
call(["mosquitto_pub", \
#"-r", \ # enable to retain old messages
"-h", str(mqttServer), \
"-p", str(mqttPort), \
"-t", str(mqttPath) + str(path), "-m", str(value)])
def update_display():
if notifyPerson != "":
notify = notifyPerson + " (via " + notifyMethod + ")"
else:
notify = "[no one]"
# By getting the string written to the display to be exactly the right length, we can
# update the display without clearing it first (which is slow and a bit ugly when doing
# it frequently). That's the reason for assembling it as one string with a bunch of
# carriage returns and blasting it to the display all at once.
dtempC = "%.1f" % temp
dtempF = "%.1f" % (9.0/5.0 * temp + 32)
display = "XXXXX Dryer\\n\\n" \
+ "Temp: " + dtempC + "'C (" + dtempF + "'F)" \
+ "\\nStatus: " + status \
+ "\\n\\nNotify:\\n " + notify + "\\n"
call(["oled-exp", "-q", "write", display])
#####
if enableOLED:
# Initialize and clear display
call(["oled-exp", "-q", "-i"])
cycles = 0
running = 0
stopped = 0
lastX = 0
lastY = 0
lastZ = 0
status = "stopped"
notifyPerson = ""
lastNotifyPerson = ""
notifyMethod = ""
lastNotifyMethod = ""
accel = {}
sleeptime = float(measureFreq) / 1000
while True:
accel['x'] = 0.0
accel['y'] = 0.0
accel['z'] = 0.0
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(('localhost', 6000))
message = 'feed me'
sock.sendall(message.encode())
data = sock.recv(1024)
parsed = json.loads(data.decode())
accel['x'] = parsed['x']
accel['y'] = parsed['y']
accel['z'] = parsed['z']
temp = parsed['temp']
finally:
sock.close()
# Measure change since last reading on each axis
# (a more accurate way to do this would be to get a differential but
# honestly that level of accuracy is not needed here and is more
# expensive to calculate)
dX = abs(accel['x'] - lastX)
dY = abs(accel['y'] - lastY)
dZ = abs(accel['z'] - lastZ)
if dX > jitter or dY > jitter or dZ > jitter:
running = running + 1
else:
stopped = stopped + 1
if (cycles >= mqttFreq):
if status == "stopped":
if float(running) / (float(running) + float(stopped)) > runChange:
status = "running"
else:
if float(stopped) / (float(running) + float(stopped)) > stoppedChange:
status = "stopped"
# FIXME: OMG THIS IS AWFUL
# HUGE HACK: This should actually receive an MQTT message for updating
# the person and method for notification. It doesn't currently because
# of the lack of support for Python MQTT in OpenWRT :(
notifyPerson = ""
notifyMethod = ""
o = open('/tmp/notify', 'w+')
o.write('{"notifyPerson": "", "notifyMethod": ""}')
o.close()
#mqttPub("notifyPerson", "")
#mqttPub("notifyMethod", "")
mqttPub("status", status)
mqttPub("temperature", temp)
cycles = 0
running = 0
stopped = 0
if enableOLED:
update_display()
lastX = accel['x']
lastY = accel['y']
lastZ = accel['z']
f = open(notifyFile, 'r')
notify = json.load(f)
notifyPerson = notify['notifyPerson']
notifyMethod = notify['notifyMethod']
if notifyPerson != lastNotifyPerson:
update_display()
lastNotifyPerson = notifyPerson
lastNotifyMethod = notifyMethod
f.close()
cycles = cycles + sleeptime
time.sleep(sleeptime)
Finally, the shell script that runs everything is:
#/bin/sh
# Yet another Onion bug, local DNS doesn't work by default :(
sed -i s/127.0.0.1/10.1.10.1/ /etc/resolv.conf
while true
do
echo '{"notifyPerson": "", "notifyMethod": ""}' > /tmp/notify
python /root/dryer/sensor.py &
SENSOR=$!
python3 /root/dryer/keyboard.py &
KEYBOARD=$!
sleep 3
python3 /root/dryer/ui.py
sleep 1
kill $SENSOR $KEYBOARD
done
(The kill
statement in there is just to make everything restart fresh if the mail script crashes. It was useful during debugging.)
I hope this is helpful to people! Like I said, probably the only really good bit is the library for the accelerometer so hit that up on GitHub.