logo

cmdarek

05-03-2022

Wind alert application

Last year I started learning kite surfing. In the area where I live we have rather fussy weather. Which means that wind does not not blow with proper speed and direction to fully enjoy this sport every day. To not miss any day that I can practise I prepared small application which fetches wind forecasts from open meteo and sends email notification daily.

Main entry point to application is web form where user can register for wind forecast for place that interests him. This is handled by SubscriptionsController which uses Requests.Subscription to to validate user input and only if it is valid it calls Subscriptions module to register user and subscribe to forecast for a given place.

                        
defmodule OrkanWeb.SubscriptionsController do
  use OrkanWeb, :controller

  alias Orkan.Subscriptions
  alias OrkanWeb.Requests.Subscription

  def index(conn, _params) do
    render(conn, "index.html", changeset: Subscription.changeset(%Subscription{}, %{}))
  end

  def create(conn, %{"subscription" => subscription_params}) do
    changeset = Subscription.changeset(%Subscription{}, subscription_params)

    if changeset.valid? do
      case Subscriptions.create(changeset.changes) do
        {:ok, _} ->
          conn
          |> put_flash(:info, "Succesfully subscribed.")
          |> redirect(to: Routes.subscriptions_path(conn, :index))

        {:error, error} ->
          conn
          |> put_flash(:error, error)
          |> render("index.html", changeset: changeset)
      end
    else
      render(conn, "index.html", changeset: %{changeset | action: :insert})
    end
  end
end
                        
                    
                        
defmodule OrkanWeb.Requests.Subscription do
  use Ecto.Schema
  import Ecto.Changeset
  require Decimal

  @fields [:email, :longitude, :latitude, :name]
  @mail_regex ~r/^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,4}$/

  @primary_key false
  embedded_schema do
    field :email, :string
    field :longitude, :string
    field :latitude, :string
    field :name, :string
  end

  def changeset(option, attrs) do
    option
    |> cast(attrs, @fields)
    |> validate_required(@fields)
    |> validate_format(:email, @mail_regex)
    |> validate_coordinates([:longitude, :latitude])
  end

  defp validate_coordinates(changeset, fields) do
    Enum.reduce(fields, changeset, fn field, changeset ->
      field_value = get_change(changeset, field)

      if field_value != nil && !valid_coordinate?(field_value) do
        add_error(changeset, field, "not valid coordinate")
      else
        changeset
      end
    end)
  end

  defp valid_coordinate?(coordinate) do
    case Decimal.parse(coordinate) do
      :error ->
        false

      {coordinate, _} ->
        if Decimal.compare(coordinate, 0) == :lt do
          false
        else
          true
        end
    end
  end
end
                        
                    

In the background of our application we have weather and email workers scheduled with quantum library:

                        
config :orkan, Orkan.Scheduler,
  jobs: [
    {"0 * * * *", {Orkan.Forecasts.Worker, :update_forecasts, []}},
    {"0 7 * * *", {Orkan.Notifications.Worker, :send_forecasts, []}}
  ]
                        
                    

As you can see Weather worker is scheduled to do the work every hour. It fetches data from open meteo and saves it to db. That way we are building our history of weather forecast which we could use in the future.

Notification worker is scheduled to send email notification to all subscribed users every day at 7 in the morning. It gets forecast for each places from db and uses Bambo library to generate and send email to all users subscribed for notifications for given place.

Notifications

This module is responsible for sending emails about wheather conditions. It uses Subscription module to get subscriptions and palces and Mail module to compose and send email. In production environment you should get and send data in chunks.

                        
defmodule Orkan.Notifications do
  alias Orkan.Subscriptions
  alias Orkan.Forecasts
  alias Orkan.Notifications.Mail

  def send_forecasts() do
    Enum.each(Subscriptions.users(), fn user ->
      places = Subscriptions.places(user.id)
      forecasts = Forecasts.get(places)

      Mail.send(user, forecasts)
    end)
  end
end
                        
                    

Subscriptions

This module is responsible for creating subscriptions and exposes information about places and users. What is interesting in this code is that we use on_conflict and conflict_target properties when we insert user and place. Reason for that is concurrent request for the same place by different users or for different places for the same user.
                    
