From 233b0d2fbbc1b38a8fc4961e8e133bd6921960d3 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 11 Jul 2023 17:29:42 +0200 Subject: [PATCH] Refactor CrashGroupUtils --- .../core/parent/child_life_controller.ex | 106 +++---------- .../crash_group_utils.ex | 141 +++++++++++------- lib/membrane/core/parent/children_model.ex | 5 + 3 files changed, 115 insertions(+), 137 deletions(-) diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 3648caea9..586d4e494 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -270,11 +270,11 @@ defmodule Membrane.Core.Parent.ChildLifeController do # adding crash group to state state = - if options.crash_group_mode != nil do + if options.crash_group_mode != nil or Map.has_key?(state.crash_groups, options.group) do CrashGroupUtils.add_crash_group( - {options.group, options.crash_group_mode}, + options.group, + options.crash_group_mode, children_names, - children_pids, state ) else @@ -520,7 +520,9 @@ defmodule Membrane.Core.Parent.ChildLifeController do LinkUtils.remove_link(child_name, pad_ref, state) end - defp remove_children_from_specs(removed_children, state) do + @spec remove_children_from_specs(Child.name() | [Child.name()], Parent.state()) :: + Parent.state() + def remove_children_from_specs(removed_children, state) do removed_children = Bunch.listify(removed_children) |> MapSet.new() removed_children_specs = @@ -654,45 +656,35 @@ defmodule Membrane.Core.Parent.ChildLifeController do end defp do_handle_child_death(child_name, :normal, state) do - {%{pid: child_pid}, state} = Bunch.Access.pop_in(state, [:children, child_name]) state = LinkUtils.unlink_element(child_name, state) - {_result, state} = remove_child_from_crash_group(state, child_pid) + + %{group: group_name} = ChildrenModel.get_child_data!(state, child_name) + + state = + with %{crash_group: %{^group_name => group}} <- state do + CrashGroupUtils.handle_crash_group_member_death(child_name, group, :normal, state) + end + |> ChildrenModel.delete_child(child_name) + {:ok, state} end defp do_handle_child_death(child_name, reason, state) do - %{pid: child_pid} = ChildrenModel.get_child_data!(state, child_name) + %{group: group} = ChildrenModel.get_child_data!(state, child_name) - with {:ok, group} <- CrashGroupUtils.get_group_by_member_pid(child_pid, state) do - state = - group.members - |> Enum.filter(&Map.has_key?(state.children, &1)) - |> remove_children_from_specs(state) + case Map.get(state.crash_groups, group) do + %CrashGroup{} = group -> + state = + CrashGroupUtils.handle_crash_group_member_death(child_name, group, reason, state) + |> ChildrenModel.delete_child(child_name) - state = - crash_all_group_members(group, child_name, state) - |> remove_child_from_crash_group(group, child_pid) - |> case do - {:removed, state} -> - exec_handle_crash_group_down_callback( - group.name, - group.members, - group.crash_initiator || child_name, - state - ) - - {:not_removed, state} -> - state - end - |> Bunch.Access.delete_in([:children, child_name]) + {:ok, state} - {:ok, state} - else - {:error, :not_member} when reason == {:shutdown, :membrane_crash_group_kill} -> + nil when reason == {:shutdown, :membrane_crash_group_kill} -> raise Membrane.PipelineError, "Child #{inspect(child_name)} that was not a member of any crash group killed with :membrane_crash_group_kill." - {:error, :not_member} -> + nil -> Membrane.Logger.debug(""" Child #{inspect(child_name)} crashed but was not a member of any crash group. Terminating. @@ -730,54 +722,4 @@ defmodule Membrane.Core.Parent.ChildLifeController do state = %{state | pending_specs: Map.merge(state.pending_specs, related_specs)} related_specs |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2) end - - defp exec_handle_crash_group_down_callback( - group_name, - group_members, - crash_initiator, - state - ) do - context_generator = - &Component.context_from_state(&1, - members: group_members, - crash_initiator: crash_initiator - ) - - CallbackHandler.exec_and_handle_callback( - :handle_crash_group_down, - Membrane.Core.Pipeline.ActionHandler, - %{context: context_generator}, - [group_name], - state - ) - end - - # called when process was a member of a crash group - @spec crash_all_group_members(CrashGroup.t(), Child.name(), Parent.state()) :: - Parent.state() - defp crash_all_group_members( - %CrashGroup{triggered?: false} = crash_group, - crash_initiator, - state - ) do - %CrashGroup{alive_members_pids: members_pids} = crash_group - state = LinkUtils.unlink_crash_group(crash_group, state) - Enum.each(members_pids, &Process.exit(&1, {:shutdown, :membrane_crash_group_kill})) - CrashGroupUtils.set_triggered(state, crash_group.name, crash_initiator) - end - - defp crash_all_group_members(_crash_group, _crash_initiator, state), do: state - - defp remove_child_from_crash_group(state, child_pid) do - with {:ok, group} <- CrashGroupUtils.get_group_by_member_pid(child_pid, state) do - remove_child_from_crash_group(state, group, child_pid) - else - {:error, :not_member} -> {:not_removed, state} - end - end - - defp remove_child_from_crash_group(state, group, child_pid) do - CrashGroupUtils.remove_member_of_crash_group(state, group.name, child_pid) - |> CrashGroupUtils.remove_crash_group_if_empty(group.name) - end end diff --git a/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex b/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex index b7f898fb8..95eb21a67 100644 --- a/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex @@ -3,9 +3,9 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do # A module responsible for managing crash groups inside the state of pipeline. alias Membrane.ChildrenSpec - alias Membrane.Core.Parent - alias Membrane.Core.Parent.CrashGroup - alias Membrane.Core.Pipeline + alias Membrane.Core.{CallbackHandler, Component, Parent, Pipeline} + alias Membrane.Core.Parent.{ChildLifeController, ChildrenModel, CrashGroup} + alias Membrane.Core.Parent.ChildLifeController.LinkUtils @spec add_crash_group( {Membrane.Child.group(), ChildrenSpec.crash_group_mode()}, @@ -13,75 +13,106 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do [pid()], Pipeline.State.t() ) :: Pipeline.State.t() - def add_crash_group(group_spec, children_names, children_pids, state) do - {group_name, mode} = group_spec + def add_crash_group(group_name, _mode, children, state) + when is_map_key(state.crash_group, group_name) do + Bunch.Access.update_in(state, [:crash_groups, group_name, :members], &(children ++ &1)) + end - Bunch.Access.update_in(state, [:crash_groups, group_name], fn + def add_crash_group(group_name, mode, children, state) do + Bunch.Access.put_in( + state, + [:crash_groups, group_name], %CrashGroup{ - members: current_children_names, - alive_members_pids: current_alive_members - } = group -> - %CrashGroup{ - group - | members: current_children_names ++ children_names, - alive_members_pids: current_alive_members ++ children_pids - } - - nil -> - %CrashGroup{ - name: group_name, - mode: mode, - members: children_names, - alive_members_pids: children_pids - } - end) + name: group_name, + mode: mode, + members: children + } + ) end - @spec remove_crash_group_if_empty(Pipeline.State.t(), CrashGroup.name()) :: - {:removed | :not_removed, Pipeline.State.t()} - def remove_crash_group_if_empty(state, group_name) do - %CrashGroup{alive_members_pids: alive_members_pids} = state.crash_groups[group_name] + @spec handle_crash_group_member_death(Child.name(), CrashGroup.t(), any(), Parent.state()) :: + Parent.state() + def handle_crash_group_member_death(child_name, crash_group_data, reason, state) - if alive_members_pids == [] do - state = Bunch.Access.delete_in(state, [:crash_groups, group_name]) + def handle_crash_group_member_death( + child_name, + %CrashGroup{triggered?: true} = group, + _reason, + state + ) do + all_members_dead? = + Enum.all?(group.members, fn member -> + member == child_name or not Map.has_key?(state.children, member) + end) - {:removed, state} + if all_members_dead? do + exec_handle_crash_group_down(group, state) else - {:not_removed, state} + state end end - @spec remove_member_of_crash_group(Pipeline.State.t(), CrashGroup.name(), pid()) :: - Pipeline.State.t() - def remove_member_of_crash_group(state, group_name, pid) do + def handle_crash_group_member_death( + child_name, + %CrashGroup{members: [child_name]} = group, + :normal, + state + ) do + Bunch.Access.delete_in(state, [:crash_groups, group.name]) + end + + def handle_crash_group_member_death( + child_name, + %CrashGroup{members: [child_name]} = group, + _reason, + state + ) do + state = ChildLifeController.remove_children_from_specs(group.members, state) + state = LinkUtils.unlink_crash_group(group, state) + exec_handle_crash_group_down(group, state) + end + + def handle_crash_group_member_death(child_name, %CrashGroup{} = group, :normal, state) do Bunch.Access.update_in( state, - [:crash_groups, group_name, :alive_members_pids], - &List.delete(&1, pid) + [:crash_groups, group.name, :members], + &List.delete(&1, child_name) ) end - @spec get_group_by_member_pid(pid(), Parent.state()) :: - {:ok, CrashGroup.t()} | {:error, :not_member} - def get_group_by_member_pid(member_pid, state) do - crash_group = - state.crash_groups - |> Map.values() - |> Enum.find(fn %CrashGroup{alive_members_pids: alive_members_pids} -> - member_pid in alive_members_pids - end) + def handle_crash_group_member_death(child_name, %CrashGroup{} = group, _reason, state) do + state = ChildLifeController.remove_children_from_specs(group.members, state) + state = LinkUtils.unlink_crash_group(group, state) - case crash_group do - %CrashGroup{} -> {:ok, crash_group} - nil -> {:error, :not_member} - end + Enum.each(group.members, fn child -> + ChildrenModel.get_child_data!(state, child) + |> Map.get(:pid) + |> Process.exit({:shutdown, :membrane_crash_group_kill}) + end) + + Bunch.Access.put_in(state, [:crash_groups, group.name], %CrashGroup{ + group + | triggered?: true, + crash_initiator: child_name + }) end - @spec set_triggered(Pipeline.State.t(), CrashGroup.name(), Membrane.Child.name()) :: - Pipeline.State.t() - def set_triggered(state, group_name, crash_initiator) do - Bunch.Access.update_in(state, [:crash_groups, group_name], fn group -> - %{group | triggered?: true, crash_initiator: crash_initiator} - end) + defp exec_handle_crash_group_down( + crash_group, + state + ) do + context_generator = + &Component.context_from_state(&1, + members: crash_group.members, + crash_initiator: crash_group.crash_initiator + ) + + CallbackHandler.exec_and_handle_callback( + :handle_crash_group_down, + Membrane.Core.Pipeline.ActionHandler, + %{context: context_generator}, + [crash_group.name], + state + ) end end diff --git a/lib/membrane/core/parent/children_model.ex b/lib/membrane/core/parent/children_model.ex index 9ce5efcec..96a35c31b 100644 --- a/lib/membrane/core/parent/children_model.ex +++ b/lib/membrane/core/parent/children_model.ex @@ -40,6 +40,11 @@ defmodule Membrane.Core.Parent.ChildrenModel do %{state | children: children} end + @spec delete_child(Parent.state(), Child.name()) :: Parent.state() + def delete_child(%{children: children} = state, child) do + %{state | children: Map.delete(children, child)} + end + @spec all?(Parent.state(), (ChildEntry.t() -> as_boolean(term))) :: boolean() def all?(state, predicate) do state.children