Towards working sensors

This commit is contained in:
riot 2020-05-04 20:50:52 +02:00
parent c38d5e6486
commit 27056c9487
4 changed files with 474 additions and 44 deletions

View File

@ -2,4 +2,9 @@
#import esp
#esp.osdebug(None)
import webrepl
def clear_main():
import os
os.remove('main.py')
webrepl.start()

View File

@ -1,62 +1,133 @@
import time
import network
from mqtt import MQTTClient
import machine
import time
import tsl2591
import tcs34725
i2c = machine.I2C(-1, machine.Pin(33), machine.Pin(32))
sensor = tcs34725.TCS34725(i2c)
tsl = tsl2591.Tsl2591(0)
full, ir = tsl.get_full_luminosity() # read raw values (full spectrum and ir spectrum)
lux = tsl.calculate_lux(full, ir) # convert raw values to lux
print(lux, full, ir)
running = True
def sub_cb(topic, msg):
print(msg)
global running
if msg == b'stop':
print('Stopping execution')
running = False
if msg == b'start' and running is False:
print('Resuming execution')
running = True
main()
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
#wlan.connect("c-base-botnet", auth=(WLAN.WPA2, "wifipassword"), timeout=5000)
# wlan.connect("c-base-botnet", auth=(WLAN.WPA2, "wifipassword"), timeout=5000)
while not wlan.isconnected():
machine.idle()
print("Connected to Wifi\n")
client = MQTTClient("windsensor_0", "192.168.178.98",user="", password="", port=1883)
#client = MQTTClient("windsensor_0", "mqtt.cbrp3.c-base.org",user="", password="", port=1883)
client = MQTTClient("bioreactor_0", "192.168.178.98", user="", password="", port=1883)
# client = MQTTClient("windsensor_0", "mqtt.cbrp3.c-base.org",user="", password="", port=1883)
client.set_callback(sub_cb)
client.connect()
client.subscribe(topic="/hackerfleet/sensors/windsensor/control")
client.subscribe(topic="/hackerfleet/sensors/bioreactor/control")
topic_pub = "/hackerfleet/sensors/bioreactor/status"
print("Connected to MQTT")
speedInterrupts = 0
directionInterrupts = 0
configuration = {
'pin_light': 3,
'pin_rotor': 4,
'process': [
{
'action': 'sleep',
'duration': 1
},
{
'action': 'start_measurement',
},
{
'action': 'spin_rotor',
'duration': 1,
},
{
'action': 'sleep',
'duration': 0
},
{
'action': 'stop_measurement',
},
# {
# 'action': 'stop_program'
# }
]
}
speedTicks = 0
directionTicks = 0
colour_sensor_address = 0x29
def callbackSpeed(pin):
global speedInterrupts
speedInterrupts += 1
def callbackDirection(pin):
global directionInterrupts
directionInterrupts += 1
def wait(duration):
print('Sleeping %s seconds' % duration)
time.sleep(duration)
speedPin = machine.Pin(25, machine.Pin.IN, machine.Pin.PULL_UP)
directionPin = machine.Pin(26, machine.Pin.IN, machine.Pin.PULL_UP)
speedPin.irq(trigger=machine.Pin.IRQ_FALLING, handler=callbackSpeed)
directionPin.irq(trigger=machine.Pin.IRQ_FALLING, handler=callbackSpeed)
def switch_pin(pin, state):
print('Switching pin %i to %s' % (pin, state))
print("Pin setup done")
while True:
def start_measurement():
print('Starting measurement')
data = sensor.read()
print(data)
if speedInterrupts > 0 or directionInterrupts > 0:
state = machine.disable_irq()
if speedInterrupts > 0:
speedTicks += 1
speedInterrupts -= 1
if directionInterrupts > 0:
directionTicks += 1
directionInterrupts -= 1
machine.enable_irq(state)
def stop_measurement():
print('Stopping measurement')
print("Direction: ", str(directionTicks), " Speed: ", str(speedTicks))
def spin_rotor(duration):
print('Stirring')
switch_pin(configuration['pin_rotor'], True)
wait(duration)
print('Stopping')
switch_pin(configuration['pin_rotor'], False)
def main():
global running
while running:
print('Restarting process')
client.check_msg()
client.publish(topic_pub, "Restarting")
data = []
for item in configuration['process']:
client.publish(topic_pub, str(item))
if item['action'] == 'sleep':
wait(item['duration'])
if item['action'] == 'start_measurement':
start_measurement()
if item['action'] == 'stop_measurement':
stop_measurement()
if item['action'] == 'spin_rotor':
spin_rotor(item['duration'])
if item['action'] == 'stop_program':
print('Stopping program')
running = False
main()

