Skip to content

Commit

Permalink
Refactor CrashGroupUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jul 11, 2023
1 parent b1cd21f commit 233b0d2
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 137 deletions.
106 changes: 24 additions & 82 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,11 @@ defmodule Membrane.Core.Parent.ChildLifeController do

# adding crash group to state
state =
if options.crash_group_mode != nil do
if options.crash_group_mode != nil or Map.has_key?(state.crash_groups, options.group) do
CrashGroupUtils.add_crash_group(
{options.group, options.crash_group_mode},
options.group,
options.crash_group_mode,
children_names,
children_pids,
state
)
else
Expand Down Expand Up @@ -520,7 +520,9 @@ defmodule Membrane.Core.Parent.ChildLifeController do
LinkUtils.remove_link(child_name, pad_ref, state)
end

defp remove_children_from_specs(removed_children, state) do
@spec remove_children_from_specs(Child.name() | [Child.name()], Parent.state()) ::
Parent.state()
def remove_children_from_specs(removed_children, state) do
removed_children = Bunch.listify(removed_children) |> MapSet.new()

removed_children_specs =
Expand Down Expand Up @@ -654,45 +656,35 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end

defp do_handle_child_death(child_name, :normal, state) do
{%{pid: child_pid}, state} = Bunch.Access.pop_in(state, [:children, child_name])
state = LinkUtils.unlink_element(child_name, state)
{_result, state} = remove_child_from_crash_group(state, child_pid)

%{group: group_name} = ChildrenModel.get_child_data!(state, child_name)

state =
with %{crash_group: %{^group_name => group}} <- state do
CrashGroupUtils.handle_crash_group_member_death(child_name, group, :normal, state)
end
|> ChildrenModel.delete_child(child_name)

{:ok, state}
end

defp do_handle_child_death(child_name, reason, state) do
%{pid: child_pid} = ChildrenModel.get_child_data!(state, child_name)
%{group: group} = ChildrenModel.get_child_data!(state, child_name)

with {:ok, group} <- CrashGroupUtils.get_group_by_member_pid(child_pid, state) do
state =
group.members
|> Enum.filter(&Map.has_key?(state.children, &1))
|> remove_children_from_specs(state)
case Map.get(state.crash_groups, group) do
%CrashGroup{} = group ->
state =
CrashGroupUtils.handle_crash_group_member_death(child_name, group, reason, state)
|> ChildrenModel.delete_child(child_name)

state =
crash_all_group_members(group, child_name, state)
|> remove_child_from_crash_group(group, child_pid)
|> case do
{:removed, state} ->
exec_handle_crash_group_down_callback(
group.name,
group.members,
group.crash_initiator || child_name,
state
)

{:not_removed, state} ->
state
end
|> Bunch.Access.delete_in([:children, child_name])
{:ok, state}

{:ok, state}
else
{:error, :not_member} when reason == {:shutdown, :membrane_crash_group_kill} ->
nil when reason == {:shutdown, :membrane_crash_group_kill} ->
raise Membrane.PipelineError,
"Child #{inspect(child_name)} that was not a member of any crash group killed with :membrane_crash_group_kill."

