logo

cmdarek

05-08-2022

Air quality station

Air quality in the country where i live is one of the worst in Europe. It is not so bad in the city where I live but it is good to be aware of if because of effects it has on human health. There are multiple places where you can check air quality probably the most accessible is one of the weather apps you can install on your phone, but they can be inaccurate because of placement of sensors. If you want to have the most accurate measurement you can buy one of the air quality measurements monitors or you can try to build one. If you want to try to do if by yourself, here is what you need:

Rasperry Pi with WiFi module and Air Quality sensor

Architecture

Our solution is going to consist of two parts. Nerves application that gets information about air quality and send it to Phoenix application which stores measurements and display them on dashboard.

Communication between device and web app is done through UDP. Server broadcasts ping message every second on a given subnet and all devices listening on it can reply. That way we only get information from sensors when we need them and we avoid situation where we can be flooded with messages from devices.

Device

We are going to use Nerves to manage Air Quality sensor. On the site you can read the most up to date information on how to create new project and deploy new code to device (in my case it is old raspberry pi). We have two main components:

UdpClient - receives ping message from server and responds with data from Air Quality GenServer. Each message is json encoded map that consist of device_id which should be unique and probe which is latest air quality sensor measurement. Message is send back on the same address and port on which we received ping message.

                    
defmodule Hermus.UdpClient do
  use GenServer

  alias Hermus.AirQuality

  def start_link(port \\ 8681) do
    GenServer.start_link(__MODULE__, port)
  end

  def init(port) do
    :gen_udp.open(port, [:binary, active: true, broadcast: true])
  end

  def handle_info({:udp, socket, address, port, "ping"}, state) do
    probe = AirQuality.last_probe()

    :gen_udp.send(
      socket,
      address,
      port,
      Jason.encode!(%{device_id: Application.fetch_env!(:hermus, :id), probe: probe})
    )

    {:noreply, state}
  end
end
                    
                    
AirQuality - GenServer which stores last measurement from Air Quality Sensor. It uses Circuits.UART library to get information from the sensor. Each measurements comes in message that we have to handle using handle_info function. To decode binary data we use SDS011 helper module - which is taken from: exodust. This module decodes binary data and return probe structure which consists of pm25 and pm10 measurements. You can read more about it here.
                    
defmodule Hermus.AirQuality do
  use GenServer

  alias Hermus.SDS011

  def start_link(_args) do
    GenServer.start_link(__MODULE__, %{last_probe: nil}, name: __MODULE__)
  end

  def last_probe() do
    GenServer.call(__MODULE__, :last_probe)
  end

  @impl true
  def init(state) do
    {:ok, pid} = Circuits.UART.start_link()

    Circuits.UART.open(pid, port(), active: true)

    {:ok, state}
  end

  @impl true
  def handle_call(:last_probe, _from, state) do
    {:reply, state.last_probe, state}
  end

  @impl true
  def handle_info({:circuits_uart, _usb, message}, state) do
    probe = SDS011.decode!(message)

    state = %{state | last_probe: probe}

    {:noreply, state}
  end

  defp port() do
    Circuits.UART.enumerate()
    |> Enum.find(fn {_key, value} -> value != %{} end)
    |> then(fn
      {port, _} -> port
      _ -> nil
    end)
  end
end
                    
                    

Phoenix App

This application has two responsibilities:
  • Gather measurements from sensors.
  • Display data on live view dashboard.
UdpServer - broadcasts ping message on given subnet and port every second. For each ping message, every device responds with latest measurement.
                    