171
src/tcs34725.py Normal file
View File

@ -0,0 +1,171 @@
import time
import ustruct
#const = lambda x:x
_COMMAND_BIT = const(0x80)
_REGISTER_ENABLE = const(0x00)
_REGISTER_ATIME = const(0x01)
_REGISTER_AILT = const(0x04)
_REGISTER_AIHT = const(0x06)
_REGISTER_ID = const(0x12)
_REGISTER_APERS = const(0x0c)
_REGISTER_CONTROL = const(0x0f)
_REGISTER_SENSORID = const(0x12)
_REGISTER_STATUS = const(0x13)
_REGISTER_CDATA = const(0x14)
_REGISTER_RDATA = const(0x16)
_REGISTER_GDATA = const(0x18)
_REGISTER_BDATA = const(0x1a)
_ENABLE_AIEN = const(0x10)
_ENABLE_WEN = const(0x08)
_ENABLE_AEN = const(0x02)
_ENABLE_PON = const(0x01)
_GAINS = (1, 4, 16, 60)
_CYCLES = (0, 1, 2, 3, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60)
class TCS34725:
def __init__(self, i2c, address=0x29):
self.i2c = i2c
self.address = address
self._active = False
self.integration_time(2.4)
sensor_id = self.sensor_id()
if sensor_id not in (0x44, 0x10):
raise RuntimeError("wrong sensor id 0x{:x}".format(sensor_id))
def _register8(self, register, value=None):
register |= _COMMAND_BIT
if value is None:
return self.i2c.readfrom_mem(self.address, register, 1)[0]
data = ustruct.pack('<B', value)
try:
self.i2c.writeto_mem(self.address, register, data)
except OSError:
print("Could not write to i2c")
def _register16(self, register, value=None):
register |= _COMMAND_BIT
if value is None:
data = self.i2c.readfrom_mem(self.address, register, 2)
return ustruct.unpack('<H', data)[0]
data = ustruct.pack('<H', value)
self.i2c.writeto_mem(self.address, register, data)
def active(self, value=None):
if value is None:
return self._active
value = bool(value)
if self._active == value:
return
self._active = value
enable = self._register8(_REGISTER_ENABLE)
if value:
self._register8(_REGISTER_ENABLE, enable | _ENABLE_PON)
time.sleep_ms(3)
self._register8(_REGISTER_ENABLE,
enable | _ENABLE_PON | _ENABLE_AEN)
else:
self._register8(_REGISTER_ENABLE,
enable & ~(_ENABLE_PON | _ENABLE_AEN))
def sensor_id(self):
return self._register8(_REGISTER_SENSORID)
def integration_time(self, value=None):
if value is None:
return self._integration_time
value = min(614.4, max(2.4, value))
cycles = int(value / 2.4)
self._integration_time = cycles * 2.4
return self._register8(_REGISTER_ATIME, 256 - cycles)
def gain(self, value):
if value is None:
return _GAINS[self._register8(_REGISTER_CONTROL)]
if value not in _GAINS:
raise ValueError("gain must be 1, 4, 16 or 60")
return self._register8(_REGISTER_CONTROL, _GAINS.index(value))
def _valid(self):
return bool(self._register8(_REGISTER_STATUS) & 0x01)
def read(self, raw=False):
was_active = self.active()
self.active(True)
while not self._valid():
time.sleep_ms(int(self._integration_time + 0.9))
data = tuple(self._register16(register) for register in (
_REGISTER_RDATA,
_REGISTER_GDATA,
_REGISTER_BDATA,
_REGISTER_CDATA,
))
self.active(was_active)
if raw:
return data
return self._temperature_and_lux(data)
def _temperature_and_lux(self, data):
r, g, b, c = data
x = -0.14282 * r + 1.54924 * g + -0.95641 * b
y = -0.32466 * r + 1.57837 * g + -0.73191 * b
z = -0.68202 * r + 0.77073 * g + 0.56332 * b
d = x + y + z
n = (x / d - 0.3320) / (0.1858 - y / d)
cct = 449.0 * n**3 + 3525.0 * n**2 + 6823.3 * n + 5520.33
return cct, y
def threshold(self, cycles=None, min_value=None, max_value=None):
if cycles is None and min_value is None and max_value is None:
min_value = self._register16(_REGISTER_AILT)
max_value = self._register16(_REGISTER_AILT)
if self._register8(_REGISTER_ENABLE) & _ENABLE_AIEN:
cycles = _CYCLES[self._register8(_REGISTER_APERS) & 0x0f]
else:
cycles = -1
return cycles, min_value, max_value
if min_value is not None:
self._register16(_REGISTER_AILT, min_value)
if max_value is not None:
self._register16(_REGISTER_AIHT, max_value)
if cycles is not None:
enable = self._register8(_REGISTER_ENABLE)
if cycles == -1:
self._register8(_REGISTER_ENABLE, enable & ~(_ENABLE_AIEN))
else:
self._register8(_REGISTER_ENABLE, enable | _ENABLE_AIEN)
if cycles not in _CYCLES:
raise ValueError("invalid persistence cycles")
self._register8(_REGISTER_APERS, _CYCLES.index(cycles))
def interrupt(self, value=None):
if value is None:
return bool(self._register8(_REGISTER_STATUS) & _ENABLE_AIEN)
if value:
raise ValueError("interrupt can only be cleared")
self.i2c.writeto(self.address, b'\xe6')
def html_rgb(data):
r, g, b, c = data
red = pow((int((r/c) * 256) / 255), 2.5) * 255
green = pow((int((g/c) * 256) / 255), 2.5) * 255
blue = pow((int((b/c) * 256) / 255), 2.5) * 255
return red, green, blue
def html_hex(data):
r, g, b = html_rgb(data)
return "{0:02x}{1:02x}{2:02x}".format(int(r),
int(g),
int(b))

