Air quality station
Air quality in the country where I live is among the worst in Europe. It is not so bad in my city, but it is still good to be aware of it because of the effect it has on human health. There are multiple places where you can check air quality. Probably the most accessible is one of the weather apps you can install on your phone, but they can be inaccurate because of where the sensors are placed. If you want the most accurate measurement, you can buy an air quality monitor or try to build one. If you want to build it yourself, here is what you need:
- air quality sensor
- Raspberry Pi with a WiFi module
- prior Elixir and Phoenix knowledge
Architecture
Our solution consists of two parts: a Nerves application that reads air quality information and sends it to a Phoenix application, which stores the measurements and displays them on a dashboard.
Communication between the device and the web app goes over UDP. The server broadcasts a ping message every second on a given subnet, and every device listening on it can reply. That way we only get information from sensors when we need it, and we avoid being flooded with messages from devices.
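To make the ping/reply exchange concrete, here is a minimal loopback sketch that uses only :gen_udp from the standard library. It is not the project's code: it uses 127.0.0.1 instead of a broadcast address, lets the OS pick the ports, and sends a made-up plain-text payload where the real application sends JSON.

```elixir
# Loopback sketch of the ping/reply exchange (assumptions: localhost instead of
# broadcast, OS-assigned ports, placeholder payload instead of JSON).
{:ok, device} = :gen_udp.open(0, [:binary, active: false])
{:ok, server} = :gen_udp.open(0, [:binary, active: false])
{:ok, device_port} = :inet.port(device)

# Server side: ask the device for data.
:ok = :gen_udp.send(server, {127, 0, 0, 1}, device_port, "ping")

# Device side: wait for the ping and answer whoever sent it.
{:ok, {address, port, "ping"}} = :gen_udp.recv(device, 0, 1_000)
:ok = :gen_udp.send(device, address, port, "pm25=12.3;pm10=20.1")

# Server side: read the reply.
{:ok, {_address, _port, reply}} = :gen_udp.recv(server, 0, 1_000)
IO.puts(reply)

:gen_udp.close(device)
:gen_udp.close(server)
```

The device never needs to know the server's address in advance; it simply answers the address and port the ping arrived from.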
Device
We are going to use Nerves to manage the air quality sensor. On the Nerves site you can read the most up-to-date information on how to create a new project and deploy new code to a device (in my case an old Raspberry Pi).
We have two main components:
UdpClient - receives the ping message from the server and responds with data from the AirQuality GenServer. Each message is a JSON-encoded map that consists of device_id, which should be unique, and probe, which is the latest air quality sensor measurement. The message is sent back to the same address and port from which we received the ping.
defmodule Hermus.UdpClient do
  use GenServer

  alias Hermus.AirQuality

  def start_link(port \\ 8681) do
    GenServer.start_link(__MODULE__, port)
  end

  @impl true
  def init(port) do
    # :gen_udp.open/2 returns {:ok, socket}, so the socket becomes the state.
    :gen_udp.open(port, [:binary, active: true, broadcast: true])
  end

  @impl true
  def handle_info({:udp, socket, address, port, "ping"}, state) do
    probe = AirQuality.last_probe()

    # Reply to the address and port the ping came from.
    :gen_udp.send(
      socket,
      address,
      port,
      Jason.encode!(%{device_id: Application.fetch_env!(:hermus, :id), probe: probe})
    )

    {:noreply, state}
  end
end
AirQuality - a GenServer that stores the last measurement from the air quality sensor. It uses the Circuits.UART library to get information from the sensor. Each measurement arrives as a message that we handle in handle_info. To decode the binary data we use the SDS011 helper module, which is taken from exodust. This module decodes the binary data and returns a probe structure that consists of pm25 and pm10 measurements. You can read more about it here.
defmodule Hermus.AirQuality do
  use GenServer

  alias Hermus.SDS011

  def start_link(_args) do
    GenServer.start_link(__MODULE__, %{last_probe: nil}, name: __MODULE__)
  end

  def last_probe() do
    GenServer.call(__MODULE__, :last_probe)
  end

  @impl true
  def init(state) do
    {:ok, pid} = Circuits.UART.start_link()
    Circuits.UART.open(pid, port(), active: true)
    {:ok, state}
  end

  @impl true
  def handle_call(:last_probe, _from, state) do
    {:reply, state.last_probe, state}
  end

  @impl true
  def handle_info({:circuits_uart, _usb, message}, state) do
    probe = SDS011.decode!(message)
    state = %{state | last_probe: probe}
    {:noreply, state}
  end

  defp port() do
    Circuits.UART.enumerate()
    |> Enum.find(fn {_key, value} -> value != %{} end)
    |> then(fn
      {port, _} -> port
      _ -> nil
    end)
  end
end
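The SDS011 helper is not shown here, so the following is only a sketch of what such a decoder might look like, not the exodust code. It assumes the 10-byte frame layout from the SDS011 data sheet (header 0xAA, command 0xC0, two little-endian 16-bit values in tenths of µg/m³, two ID bytes, a checksum over the six data bytes, tail 0xAB). The module name SDS011Sketch, the nested Probe struct, and the tagged-tuple return value are all made up for illustration.

```elixir
defmodule SDS011Sketch do
  # Hypothetical stand-in for the SDS011 helper module.
  defmodule Probe do
    defstruct [:pm25, :pm10]
  end

  # Frame layout: 0xAA 0xC0 pm25_low pm25_high pm10_low pm10_high id1 id2 checksum 0xAB
  def decode(<<0xAA, 0xC0, pm25_low, pm25_high, pm10_low, pm10_high, id1, id2, checksum, 0xAB>>) do
    # The checksum is the low byte of the sum of the six data bytes.
    sum = pm25_low + pm25_high + pm10_low + pm10_high + id1 + id2

    if rem(sum, 256) == checksum do
      # Values are transmitted in tenths of a microgram per cubic metre.
      {:ok,
       %Probe{
         pm25: (pm25_high * 256 + pm25_low) / 10,
         pm10: (pm10_high * 256 + pm10_low) / 10
       }}
    else
      {:error, :bad_checksum}
    end
  end

  def decode(_other), do: {:error, :bad_frame}
end

# A hand-built frame: PM2.5 = 12.3, PM10 = 20.1, checksum 0x8A.
frame = <<0xAA, 0xC0, 0x7B, 0x00, 0xC9, 0x00, 0x12, 0x34, 0x8A, 0xAB>>
{:ok, probe} = SDS011Sketch.decode(frame)
IO.inspect(probe)
```

Returning a tagged tuple instead of raising lets the caller ignore a corrupted frame and keep the previous measurement.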
Phoenix App
This application has two responsibilities:
- Gather measurements from sensors.
- Display data on a LiveView dashboard.
defmodule Hermus.UdpServer do
  use GenServer

  alias Hermus.DevicesRegistry
  alias Hermus.DevicesSupervisor
  alias Hermus.Device

  def start_link(port \\ 8680) do
    GenServer.start_link(__MODULE__, port)
  end

  @impl true
  def init(port) do
    {:ok, network_info} = :inet.getif()
    {_ip_address, broadcast_address, _} = find_local(network_info)
    {:ok, socket} = :gen_udp.open(port, [:binary, active: true, broadcast: true])
    {:ok, %{socket: socket, broadcast_address: broadcast_address}, {:continue, :init}}
  end

  @impl true
  def handle_continue(:init, state) do
    send(self(), :ask_for_data)
    {:noreply, state}
  end

  @impl true
  def handle_info(
        :ask_for_data,
        %{socket: socket, broadcast_address: broadcast_address} = state
      ) do
    :gen_udp.send(socket, broadcast_address, 8681, "ping")
    Process.send_after(self(), :ask_for_data, 1_000)
    {:noreply, state}
  end

  def handle_info({:udp, _socket, _address, _port, data}, state) do
    %{"device_id" => device_id, "probe" => probe} = Jason.decode!(data)

    if !DevicesRegistry.exist?(device_id) do
      DevicesSupervisor.start_child(device_id)
    end

    Device.add_probe(probe, device_id)
    {:noreply, state}
  end

  defp find_local(networks) do
    Enum.find(networks, fn {_, broadcast_address, _} -> broadcast_address != :undefined end)
  end
end
For each device we dynamically create a GenServer that handles its incoming messages. We use DevicesRegistry to check whether a GenServer for the given device has already been created. If the current message is the first one for a given device, we create a new GenServer. Either way, we then add the measurement (probe) via Device, which saves the data to the database and broadcasts it to the LiveView via Phoenix.PubSub. The reason for having a separate GenServer for each device is to reduce the bottleneck in UdpServer. In the current approach UdpServer acts as a message broker that routes data to the right GenServer.
defmodule Hermus.Device do
  use GenServer

  alias Hermus.Devices
  alias Hermus.DevicesRegistry

  def start_link(device_identifier) do
    GenServer.start_link(__MODULE__, device_identifier,
      name: DevicesRegistry.via_tuple(device_identifier)
    )
  end

  # A device that has not read its sensor yet replies with a nil probe.
  # Skip it instead of crashing the caller with a FunctionClauseError.
  def add_probe(nil, _device_identifier), do: :ok

  def add_probe(probe, device_identifier) do
    device_identifier
    |> DevicesRegistry.via_tuple()
    |> GenServer.cast({:add_probe, probe})
  end

  def child_spec(device_identifier) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [device_identifier]},
      restart: :transient
    }
  end

  @impl true
  def init(device_identifier) do
    %{id: device_id} = Devices.add_device(device_identifier)
    {:ok, device_id}
  end

  @impl true
  def handle_cast({:add_probe, probe}, device_id) do
    # Persist the probe, then notify every subscribed LiveView.
    probe = Devices.add_probe(device_id, probe)
    Phoenix.PubSub.broadcast(Hermus.PubSub, "probe", probe)
    {:noreply, device_id}
  end
end
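DevicesRegistry and DevicesSupervisor are referenced above but not listed. One plausible way to build them, shown here only as a sketch, is a unique-key Registry plus a DynamicSupervisor. Everything in the Sketch namespace is hypothetical, and an Agent stands in for the real Device GenServer so the example runs on its own; the actual modules in the repository may be structured differently.

```elixir
defmodule Sketch.DevicesRegistry do
  # Hypothetical wrapper around Elixir's Registry with unique keys.
  def child_spec(_opts), do: Registry.child_spec(keys: :unique, name: __MODULE__)

  def via_tuple(device_id), do: {:via, Registry, {__MODULE__, device_id}}

  def exist?(device_id), do: Registry.lookup(__MODULE__, device_id) != []
end

defmodule Sketch.Device do
  # Stand-in for the per-device process; it only holds a list of probes.
  use Agent

  def start_link(device_id) do
    Agent.start_link(fn -> [] end, name: Sketch.DevicesRegistry.via_tuple(device_id))
  end
end

defmodule Sketch.DevicesSupervisor do
  use DynamicSupervisor

  def start_link(opts), do: DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)

  # Starts one process per device, registered under its id.
  def start_child(device_id) do
    DynamicSupervisor.start_child(__MODULE__, {Sketch.Device, device_id})
  end

  @impl true
  def init(_opts), do: DynamicSupervisor.init(strategy: :one_for_one)
end

{:ok, _sup} =
  Supervisor.start_link([Sketch.DevicesRegistry, Sketch.DevicesSupervisor],
    strategy: :one_for_one
  )

before_start = Sketch.DevicesRegistry.exist?("living-room")
{:ok, _pid} = Sketch.DevicesSupervisor.start_child("living-room")
after_start = Sketch.DevicesRegistry.exist?("living-room")
IO.inspect({before_start, after_start})
```

Because the registry keys are unique, a second start_child call for the same id returns an {:error, {:already_started, pid}} tuple rather than creating a duplicate process.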
When we mount the LiveView we fetch the latest measurements and display them using Chart.js. We also subscribe to the probe topic, on which we receive real-time measurements from devices. On each new message we update the assigns and push a probe event to a JS hook, which updates the graph with the latest measurements.
defmodule HermusWeb.ProbeLive.Index do
  use HermusWeb, :live_view

  alias Hermus.Devices
  alias Hermus.Models

  @probes_limit 20

  @impl true
  def mount(_params, _session, socket) do
    if connected?(socket) do
      Phoenix.PubSub.subscribe(Hermus.PubSub, "probe")
    end

    {:ok, assign(socket, :devices, list_devices())}
  end

  @impl true
  def handle_info(%Models.Probe{} = probe, socket) do
    handle_new_probe(probe, socket)
  end

  defp handle_new_probe(probe, socket) do
    devices = socket.assigns.devices

    updated_devices =
      update_in(devices[probe.device_id], fn device ->
        probes =
          device.probes
          |> add(probe)
          |> window()

        %{device | probes: probes}
      end)

    socket = assign(socket, devices: updated_devices)
    {:noreply, push_event(socket, "probe", probe)}
  end

  defp add(probes, probe) do
    [probe | probes]
  end

  defp window(probes) do
    Enum.take(probes, @probes_limit)
  end

  defp list_devices() do
    @probes_limit
    |> Devices.list()
    |> Enum.into(%{}, fn device ->
      {device.id, device}
    end)
  end
end
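The add/window pair above keeps a sliding window of the newest probes per device. The sketch below isolates that idea with plain maps instead of Ecto structs and a limit of 3 instead of 20. It also assumes a device might send its first probe after the LiveView has mounted and handles that with Map.update/4 and a default entry; WindowSketch and the map shapes are made up for illustration.

```elixir
defmodule WindowSketch do
  @probes_limit 3

  # Prepend the newest probe and keep only the latest @probes_limit entries.
  # A device that is not in the map yet gets a fresh entry.
  def push(devices, %{device_id: id} = probe) do
    Map.update(devices, id, %{id: id, probes: [probe]}, fn device ->
      %{device | probes: Enum.take([probe | device.probes], @probes_limit)}
    end)
  end
end

devices = %{1 => %{id: 1, probes: []}}

devices =
  Enum.reduce(1..5, devices, fn n, acc ->
    WindowSketch.push(acc, %{device_id: 1, pm25: n})
  end)

# Newest first, oldest two dropped.
latest = Enum.map(devices[1].probes, & &1.pm25)
IO.inspect(latest)
# => [5, 4, 3]

# A probe from a device that was not known before is simply added.
devices = WindowSketch.push(devices, %{device_id: 2, pm25: 9})
```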
You can view full code at: https://github.com/elpikel/hermus