Set up a jobs architecture (Oban) on 2 isolated nodes (#1875)
* add oban package

Co-authored-by: Thibaut Barrère <[email protected]>

* oban migrations

* create an oban worker for gtfs to geojson conversion

* keep import_config at the bottom of the file

* read env variable for worker in config.exs

* pick the router to use depending on the WORKER env variable

* add runtime.exs file

* remove compile time config

* revert code

* add a plug to skip routing in worker mode

* only the worker launches the scheduled jobs

* rename plug

* no oban queues for testing

* set runtime config for prod and test env

* typo

* remove extra comma

* start Oban in application.ex

* add a little log

* create a simple test page for oban

* only workers have a queue

* set queues to false to disable a worker

* Make sure the job never retries for now

* Start centralizing a bit of config logic

* Make it easier to understand what happens directly from the pipeline definition

This also leverages shared helpers to determine the situation.

* Move a part of dev config to new runtime.exs

* Reformat configuration automatically

* Only start npm watcher in webserver mode

* Make it easy to run 2 nodes locally by supporting PORT override

* Add a bit of logging to clarify the boot process

* Shorten the line

* Revert incorrect change

* Revert change for config formatter to reduce PR noise

* Remove oban demo controller

* Add hello world for jobs dashboard

* Add more todos

* Improve runtime configuration

The new layout makes it possible to:
- DRY most of the config steps
- easily follow the Oban guidelines for testing (https://github.com/sorentwo/oban#testing)
- ensure the test suite starts running again
- disable Oban processing when under IEx

* Leave port 5001 for second dev-mode node instead

When working in local multi-node, it is natural to sometimes start 2 nodes, with ports 5000 (default one) and 5001 (the next one). In that case, the default test port would conflict, so I bumped it up a bit.

* Add explanation

* Add explanation

* Delete index.html.eex

* Delete oban_view.ex

* Remove configuration overridden by runtime.exs

* Bring back missing bits from refactoring

* Revert "Bring back missing bits from refactoring"

This reverts commit 6ba2238.

* Bring back missing piece

* use Finch Wrapper instead

* create a behavior for rambo

* setup the Mox Mock

* It's ok to have a running Oban queue in iex in dev mode

* first oban job test

* change Mock name

* enrich the jobs observation page

* i'm drying my best

* properly implement behavior

* update converter job test

* show errors for discarded jobs

* add logging in case of job exception

* remove IO.inspect

* change default queue concurrency to 2

* fix credo warnings

* Fix warnings

* Lower production pool size from 10 to 6 to avoid crashes

* geojson converter directly writes result in file

* update test

* switch config worker and webserver variables to booleans

* do not launch quantum jobs in an IEx session

* change env variables to booleans

* Update apps/transport/lib/jobs/RamboLauncher.ex

Co-authored-by: Antoine Augusti <[email protected]>

* Update apps/transport/lib/jobs/RamboLauncher.ex

Co-authored-by: Antoine Augusti <[email protected]>

* Update apps/transport/lib/jobs/geojson_converter_job.ex

Co-authored-by: Antoine Augusti <[email protected]>

* Update apps/transport/lib/jobs/oban_logger.ex

Co-authored-by: Antoine Augusti <[email protected]>

Co-authored-by: Francis <[email protected]>
Co-authored-by: Antoine Augusti <[email protected]>
3 people authored Nov 15, 2021
1 parent a1e2d38 commit f2a5b5d
Showing 24 changed files with 491 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .formatter.exs
@@ -1,4 +1,4 @@
[
-inputs: ["mix.exs", "apps/{datagouvfr,db,gbfs,helpers,transport}/{lib,test}/**/*.{ex,exs}"],
+inputs: ["mix.exs", "config/runtime.exs", "apps/{datagouvfr,db,gbfs,helpers,transport}/{lib,test}/**/*.{ex,exs}"],
line_length: 120
]
3 changes: 2 additions & 1 deletion apps/db/mix.exs
@@ -51,7 +51,8 @@ defmodule Db.MixProject do
{:shared, in_umbrella: true},
{:sentry, ">= 0.0.0"},
{:typed_ecto_schema, ">= 0.1.1"},
-{:ex_machina, "~> 2.4", only: :test}
+{:ex_machina, "~> 2.4", only: :test},
+{:oban, "~> 2.9"}
]
end
end
@@ -0,0 +1,14 @@
defmodule DB.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration

def up do
# TO DO check Oban migration policy
Oban.Migrations.up()
end

# We specify `version: 1` in `down`, ensuring that we'll roll all the way back down if
# necessary, regardless of which version we've migrated `up` to.
def down do
Oban.Migrations.down(version: 1)
end
end
29 changes: 29 additions & 0 deletions apps/transport/lib/jobs/RamboLauncher.ex
@@ -0,0 +1,29 @@
defmodule Transport.RamboLauncher do
@moduledoc """
A behavior for Rambo, with dynamic dispatching
"""
@callback run(binary(), [binary()]) :: {:ok, binary()} | {:error, any()}

def impl, do: Application.get_env(:transport, :rambo_impl)

def run(binary_path, options), do: impl().run(binary_path, options)
end

defmodule Transport.Rambo do
@moduledoc """
Run an executable with Rambo
"""
@behaviour Transport.RamboLauncher

@impl Transport.RamboLauncher
def run(binary_path, options) do
# TO DO: make sure to have a command that we can run on any dev machine (with docker)
# TO DO: make sure to "clear" the ENV before calling a binary
# TO DO: make sure to "change working directory" to a specific working place
case Rambo.run(binary_path, options) do
{:ok, %Rambo{out: res}} -> {:ok, res}
{:error, %Rambo{err: err_msg}} -> {:error, err_msg}
{:error, _} = r -> r
end
end
end
50 changes: 50 additions & 0 deletions apps/transport/lib/jobs/geojson_converter_job.ex
@@ -0,0 +1,50 @@
defmodule Transport.GeojsonConverterJob do
@moduledoc """
Job converting a GTFS file to GeoJSON
"""
use Oban.Worker, max_attempts: 1
import Logger
alias DB.{Repo, Resource}

# TO DO: handle the case where a resource cannot be found

@impl true
def perform(%{id: id, args: %{"resource_id" => resource_id}}) do
Logger.info("Job #{id} started by #{__MODULE__}")

url = Resource |> Repo.get!(resource_id) |> Map.fetch!(:url)

# TO DO how is the tmp folder cleaned (upon completion or after a crash)?
gtfs_file_path = System.tmp_dir!() |> Path.join("#{id}_download")

# TO DO stream file to disk
# TO DO verify headers (content-type) and maybe provide alerts to providers!
%{status: 200, body: body} = Unlock.HTTP.Client.impl().get!(url, [])
File.write!(gtfs_file_path, body)

geojson_file_path =
System.tmp_dir!()
|> Path.join("#{id}_output.geojson")

:ok = Transport.GtfsToGeojsonConverter.convert(gtfs_file_path, geojson_file_path)

Logger.info("Job #{id} success, saving result: #{geojson_file_path}")

:ok
end
end

defmodule Transport.GtfsToGeojsonConverter do
@moduledoc """
Given a GTFS file path, create the corresponding GeoJSON file, containing the stops and the line shapes when available.
"""
@spec convert(binary(), binary()) :: :ok | {:error, any()}
def convert(gtfs_file_path, geojson_file_path) do
binary_path = Path.join(Application.fetch_env!(:transport, :transport_tools_folder), "gtfs-geojson")

case Transport.RamboLauncher.run(binary_path, ["--input", gtfs_file_path, "--output", geojson_file_path]) do
{:ok, _} -> :ok
{:error, e} -> {:error, e}
end
end
end
19 changes: 19 additions & 0 deletions apps/transport/lib/jobs/oban_logger.ex
@@ -0,0 +1,19 @@
defmodule Transport.ObanLogger do
@moduledoc """
Logs the Oban job exceptions as warnings
"""
require Logger

def handle_event(
[:oban, :job, :exception],
%{duration: duration} = info,
%{args: args, error: error, id: id, worker: worker},
nil
) do
Logger.warn(
"Job #{id} handled by #{worker} called with args #{inspect(args)} failed in #{duration}. Error: #{inspect(error)}"
)
end

def setup, do: :telemetry.attach("oban-logger", [:oban, :job, :exception], &handle_event/4, nil)
end
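The handler above interpolates the raw `duration` measurement into the log line. As far as the Oban telemetry documentation describes, that value is expressed in native time units, so converting it would make the log easier to read. A minimal sketch, assuming a hypothetical helper module `ObanLogSketch` that is not part of this commit:

```elixir
defmodule ObanLogSketch do
  @moduledoc """
  Hypothetical helper (not in the commit): builds a readable message
  from the measurements and metadata of an Oban job exception event.
  """

  # `duration` is assumed to be in native time units, as emitted by telemetry.
  def format(%{duration: duration}, %{id: id, worker: worker}) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "Job #{id} handled by #{worker} failed after #{ms} ms"
  end
end
```

Such a function could be called from `handle_event/4` in place of the inline string, which would also keep the formatting testable without attaching a telemetry handler.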
23 changes: 22 additions & 1 deletion apps/transport/lib/transport/application.ex
@@ -1,4 +1,6 @@
defmodule Transport.Application do
require Logger

@moduledoc """
See https://hexdocs.pm/elixir/Application.html
for more information on OTP Applications
@@ -15,23 +17,42 @@ defmodule Transport.Application do
def cache_name, do: @cache_name

def start(_type, _args) do
unless Mix.env() == :test do
cond do
worker_only?() -> Logger.info("Booting in worker-only mode...")
webserver_only?() -> Logger.info("Booting in webserver-only mode...")
dual_mode?() -> Logger.info("Booting in worker+webserver mode...")
end
end

children =
[
{Cachex, name: @cache_name},
supervisor(TransportWeb.Endpoint, []),
supervisor(ImportDataWorker, []),
CSVDocuments,
SearchCommunes,
-{Phoenix.PubSub, [name: TransportWeb.PubSub, adapter: Phoenix.PubSub.PG2]}
+{Phoenix.PubSub, [name: TransportWeb.PubSub, adapter: Phoenix.PubSub.PG2]},
+# Oban is "always started", but muted via `config/runtime.exs` for cases like
+# tests, IEx usage, front-end only mode etc.
+{Oban, Application.fetch_env!(:transport, Oban)}
]
|> add_scheduler()
## manually add a children supervisor that is not scheduled
|> Kernel.++([{Task.Supervisor, name: ImportTaskSupervisor}])

:ok = Transport.ObanLogger.setup()

opts = [strategy: :one_for_one, name: Transport.Supervisor]
Supervisor.start_link(children, opts)
end

def webserver_enabled?, do: Application.fetch_env!(:transport, :webserver)
def worker_enabled?, do: Application.fetch_env!(:transport, :worker)
def worker_only?, do: worker_enabled?() && !webserver_enabled?()
def webserver_only?, do: webserver_enabled?() && !worker_enabled?()
def dual_mode?, do: worker_enabled?() && webserver_enabled?()

defp add_scheduler(children) do
if Mix.env() != :test do
import Supervisor.Spec, only: [worker: 2]
31 changes: 31 additions & 0 deletions apps/transport/lib/transport/scheduler.ex
@@ -5,4 +5,35 @@ defmodule Transport.Scheduler do

use Quantum,
otp_app: :transport

@doc """
The jobs are defined here, but only programmatically activated on one node. See `config/runtime.exs`.
"""
def scheduled_jobs do
[
# Every day at 4am UTC
{"0 4 * * *", {Transport.ImportData, :import_validate_all, []}},
# Send email for outdated data
{"@daily", {Transport.DataChecker, :outdated_data, []}},
# Set inactive data
{"@daily", {Transport.DataChecker, :inactive_data, []}},
# Watch for new comments on datasets
{"@daily", {Transport.CommentsChecker, :check_for_new_comments, []}},
# Delete orphan community resources
{"@daily", {Transport.CommunityResourcesCleaner, :clean_community_resources, []}},
# backup all resources
{"@daily", {Transport.History.Backup, :backup_resources, []}},
# clean old logs
{"0 3 * * *", {Transport.LogCleaner, :clean_old_logs, []}},
# clean old validations
{"0 2 * * *", {Transport.ValidationCleaner, :clean_old_validations, []}},
# compute some global stats and store them in the DB
{"0 20 * * *", {Transport.StatsHandler, :store_stats, []}},
# generate NeTEx / geojson files for all GTFS.
# Note : this should be run before the import_validate_all for the NeTEx / geojson
# to be created when the import is run
{"0 1 * * *", {Transport.GtfsConversions, :convert_all, []}},
{"0 * * * *", {Transport.ImportData, :refresh_places, []}}
]
end
end
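The `@doc` above points to `config/runtime.exs`, which is not shown in this part of the diff. A hedged sketch of what such a file might look like, based only on the commit message (boolean `worker`/`webserver` settings, `queues: false` to mute Oban, a queue concurrency of 2). The `WORKER` and `WEBSERVER` variable values, the `default` queue name, and the exact layout are assumptions, not the author's actual file:

```elixir
# config/runtime.exs (illustrative sketch only, not the file from this commit)
import Config

# Assumption: the node role is driven by "1"/"0" environment variables.
worker = System.get_env("WORKER", "0") == "1"
webserver = System.get_env("WEBSERVER", "1") == "1"

config :transport,
  worker: worker,
  webserver: webserver

# Oban is always started by the application; `queues: false` keeps it
# from processing jobs on nodes that are not workers.
# Assumption: a single queue named `default`, with a concurrency of 2.
config :transport, Oban,
  repo: DB.Repo,
  queues: if(worker, do: [default: 2], else: false)

# Quantum jobs are only activated on the worker node.
if worker do
  config :transport, Transport.Scheduler, jobs: Transport.Scheduler.scheduled_jobs()
end
```

With a layout like this, two local nodes could be started with different environment variables, one serving HTTP and the other processing jobs.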
93 changes: 93 additions & 0 deletions apps/transport/lib/transport_web/live/backoffice/jobs_live.ex
@@ -0,0 +1,93 @@
defmodule TransportWeb.Backoffice.JobsLive do
@moduledoc """
A quick dashboard for jobs.
"""
use Phoenix.LiveView
import Ecto.Query

# Authentication is assumed to happen in regular HTTP land. Here we verify
# the user presence + belonging to admin team, or redirect immediately.
def mount(_params, session, socket) do
%{
"current_user" => current_user
} = session

{:ok,
ensure_admin_auth_or_redirect(socket, current_user, fn socket ->
if connected?(socket), do: schedule_next_update_data()

socket
|> update_data()
end)}
end

# TO DO: DRY code with proxy live
# If one calls "redirect" and does not return immediately, the remaining code will
# still be executed, opening security issues. The goal of this function is to minimize that risk.
# See https://hexdocs.pm/phoenix_live_view/security-model.html for overall docs.
#
# Also, disconnect will have to be handled:
# https://hexdocs.pm/phoenix_live_view/security-model.html#disconnecting-all-instances-of-a-given-live-user
#
defp ensure_admin_auth_or_redirect(socket, current_user, func) do
if current_user && TransportWeb.Router.is_transport_data_gouv_member?(current_user) do
# We track down the current admin so that it can be used by next actions
socket = assign(socket, current_admin_user: current_user)
# Then call the remaining code, which is expected to return the socket
func.(socket)
else
redirect(socket, to: "/login")
end
end

defp schedule_next_update_data do
Process.send_after(self(), :update_data, 1000)
end

def last_jobs_query(state, n) do
from(j in "oban_jobs",
select: map(j, [:id, :state, :queue, :args, :inserted_at, :errors]),
order_by: [desc: j.id],
where: j.state == ^state,
limit: ^n
)
end

def count_jobs_query(state) do
from(j in "oban_jobs",
select: count(),
where: j.state == ^state
)
end

def oban_query(query), do: Oban.config() |> Oban.Repo.all(query)

def last_jobs(state, n), do: state |> last_jobs_query(n) |> oban_query

def count_jobs(state), do: state |> count_jobs_query |> oban_query |> Enum.at(0)

defp update_data(socket) do
assign(socket,
last_updated_at: (Time.utc_now() |> Time.truncate(:second) |> to_string()) <> " UTC",
executing_jobs: last_jobs("executing", 5),
count_executing_jobs: count_jobs("executing"),
last_completed_jobs: last_jobs("completed", 5),
count_completed_jobs: count_jobs("completed"),
available_jobs: last_jobs("available", 5),
count_available_jobs: count_jobs("available"),
last_discarded_jobs: last_jobs("discarded", 5),
count_discarded_jobs: count_jobs("discarded")
)
end

def handle_info(:update_data, socket) do
schedule_next_update_data()
{:noreply, update_data(socket)}
end

def build_session(conn) do
%{
"current_user" => conn.assigns[:current_user]
}
end
end
@@ -0,0 +1,21 @@
<section class="container pt-48 pb-48">
<h1>Jobs Observation Center</h1>
<p class="small">Dernière mise à jour: <%= @last_updated_at %></p>

<h2>Executing jobs</h2>
<%= live_component JobsTableComponent, jobs: @executing_jobs, state: "executing" %>
<p class="small"> Total: <%= @count_executing_jobs %></p>

<h2>Completed jobs</h2>
<%= live_component JobsTableComponent, jobs: @last_completed_jobs, state: "completed" %>
<p class="small"> Total: <%= @count_completed_jobs %></p>

<h2>Available jobs</h2>
<%= live_component JobsTableComponent, jobs: @available_jobs, state: "available" %>
<p class="small"> Total: <%= @count_available_jobs %></p>

<h2>Discarded jobs</h2>
<%= live_component JobsTableComponent, jobs: @last_discarded_jobs, state: "discarded" %>
<p class="small"> Total: <%= @count_discarded_jobs %></p>
</section>
<script defer type="text/javascript" src="/js/app.js"></script>
@@ -0,0 +1,39 @@
defmodule JobsTableComponent do
@moduledoc """
A live view table for Oban jobs monitoring
"""
use Phoenix.LiveComponent

def render(assigns) do
~L"""
<table class="table">
<thead>
<tr>
<th>id</th>
<th>state</th>
<th>queue</th>
<th>args</th>
<th>inserted_at</th>
<%= if @state == "discarded" do %>
<th>errors</th>
<% end %>
</tr>
</thead>
<tbody>
<%= for job <- @jobs do %>
<tr>
<td><%= job.id %></td>
<td><%= job.state %></td>
<td><%= job.queue %></td>
<td><%= inspect(job.args) %></td>
<td><%= job.inserted_at %></td>
<%= if @state == "discarded" do %>
<td><%= inspect(job.errors) %></td>
<% end %>
</tr>
<% end %>
</tbody>
</table>
"""
end
end
23 changes: 23 additions & 0 deletions apps/transport/lib/transport_web/plugs/halt.ex
@@ -0,0 +1,23 @@
defmodule TransportWeb.Plugs.Halt do
@moduledoc """
When the app runs in worker-only (Oban) mode, we still need an HTTP endpoint so that the hosting
provider's monitoring can verify the app is up. In that case, though, we want to stop serving the
routes we normally serve as early as possible; such is the purpose of this plug.
"""
import Plug.Conn

def init(options), do: options

def call(conn, opts) do
{mod, fun} = opts[:if]

if apply(mod, fun, []) do
conn
|> put_resp_content_type("text/plain")
|> send_resp(200, opts[:message])
|> halt()
else
conn
end
end
end