{:error, :not_member} ->
nil ->
Membrane.Logger.debug("""
Child #{inspect(child_name)} crashed but was not a member of any crash group.
Terminating.
Expand Down Expand Up @@ -730,54 +722,4 @@ defmodule Membrane.Core.Parent.ChildLifeController do
state = %{state | pending_specs: Map.merge(state.pending_specs, related_specs)}
related_specs |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2)
end

defp exec_handle_crash_group_down_callback(
group_name,
group_members,
crash_initiator,
state
) do
context_generator =
&Component.context_from_state(&1,
members: group_members,
crash_initiator: crash_initiator
)

CallbackHandler.exec_and_handle_callback(
:handle_crash_group_down,
Membrane.Core.Pipeline.ActionHandler,
%{context: context_generator},
[group_name],
state
)
end

# called when process was a member of a crash group
@spec crash_all_group_members(CrashGroup.t(), Child.name(), Parent.state()) ::
Parent.state()
defp crash_all_group_members(
%CrashGroup{triggered?: false} = crash_group,
crash_initiator,
state
) do
%CrashGroup{alive_members_pids: members_pids} = crash_group
state = LinkUtils.unlink_crash_group(crash_group, state)
Enum.each(members_pids, &Process.exit(&1, {:shutdown, :membrane_crash_group_kill}))
CrashGroupUtils.set_triggered(state, crash_group.name, crash_initiator)
end

defp crash_all_group_members(_crash_group, _crash_initiator, state), do: state

defp remove_child_from_crash_group(state, child_pid) do
with {:ok, group} <- CrashGroupUtils.get_group_by_member_pid(child_pid, state) do
remove_child_from_crash_group(state, group, child_pid)
else
{:error, :not_member} -> {:not_removed, state}
end
end

defp remove_child_from_crash_group(state, group, child_pid) do
CrashGroupUtils.remove_member_of_crash_group(state, group.name, child_pid)
|> CrashGroupUtils.remove_crash_group_if_empty(group.name)
end
end
141 changes: 86 additions & 55 deletions lib/membrane/core/parent/child_life_controller/crash_group_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,85 +3,116 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
# A module responsible for managing crash groups inside the state of pipeline.

alias Membrane.ChildrenSpec
alias Membrane.Core.Parent
alias Membrane.Core.Parent.CrashGroup
alias Membrane.Core.Pipeline
alias Membrane.Core.{CallbackHandler, Component, Parent, Pipeline}
alias Membrane.Core.Parent.{ChildLifeController, ChildrenModel, CrashGroup}
alias Membrane.Core.Parent.ChildLifeController.LinkUtils

@spec add_crash_group(
{Membrane.Child.group(), ChildrenSpec.crash_group_mode()},
[Membrane.Child.name()],
[pid()],
Pipeline.State.t()
) :: Pipeline.State.t()
def add_crash_group(group_spec, children_names, children_pids, state) do
{group_name, mode} = group_spec
def add_crash_group(group_name, _mode, children, state)
when is_map_key(state.crash_group, group_name) do
Bunch.Access.update_in(state, [:crash_groups, group_name, :members], &(children ++ &1))
end

Bunch.Access.update_in(state, [:crash_groups, group_name], fn
def add_crash_group(group_name, mode, children, state) do
Bunch.Access.put_in(
state,
[:crash_groups, group_name],
%CrashGroup{
members: current_children_names,
alive_members_pids: current_alive_members
} = group ->
%CrashGroup{
group
| members: current_children_names ++ children_names,
alive_members_pids: current_alive_members ++ children_pids
}

nil ->
%CrashGroup{
name: group_name,
mode: mode,
members: children_names,
alive_members_pids: children_pids
}
end)
name: group_name,
mode: mode,
members: children
}
)
end

@spec remove_crash_group_if_empty(Pipeline.State.t(), CrashGroup.name()) ::
{:removed | :not_removed, Pipeline.State.t()}
def remove_crash_group_if_empty(state, group_name) do
%CrashGroup{alive_members_pids: alive_members_pids} = state.crash_groups[group_name]
@spec handle_crash_group_member_death(Child.name(), CrashGroup.t(), any(), Parent.state()) ::
Parent.state()
def handle_crash_group_member_death(child_name, crash_group_data, reason, state)

if alive_members_pids == [] do
state = Bunch.Access.delete_in(state, [:crash_groups, group_name])
def handle_crash_group_member_death(
child_name,
%CrashGroup{triggered?: true} = group,
_reason,
state
) do
all_members_dead? =
Enum.all?(group.members, fn member ->
member == child_name or not Map.has_key?(state.children, member)
end)

{:removed, state}
if all_members_dead? do
exec_handle_crash_group_down(group, state)
else
{:not_removed, state}
state
end
end

@spec remove_member_of_crash_group(Pipeline.State.t(), CrashGroup.name(), pid()) ::
Pipeline.State.t()
def remove_member_of_crash_group(state, group_name, pid) do
def handle_crash_group_member_death(
child_name,
%CrashGroup{members: [child_name]} = group,
:normal,
state
) do
Bunch.Access.delete_in(state, [:crash_groups, group.name])
end

def handle_crash_group_member_death(
child_name,
%CrashGroup{members: [child_name]} = group,
_reason,
state
) do
state = ChildLifeController.remove_children_from_specs(group.members, state)
state = LinkUtils.unlink_crash_group(group, state)
exec_handle_crash_group_down(group, state)
end

def handle_crash_group_member_death(child_name, %CrashGroup{} = group, :normal, state) do
Bunch.Access.update_in(
state,
[:crash_groups, group_name, :alive_members_pids],
&List.delete(&1, pid)
[:crash_groups, group.name, :members],
&List.delete(&1, child_name)
)
end

@spec get_group_by_member_pid(pid(), Parent.state()) ::
{:ok, CrashGroup.t()} | {:error, :not_member}
def get_group_by_member_pid(member_pid, state) do
crash_group =
state.crash_groups
|> Map.values()
|> Enum.find(fn %CrashGroup{alive_members_pids: alive_members_pids} ->
member_pid in alive_members_pids
end)
def handle_crash_group_member_death(child_name, %CrashGroup{} = group, _reason, state) do
state = ChildLifeController.remove_children_from_specs(group.members, state)
state = LinkUtils.unlink_crash_group(group, state)

case crash_group do
%CrashGroup{} -> {:ok, crash_group}
nil -> {:error, :not_member}
end
Enum.each(group.members, fn child ->
ChildrenModel.get_child_data!(state, child)
|> Map.get(:pid)
|> Process.exit({:shutdown, :membrane_crash_group_kill})
end)

Bunch.Access.put_in(state, [:crash_groups, group.name], %CrashGroup{
group
| triggered?: true,
crash_initiator: child_name
})
end

@spec set_triggered(Pipeline.State.t(), CrashGroup.name(), Membrane.Child.name()) ::
Pipeline.State.t()
def set_triggered(state, group_name, crash_initiator) do
Bunch.Access.update_in(state, [:crash_groups, group_name], fn group ->
%{group | triggered?: true, crash_initiator: crash_initiator}
end)
defp exec_handle_crash_group_down(
crash_group,
state
) do
context_generator =
&Component.context_from_state(&1,
members: crash_group.members,
crash_initiator: crash_group.crash_initiator
)

CallbackHandler.exec_and_handle_callback(
:handle_crash_group_down,
Membrane.Core.Pipeline.ActionHandler,
%{context: context_generator},
[crash_group.name],
state
)
end
end
5 changes: 5 additions & 0 deletions lib/membrane/core/parent/children_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ defmodule Membrane.Core.Parent.ChildrenModel do
%{state | children: children}
end

@spec delete_child(Parent.state(), Child.name()) :: Parent.state()
def delete_child(%{children: children} = state, child) do
%{state | children: Map.delete(children, child)}
end

@spec all?(Parent.state(), (ChildEntry.t() -> as_boolean(term))) :: boolean()
def all?(state, predicate) do
state.children
Expand Down

0 comments on commit 233b0d2

Please sign in to comment.