Simple Ecto query pipelines

The Ecto library is the means used by Elixir developers to interact with a relational database. One of the first things you might want to do when using Ecto is to query a database. This is a simple example showing how you can use a pipeline and a list of options to provide fairly robust querying capabilities for a single table.

Example Table

Assume we have a “contacts” table in a Postgres database with an auto-generated id primary key.

   Column    |              Type
-------------+--------------------------------
 id          | bigint                         |
 first_name  | character varying(255)         |
 last_name   | character varying(255)         |
 inserted_at | timestamp(0) without time zone |
 updated_at  | timestamp(0) without time zone |
Indexes:
    "contacts_pkey" PRIMARY KEY, btree (id)

Elixir Ecto Schema

The schema file in Elixir looks like this:

defmodule Ectyl.Database.Contact do
  use Ecto.Schema
  import Ecto.Changeset

  schema "contacts" do
    field :first_name, :string
    field :last_name, :string

    timestamps(type: :utc_datetime)
  end

  @doc false
  def changeset(contact, attrs) do
    contact
    |> cast(attrs, [:first_name, :last_name])
    |> validate_required([:first_name, :last_name])
  end
end

Requirements

We want to write a module that lets developers:

  • get all the contacts
  • query on first name, last name or both to return only matching rows
  • sort by first name, last name or both
  • paginate

Building a Pipelined Function

The way to do this is use a pipeline. Each step in the pipeline builds an Ecto.Query. We’ll allow the caller to pass opts in to a function but provide a default that gives no opts.

defmodule Ectyl.Contacts do
  @moduledoc """
  The Contacts context.
  """

  import Ecto.Query, warn: false
  alias Ectyl.Database.Repo

  alias Ectyl.Database.Contact

  @contacts_fields Contact.__schema__(:fields)
  @directions [:asc, :desc]
  @operators [:like, :in, :not_in, :begins_with, :ends_with]

  @doc """
  Returns the list of contacts.

  ## Examples

      iex> Ectyl.Contacts.list_contacts()
      [%Contact{}, ...]

  ## Options
  - `:filters` - A list of filters to apply to the query. Each filter can be a tuple of `{field, value}` or `{field, value, operator}`.
    Supported operators are `:like`, `:in`, `:not_in`, `:begins_with`, and `:ends_with`. Supported fields are those defined in `Contact.__schema__(:fields)`
  - `:sort` - A list of tuples for sorting, where each tuple is `{field, direction}`. Supported fields are those defined in `Contact.__schema__(:fields)` and directions are `:asc` or `:desc`.
  - `:page` - The page number for pagination (default is 1).
  - `:page_size` - The number of contacts per page (default is 10).
  """
  def list_contacts(opts \\ []) do
    contacts_base_query()
    |> apply_filters(opts)
    |> apply_sorting(opts)
    |> apply_pagination(opts)
    |> Repo.all()
  end

Building a Base Ecto.Query

The first step in the pipeline is to get the “base” Ecto.Query. This is the datatype that is passed to each function in the pipeline.

  defp contacts_base_query do
    from(c in Contact)
  end

Applying Filters

The Ecto.Query built by that function is passed into the apply_filters/2 function along with the opts provided by the caller:

  defp apply_filters(query, opts) do
    opts
    |> options(:filters)
    |> Enum.filter(fn
      {field, _value} when field in @contacts_fields -> true
      {field, _value, operator} when field in @contacts_fields and operator in @operators -> true
      _ -> false
    end)
    |> Enum.reduce(query, fn
      {field, value}, acc ->
        from c in acc, where: field(c, ^field) == ^value

      {field, value, :begins_with}, acc ->
        from c in acc, where: like(field(c, ^field), ^"#{value}%")

      {field, value, :ends_with}, acc ->
        from c in acc, where: like(field(c, ^field), ^"%#{value}")

      {field, value, :like}, acc ->
        from c in acc, where: like(field(c, ^field), ^"%#{value}%")

      {field, value, :in}, acc ->
        from c in acc, where: field(c, ^field) in ^value

      {field, value, :not_in}, acc ->
        from c in acc, where: field(c, ^field) not in ^value

      _, acc ->
        acc
    end)
  end

The apply_filters/2 function extracts the :filters from the opts and defaults to an empty list. It then filters the list to ensure that the parameters are in the correct format. The check ensures that only fields that are in the schema are used. If an operator is used it must be one of the supported options. If an element in the list does not meet the criteria it is discarded. The function then uses an Enum.reduce/4 where the Ecto.Query is used as an accumulator. Any alteration of the criteria builds a new Ecto.Query which becomes the updated accumulator.

Applying Sorting

The apply_sorting/2 function is similar.

  defp apply_sorting(query, opts) do
    opts
    |> options(:sort)
    |> Enum.filter(fn {field, direction} -> field in @contacts_fields and direction in @directions end)
    |> Enum.reduce(query, fn {field, direction}, acc ->
      from c in acc, order_by: [{^direction, field(c, ^field)}]
    end)
  end

The function extracts the sort options and defaults to an empty list. It then filters the list to ensure parameters are in the right format. Again, the Ecto.Query is used as the accumulator and sorts added build an updated Ecto.Query accumulator.

Apply Pagination

The final function in the pipeline is apply_pagination. In this case the function will use pagination regardless of whether or not the caller specified :page or :page_size.

  defp apply_pagination(query, opts) do
    page = opts[:page] || 1
    page_size = opts[:page_size] || 10

    query
    |> offset(^((page - 1) * page_size))
    |> limit(^page_size)
  end

Execute Query

The final step is to pass the Ecto.Query to Repo.all to return results. In this case the list contains [%Contact{}, ...].

Problems with this Approach

This is a trivial example. Although it is providing fairly robust capabilities (filters, sorting, pagination) it falls apart if we want to join to other tables (for example, lets assume there’s a phone_numbers table that has a foreign key to contacts to allow a contact to have any number of phone numbers). The reason this probably does not work is that we might want to filter on the phone number (for example, give me all the contacts in area code 678) but there’s no means of indicating what table a filter applies to. There’s a similar situation with sorting. I’ll create another post to indicate how that might be handled.

There is some checking of the parameters provided but there is no notification (logging, telemetry) to indicate that the function received bad input. In most cases you would want to have that. I think it’s generally fine to have the code proceed if the input data can be cleaned up but I’d want to know that some part of the codebase is passing input data that will never work.

Benefits of this Approach

Pipelining to build an Ecto.Query is a powerful tool that lets you write modules that provide filtering, sorting and pagination in a way that is fairly powerful while being easy to read and maintain.




Enjoy Reading This Article?

Here are some more articles you might like to read next:

  • Recent Books
  • Atkin White Rice
  • Elixir - Iex History
  • Elixir Dependencies
  • Writing Confusing Code in Elixir