183
src/tsl2591.py Normal file
View File

@ -0,0 +1,183 @@
# tsl2591 lux sensor interface
import time
VISIBLE = 2
INFRARED = 1
FULLSPECTRUM = 0
ADDR = 0x29
READBIT = 0x01
COMMAND_BIT = 0xA0
CLEAR_BIT = 0x40
WORD_BIT = 0x20
BLOCK_BIT = 0x10
ENABLE_POWERON = 0x01
ENABLE_POWEROFF = 0x00
ENABLE_AEN = 0x02
ENABLE_AIEN = 0x10
CONTROL_RESET = 0x80
LUX_DF = 408.0
LUX_COEFB = 1.64
LUX_COEFC = 0.59
LUX_COEFD = 0.86
REGISTER_ENABLE = 0x00
REGISTER_CONTROL = 0x01
REGISTER_THRESHHOLDL_LOW = 0x02
REGISTER_THRESHHOLDL_HIGH = 0x03
REGISTER_THRESHHOLDH_LOW = 0x04
REGISTER_THRESHHOLDH_HIGH = 0x05
REGISTER_INTERRUPT = 0x06
REGISTER_CRC = 0x08
REGISTER_ID = 0x0A
REGISTER_CHAN0_LOW = 0x14
REGISTER_CHAN0_HIGH = 0x15
REGISTER_CHAN1_LOW = 0x16
REGISTER_CHAN1_HIGH = 0x17
INTEGRATIONTIME_100MS = 0x00
INTEGRATIONTIME_200MS = 0x01
INTEGRATIONTIME_300MS = 0x02
INTEGRATIONTIME_400MS = 0x03
INTEGRATIONTIME_500MS = 0x04
INTEGRATIONTIME_600MS = 0x05
GAIN_LOW = 0x00
GAIN_MED = 0x10
GAIN_HIGH = 0x20
GAIN_MAX = 0x30
def _bytes_to_int(data):
return data[0] + (data[1]<<8)
from machine import I2C, Pin
class SMBusEmulator:
__slots__ = ('i2c',)
def __init__(self, scl_pinno=32, sda_pinno=33):
self.i2c = I2C(scl=Pin(scl_pinno, Pin.IN),
sda=Pin(sda_pinno, Pin.IN))
def write_byte_data(self, addr, cmd, val):
buf = bytes([cmd, val])
self.i2c.writeto(addr, buf)
def read_word_data(self, addr, cmd):
assert cmd < 256
buf = bytes([cmd])
self.i2c.writeto(addr, buf)
data = self.i2c.readfrom(addr, 4)
return _bytes_to_int(data)
SENSOR_ADDRESS=0x29
class Tsl2591:
def __init__(
self,
sensor_id,
integration=INTEGRATIONTIME_100MS,
gain=GAIN_LOW
):
self.sensor_id = sensor_id
self.bus = SMBusEmulator()
self.integration_time = integration
self.gain = gain
self.set_timing(self.integration_time)
self.set_gain(self.gain)
self.disable()
def set_timing(self, integration):
self.enable()
self.integration_time = integration
self.bus.write_byte_data(
SENSOR_ADDRESS,
COMMAND_BIT | REGISTER_CONTROL,
self.integration_time | self.gain
)
self.disable()
def set_gain(self, gain):
self.enable()
self.gain = gain
self.bus.write_byte_data(
SENSOR_ADDRESS,
COMMAND_BIT | REGISTER_CONTROL,
self.integration_time | self.gain
)
self.disable()
def calculate_lux(self, full, ir):
if (full == 0xFFFF) | (ir == 0xFFFF):
return 0
case_integ = {
INTEGRATIONTIME_100MS: 100.,
INTEGRATIONTIME_200MS: 200.,
INTEGRATIONTIME_300MS: 300.,
INTEGRATIONTIME_400MS: 400.,
INTEGRATIONTIME_500MS: 500.,
INTEGRATIONTIME_600MS: 600.,
}
if self.integration_time in case_integ.keys():
atime = case_integ[self.integration_time]
else:
atime = 100.
case_gain = {
GAIN_LOW: 1.,
GAIN_MED: 25.,
GAIN_HIGH: 428.,
GAIN_MAX: 9876.,
}
if self.gain in case_gain.keys():
again = case_gain[self.gain]
else:
again = 1.
cpl = (atime * again) / LUX_DF
lux1 = (full - (LUX_COEFB * ir)) / cpl
lux2 = ((LUX_COEFC * full) - (LUX_COEFD * ir)) / cpl
return max([lux1, lux2])
def enable(self):
self.bus.write_byte_data(
SENSOR_ADDRESS,
COMMAND_BIT | REGISTER_ENABLE,
ENABLE_POWERON | ENABLE_AEN | ENABLE_AIEN
)
def disable(self):
self.bus.write_byte_data(
SENSOR_ADDRESS,
COMMAND_BIT | REGISTER_ENABLE,
ENABLE_POWEROFF
)
def get_full_luminosity(self):
self.enable()
time.sleep(0.120*self.integration_time+1)
full = self.bus.read_word_data(
SENSOR_ADDRESS, COMMAND_BIT | REGISTER_CHAN0_LOW
)
ir = self.bus.read_word_data(
SENSOR_ADDRESS, COMMAND_BIT | REGISTER_CHAN1_LOW
)
self.disable()
return full, ir
def get_luminosity(self, channel):
full, ir = self.get_full_luminosity()
if channel == FULLSPECTRUM:
return full
elif channel == INFRARED:
return ir
elif channel == VISIBLE:
return full - ir
else:
return 0
def sample(self):
full, ir = self.get_full_luminosity()
return self.calculate_lux(full, ir)