defmodule Orkan.Subscriptions do
  import Ecto.Query

  alias Orkan.Repo
  alias Orkan.Subscriptions.Place
  alias Orkan.Subscriptions.Subscription
  alias Orkan.Subscriptions.User

  def places() do
    Repo.all(Place)
  end

  def places(user_id) do
    Subscription
    |> from(as: :subscription)
    |> join(:inner, [subscription: s], assoc(s, :place), as: :place)
    |> where([subscription: s], s.user_id == ^user_id)
    |> select([place: p], p)
    |> Repo.all()
  end

  def users() do
    Repo.all(User)
  end

  def get(user_id) do
    Repo.all(from s in Subscription, where: s.user_id == ^user_id)
  end

  def create(%{email: email, longitude: longitude, latitude: latitude, name: name}) do
    user = get_or_create_user(email)
    place = get_or_create_place(longitude, latitude, name)

    case Repo.insert(
           Subscription.changeset(%Subscription{}, %{
             place_id: place.id,
             user_id: user.id
           })
         ) do
      {:ok, subscription} ->
        {:ok, subscription}

      {:error, _changeset} ->
        {:error, "Already subscribed."}
    end
  end

  defp get_or_create_user(email) do
    {:ok, user} =
      %User{}
      |> User.changeset(%{email: email})
      |> Repo.insert(on_conflict: :nothing)

    user
  end

  def get_or_create_place(longitude, latitude, name) do
    {:ok, place} =
      %Place{}
      |> Place.changeset(%{longitude: longitude, latitude: latitude, name: name})
      |> Repo.insert(
        on_conflict: :nothing,
        conflict_target: [:longitude, :latitude]
      )

    place
  end
end
                    
                    

Forecasts

Main responsibility of this module is to populate forecast data. In order to to that we get data from open meteo and save it to db. Additionally this module exposes forecasts for a given place.

                    
defmodule Orkan.Forecasts do
  import Ecto.Query

  alias Orkan.Forecasts.Forecast
  alias Orkan.OpenMeteo.Client
  alias Orkan.Repo

  def get(places) do
    today = DateTime.new!(Date.utc_today(), ~T[00:00:00.000], "Etc/UTC")
    tomorrow = add_days(today, 1)
    in_two_days = add_days(tomorrow, 2)
    places_id = Enum.map(places, fn place -> place.id end)

    Forecast
    |> from(as: :forecast)
    |> where([forecast: f], f.datetime >= ^tomorrow and f.datetime < ^in_two_days)
    |> where([forecast: f], f.place_id in ^places_id)
    |> order_by([forecast: f], [f.place_id, f.datetime])
    |> select([forecast: f], {f.place_id, f.datetime, f.wind_speed, f.wind_direction})
    |> Repo.all()
    |> Enum.group_by(fn {place_id, _, _, _} -> place_id end)
    |> Enum.map(fn {place_id, forecasts} ->
      place = Enum.find(places, fn place -> place.id == place_id end)

      %{
        place: place.name,
        place_forecasts:
          Enum.map(forecasts, fn {_, datetime, wind_speed, wind_direction} ->
            %{
              datetime: datetime,
              wind_speed: wind_speed,
              wind_direction: wind_direction
            }
          end)
      }
    end)
  end

  def update_forecasts(places) do
    places
    |> Enum.map(fn place -> update_forecast(place) end)
    |> List.flatten()
  end

  defp update_forecast(place) do
    forecasts = Client.get_data(place.longitude, place.latitude)

    forecasts["hourly"]["time"]
    |> Enum.with_index()
    |> Enum.map(fn {datetime, index} ->
      datetime = format(datetime)

      forecast =
        Forecast
        |> where([f], f.place_id == ^place.id and f.datetime == ^datetime)
        |> Repo.one()

      wind_speed = to_float(Enum.at(forecasts["hourly"]["windspeed_10m"], index))
      wind_direction = Enum.at(forecasts["hourly"]["winddirection_10m"], index)

      case forecast do
        nil ->
          Repo.insert!(%Forecast{
            place_id: place.id,
            wind_direction: wind_direction,
            wind_speed: wind_speed,
            datetime: datetime
          })

        _ ->
          forecast
          |> Forecast.changeset(%{wind_speed: wind_speed, wind_direction: wind_direction})
          |> Repo.update!()
      end
    end)
  end

  defp format(datetime) do
    {:ok, datetime, 0} = DateTime.from_iso8601(datetime <> ":00Z")
    datetime
  end

  defp to_float(value) when is_integer(value) do
    {converted, _} = Float.parse("#{value}")
    converted
  end

  defp to_float(value) do
    value
  end

  defp add_days(datetime, days) do
    DateTime.add(datetime, days * 24 * 60 * 60, :second)
  end
end
                    
                    
You can view full code at: https://github.com/elpikel/orkan