dht22mqtt.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. #!/usr/bin/python3
  2. from datetime import datetime
  3. import time
  4. import os
  5. import statistics
  6. import csv
  7. import adafruit_dht
  8. # import RPi.GPIO as GPIO
  9. from gpiomapping import gpiomapping
  10. import paho.mqtt.client as mqtt
  11. # Begin
  12. dht22mqtt_start_ts = datetime.now()
  13. ###############
  14. # MQTT Params
  15. ###############
  16. mqtt_topic = os.getenv('topic', 'zigbee2mqtt/')
  17. mqtt_device_id = os.getenv('device_id', 'dht22')
  18. mqtt_brokeraddr = os.getenv('broker', '192.168.1.10')
  19. mqtt_username = os.getenv('username', None)
  20. mqtt_password = os.getenv('password', None)
  21. if not mqtt_topic.endswith('/'):
  22. mqtt_topic = mqtt_topic + "/"
  23. mqtt_topic = mqtt_topic + mqtt_device_id + '/'
  24. ###############
  25. # GPIO params
  26. ###############
  27. # TODO check if we can use the GPIO test https://github.com/kgbplus/gpiotest to autodetect pin
  28. # Problems with multiple sensors on the same device
  29. dht22mqtt_refresh = int(os.getenv('poll', '2'))
  30. dht22mqtt_pin = int(os.getenv('pin', '4'))
  31. dht22mqtt_device_type = str(os.getenv('device_type', 'dht22')).lower()
  32. dht22mqtt_temp_unit = os.getenv('unit', 'C')
  33. ###############
  34. # MQTT & Logging params
  35. ###############
  36. dht22mqtt_mqtt_chatter = str(os.getenv('mqtt_chatter', 'essential|ha|full')).lower()
  37. dht22mqtt_logging_mode = str(os.getenv('logging', 'None')).lower()
  38. dht22mqtt_sensor_tally = dict()
  39. ###############
  40. # Filtering & Sampling Params
  41. ###############
  42. dht22mqtt_filtering_enabled = str(os.getenv('filtering', 'enabled')).lower()
  43. dht22_temp_stack = []
  44. dht22_temp_stack_errors = 0
  45. dht22_hum_stack = []
  46. dht22_hum_stack_errors = 0
  47. dht22_stack_size = 10
  48. dht22_std_deviation = 3
  49. dht22_error_count_stack_flush = 3
  50. ###############
  51. # Logging functions
  52. ###############
  53. def log2file(filename, params):
  54. if('log2file' in dht22mqtt_logging_mode):
  55. ts_filename = dht22mqtt_start_ts.strftime('%Y-%m-%dT%H-%M-%SZ')+'_'+filename+".csv"
  56. with open("/log/"+ts_filename, "a+") as file:
  57. w = csv.DictWriter(file, delimiter=',', lineterminator='\n', fieldnames=params.keys())
  58. if file.tell() == 0:
  59. w.writeheader()
  60. w.writerow(params)
  61. def log2stdout(timestamp, msg):
  62. if('log2stdout' in dht22mqtt_logging_mode):
  63. print(datetime.fromtimestamp(timestamp).strftime('%Y-%m-%dT%H:%M:%SZ'), str(msg))
  64. ###############
  65. # Polling functions
  66. ###############
  67. def getTemperatureJitter(temperature):
  68. return getTemperature(temperature-0.3), getTemperature(temperature+0.3)
  69. def getTemperature(temperature):
  70. if(dht22mqtt_temp_unit == 'F'):
  71. temperature = temperature * (9 / 5) + 32
  72. return temperature
  73. def getHumidity(humidity):
  74. return humidity
  75. ###############
  76. # Processing function
  77. ###############
  78. def processSensorValue(stack, error, value, value_type):
  79. # flush stack on accumulation of errors
  80. if(error >= dht22_error_count_stack_flush):
  81. stack = []
  82. error = 0
  83. # init stack
  84. if(len(stack) <= dht22_error_count_stack_flush):
  85. if(value not in stack):
  86. stack.append(value)
  87. # use jitter for bootstrap temperature stack
  88. if(value_type == 'temperature'):
  89. low, high = getTemperatureJitter(value)
  90. stack.append(low)
  91. stack.append(high)
  92. return stack, error, None
  93. # get statistics
  94. std = statistics.pstdev(stack)
  95. mean = statistics.mean(stack)
  96. # compute if outlier or not
  97. if(mean-std*dht22_std_deviation < value < mean+std*dht22_std_deviation):
  98. outlier = False
  99. if(value not in stack):
  100. stack.append(value)
  101. error = 0
  102. else:
  103. outlier = True
  104. error += 1
  105. # remove last element from stack
  106. if(len(stack) > 10):
  107. stack.pop(0)
  108. return stack, error, outlier
  109. ###############
  110. # MQTT update functions
  111. ###############
  112. def updateEssentialMqtt(temperature, humidity, detected):
  113. if('essential' in dht22mqtt_mqtt_chatter):
  114. if(detected == 'accurate'):
  115. payload = '{ "temperature": '+str(temperature)+', "humidity": '+str(humidity)+' }'
  116. client.publish(mqtt_topic + 'value', payload, qos=1, retain=True)
  117. client.publish(mqtt_topic + "detected", str(detected), qos=1, retain=True)
  118. elif(detected == 'bypass'):
  119. payload = '{ "temperature": '+str(temperature)+', "humidity": '+str(humidity)+' }'
  120. client.publish(mqtt_topic + 'value', payload, qos=1, retain=True)
  121. client.publish(mqtt_topic + "detected", str(detected), qos=1, retain=True)
  122. else:
  123. client.publish(mqtt_topic + "detected", str(detected), qos=1, retain=True)
  124. client.publish(mqtt_topic + "updated", str(datetime.now()), qos=1, retain=True)
  125. def registerWithHomeAssitant():
  126. if('ha' in dht22mqtt_mqtt_chatter):
  127. ha_temperature_config = '{"device_class": "temperature",' + \
  128. ' "name": "'+mqtt_device_id+'_temperature",' + \
  129. ' "state_topic": "'+mqtt_topic+'value",' + \
  130. ' "unit_of_measurement": "°'+dht22mqtt_temp_unit+'",' + \
  131. ' "value_template": "{{ value_json.temperature}}" }'
  132. ha_humidity_config = '{"device_class": "humidity",' + \
  133. ' "name": "'+mqtt_device_id+'_humidity",' + \
  134. ' "state_topic": "'+mqtt_topic+'value",' + \
  135. ' "unit_of_measurement": "%",' + \
  136. ' "value_template": "{{ value_json.humidity}}" }'
  137. client.publish('homeassistant/sensor/'+mqtt_device_id+'Temperature/config', ha_temperature_config, qos=1, retain=True)
  138. client.publish('homeassistant/sensor/'+mqtt_device_id+'Humidity/config', ha_humidity_config, qos=1, retain=True)
  139. log2stdout(datetime.now().timestamp(), 'Registering sensor with home assistant success...')
  140. def updateFullSysInternalsMqtt(key):
  141. if('full' in dht22mqtt_mqtt_chatter):
  142. client.publish(mqtt_topic + "sys/temperature_stack_size", len(dht22_temp_stack), qos=1, retain=True)
  143. client.publish(mqtt_topic + "sys/temperature_error_count", dht22_temp_stack_errors, qos=1, retain=True)
  144. client.publish(mqtt_topic + "sys/humidity_stack_size", len(dht22_hum_stack), qos=1, retain=True)
  145. client.publish(mqtt_topic + "sys/humidity_error_count", dht22_hum_stack_errors, qos=1, retain=True)
  146. client.publish(mqtt_topic + "updated", str(datetime.now()), qos=1, retain=True)
  147. if key in dht22mqtt_sensor_tally:
  148. dht22mqtt_sensor_tally[key] += 1
  149. else:
  150. dht22mqtt_sensor_tally[key] = 1
  151. client.publish(mqtt_topic + "sys/tally/" + key, dht22mqtt_sensor_tally[key], qos=1, retain=True)
  152. client.publish(mqtt_topic + "updated", str(datetime.now()), qos=1, retain=True)
  153. ###############
  154. # Setup dht22 sensor
  155. ###############
  156. log2stdout(dht22mqtt_start_ts.timestamp(), 'Starting dht22mqtt...')
  157. if(dht22mqtt_device_type == 'dht22' or dht22mqtt_device_type == 'am2302'):
  158. dhtDevice = adafruit_dht.DHT22(gpiomapping[dht22mqtt_pin], use_pulseio=False)
  159. elif(dht22mqtt_device_type == 'dht11'):
  160. dhtDevice = adafruit_dht.DHT11(gpiomapping[dht22mqtt_pin], use_pulseio=False)
  161. else:
  162. log2stdout(datetime.now().timestamp(), 'Unsupported device '+dht22mqtt_device_type+'...')
  163. log2stdout(datetime.now().timestamp(), 'Devices supported by this container are DHT11/DHT22/AM2302')
  164. log2stdout(datetime.now().timestamp(), 'Setup dht22 sensor success...')
  165. ###############
  166. # Setup mqtt client
  167. ###############
  168. if('essential' in dht22mqtt_mqtt_chatter):
  169. client = mqtt.Client('DHT22', clean_session=True, userdata=None)
  170. if mqtt_username:
  171. client.username_pw_set(username=mqtt_username, password=mqtt_password)
  172. # set last will for an ungraceful exit
  173. client.will_set(mqtt_topic + "state", "OFFLINE", qos=1, retain=True)
  174. # keep alive for 60 times the refresh rate
  175. client.connect(mqtt_brokeraddr, keepalive=dht22mqtt_refresh*60)
  176. client.loop_start()
  177. client.publish(mqtt_topic + "type", "sensor", qos=1, retain=True)
  178. client.publish(mqtt_topic + "device", "dht22", qos=1, retain=True)
  179. if('full' in dht22mqtt_mqtt_chatter):
  180. client.publish(mqtt_topic + "env/pin", dht22mqtt_pin, qos=1, retain=True)
  181. client.publish(mqtt_topic + "env/brokeraddr", mqtt_brokeraddr, qos=1, retain=True)
  182. client.publish(mqtt_topic + "env/username", mqtt_username, qos=1, retain=True)
  183. client.publish(mqtt_topic + "env/refresh", dht22mqtt_refresh, qos=1, retain=True)
  184. client.publish(mqtt_topic + "env/logging", dht22mqtt_logging_mode, qos=1, retain=True)
  185. client.publish(mqtt_topic + "env/mqtt_chatter", dht22mqtt_mqtt_chatter, qos=1, retain=True)
  186. client.publish(mqtt_topic + "sys/dht22_stack_size", dht22_stack_size, qos=1, retain=True)
  187. client.publish(mqtt_topic + "sys/dht22_std_deviation", dht22_std_deviation, qos=1, retain=True)
  188. client.publish(mqtt_topic + "sys/dht22_error_count_stack_flush", dht22_error_count_stack_flush, qos=1, retain=True)
  189. client.publish(mqtt_topic + "updated", str(datetime.now()), qos=1, retain=True)
  190. log2stdout(datetime.now().timestamp(), 'Setup mqtt client success...')
  191. client.publish(mqtt_topic + "state", "ONLINE", qos=1, retain=True)
  192. registerWithHomeAssitant()
  193. log2stdout(datetime.now().timestamp(), 'Begin capture...')
  194. while True:
  195. try:
  196. dht22_ts = datetime.now().timestamp()
  197. temperature = getTemperature(dhtDevice.temperature)
  198. humidity = getHumidity(dhtDevice.humidity)
  199. temp_data = processSensorValue(dht22_temp_stack,
  200. dht22_temp_stack_errors,
  201. temperature,
  202. 'temperature')
  203. dht22_temp_stack = temp_data[0]
  204. dht22_temp_stack_errors = temp_data[1]
  205. temperature_outlier = temp_data[2]
  206. hum_data = processSensorValue(dht22_hum_stack,
  207. dht22_hum_stack_errors,
  208. humidity,
  209. 'humidity')
  210. dht22_hum_stack = hum_data[0]
  211. dht22_hum_stack_errors = hum_data[1]
  212. humidity_outlier = hum_data[2]
  213. # Since the intuition here is that errors in humidity and temperature readings
  214. # are heavily correlated, we can skip mqtt if we detect either.
  215. detected = ''
  216. if(temperature_outlier is False and humidity_outlier is False):
  217. detected = 'accurate'
  218. else:
  219. detected = 'outlier'
  220. # Check if filtering enabled
  221. if('enabled' in dht22mqtt_filtering_enabled):
  222. updateEssentialMqtt(temperature, humidity, detected)
  223. else:
  224. updateEssentialMqtt(temperature, humidity, 'bypass')
  225. updateFullSysInternalsMqtt(detected)
  226. data = {'timestamp': dht22_ts,
  227. 'temperature': temperature,
  228. 'humidity': humidity,
  229. 'temperature_outlier': temperature_outlier,
  230. 'humidity_outlier': humidity_outlier}
  231. log2stdout(dht22_ts, data)
  232. log2file('recording', data)
  233. time.sleep(dht22mqtt_refresh)
  234. except RuntimeError as error:
  235. # DHT22 throws errors often. Keep reading.
  236. detected = 'error'
  237. updateEssentialMqtt(None, None, detected)
  238. updateFullSysInternalsMqtt(error.args[0])
  239. data = {'timestamp': dht22_ts, 'error_type': error.args[0]}
  240. log2stdout(dht22_ts, data)
  241. log2file('error', data)
  242. time.sleep(dht22mqtt_refresh)
  243. continue
  244. except Exception as error:
  245. if('essential' in dht22mqtt_mqtt_chatter):
  246. client.disconnect()
  247. dhtDevice.exit()
  248. raise error
  249. # Graceful exit
  250. if('essential' in dht22mqtt_mqtt_chatter):
  251. client.publish(mqtt_topic + "state", "OFFLINE", qos=2, retain=True)
  252. client.publish(mqtt_topic + "updated", str(datetime.now()), qos=2, retain=True)
  253. client.disconnect()
  254. dhtDevice.exit()