defmodule Hermus.UdpServer do
  use GenServer

  alias Hermus.DevicesRegistry
  alias Hermus.DevicesSupervisor
  alias Hermus.Device

  def start_link(port \\ 8680) do
    GenServer.start_link(__MODULE__, port)
  end

  def init(port) do
    {:ok, network_info} = :inet.getif()
    {_ip_address, broadcast_address, _} = find_local(network_info)

    {:ok, socket} = :gen_udp.open(port, [:binary, active: true, broadcast: true])

    {:ok, %{socket: socket, broadcast_address: broadcast_address}, {:continue, :init}}
  end

  def handle_continue(:init, state) do
    send(self(), :ask_for_data)

    {:noreply, state}
  end

  def handle_info(
        :ask_for_data,
        %{socket: socket, broadcast_address: broadcast_address} = state
      ) do
    :gen_udp.send(socket, broadcast_address, 8681, "ping")

    Process.send_after(self(), :ask_for_data, 1_000)

    {:noreply, state}
  end

  def handle_info({:udp, _socket, _address, _port, data}, state) do
    %{"device_id" => device_id, "probe" => probe} = Jason.decode!(data)

    if !DevicesRegistry.exist?(device_id) do
      DevicesSupervisor.start_child(device_id)
    end

    Device.add_probe(probe, device_id)

    {:noreply, state}
  end

  defp find_local(networks) do
    Enum.find(networks, fn {_, broadcast_address, _} -> broadcast_address != :undefined end)
  end
end
                    
                    
For each device we create dynamically GenServer that handles incoming message. We use DevicesRegistry to check if GenServer for given device was already created. If current message is first for given device then we create new GenServer. Otherwise we add measurement(probe) via Device which saves data to db and broadcasts data to LiveView via Phoenix.PubSub. The reason for having separate GenServer for each device is to reduce bottleneck that we have in UdpServer. In current approach UdpServer acts as message broker which routes data to specific GenServer.
                    
defmodule Hermus.Device do
  use GenServer

  alias Hermus.Devices
  alias Hermus.DevicesRegistry

  def start_link(device_identifier) do
    GenServer.start_link(__MODULE__, device_identifier,
      name: DevicesRegistry.via_tuple(device_identifier)
    )
  end

  def add_probe(probe, device_identifier) when not is_nil(probe) do
    device_identifier
    |> DevicesRegistry.via_tuple()
    |> GenServer.cast({:add_probe, probe})
  end

  def child_spec(device_identifier) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [device_identifier]},
      restart: :transient
    }
  end

  @impl true
  def init(device_identifier) do
    %{id: device_id} = Devices.add_device(device_identifier)

    {:ok, device_id}
  end

  @impl true
  def handle_cast({:add_probe, probe}, device_id) do
    probe = Devices.add_probe(device_id, probe)

    Phoenix.PubSub.broadcast(Hermus.PubSub, "probe", probe)

    {:noreply, device_id}
  end
end
                    
                    
When we mount LiveView we get latest measurements and display them using chart js.. We also subscribe to probe topic on which we receive real time measurements from devices. On new message we update assigns which are picked up by js hook which updates graph with latest measurements.
                    
defmodule HermusWeb.ProbeLive.Index do
  use HermusWeb, :live_view

  alias Hermus.Devices
  alias Hermus.Models

  @probes_limit 20

  @impl true
  def mount(_params, _session, socket) do
    if connected?(socket) do
      Phoenix.PubSub.subscribe(Hermus.PubSub, "probe")
    end

    {:ok, assign(socket, :devices, list_devices())}
  end

  @impl true
  def handle_info(%Models.Probe{} = probe, socket) do
    handle_new_probe(probe, socket)
  end

  defp handle_new_probe(probe, socket) do
    devices = socket.assigns.devices

    updated_devices =
      update_in(devices[probe.device_id], fn device ->
        probes =
          device.probes
          |> add(probe)
          |> window()

        %{device | probes: probes}
      end)

    socket = assign(socket, devices: updated_devices)

    {:noreply, push_event(socket, "probe", probe)}
  end

  defp add(probes, probe) do
    [probe | probes]
  end

  defp window(probes) do
    Enum.take(probes, @probes_limit)
  end

  defp list_devices() do
    @probes_limit
    |> Devices.list()
    |> Enum.into(%{}, fn device ->
      {device.id, device}
    end)
  end
end
                    
                    

You can view full code at: https://github.com/elpikel/hermus