From cfa54e8c40888c2346f498b09be981c26c8291d0 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 19 Jul 2023 16:45:07 +0200 Subject: [PATCH] Implement suggestions from CR wip --- lib/membrane/core/element/pad_controller.ex | 103 +++++++++--------- .../core/parent/child_life_controller.ex | 20 ++-- 2 files changed, 56 insertions(+), 67 deletions(-) diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 20333833a..21271c11e 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -300,25 +300,23 @@ defmodule Membrane.Core.Element.PadController do pad_data = pad_info |> Map.delete(:accepted_formats_str) - |> merge_pad_data( - &%{ - pid: other_endpoint.pid, - other_ref: other_endpoint.pad_ref, - options: - Child.PadController.parse_pad_options!(&1.name, endpoint.pad_props.options, state), - ref: endpoint.pad_ref, - stream_format_validation_params: stream_format_validation_params, - other_effective_flow_control: other_effective_flow_control, - stream_format: nil, - start_of_stream?: false, - end_of_stream?: false, - associated_pads: [], - atomic_demand: metadata.atomic_demand, - stalker_metrics: %{ - total_buffers: total_buffers_metric - } + |> Map.merge(%{ + pid: other_endpoint.pid, + other_ref: other_endpoint.pad_ref, + options: + Child.PadController.parse_pad_options!(pad_info.name, endpoint.pad_props.options, state), + ref: endpoint.pad_ref, + stream_format_validation_params: stream_format_validation_params, + other_effective_flow_control: other_effective_flow_control, + stream_format: nil, + start_of_stream?: false, + end_of_stream?: false, + associated_pads: [], + atomic_demand: metadata.atomic_demand, + stalker_metrics: %{ + total_buffers: total_buffers_metric } - ) + }) |> merge_pad_direction_data(metadata, state) |> merge_pad_mode_data(endpoint.pad_props, other_info, state) |> then(&struct!(Membrane.Element.PadData, &1)) @@ -353,33 +351,25 @@ defmodule Membrane.Core.Element.PadController do end end - defp merge_pad_data(pad_data, fun) do - Map.merge(pad_data, fun.(pad_data), fn - :stalker_metrics, m1, m2 -> Map.merge(m1, m2) - _key, _v1, v2 -> v2 - end) - end - - defp merge_pad_direction_data(pad_data, metadata, state) do - merge_pad_data(pad_data, &init_pad_direction_data(&1, metadata, state)) - end - - defp merge_pad_mode_data(pad_data, props, other_info, state) do - merge_pad_data(pad_data, &init_pad_mode_data(&1, props, other_info, state)) - end - - defp init_pad_direction_data(%{direction: :input}, metadata, _state), - do: %{ + defp merge_pad_direction_data(%{direction: :input} = pad_data, metadata, _state) do + pad_data + |> Map.merge(%{ sticky_messages: [], demand_unit: metadata.input_demand_unit, other_demand_unit: metadata.output_demand_unit - } + }) + end - defp init_pad_direction_data(%{direction: :output}, metadata, _state), - do: %{demand_unit: metadata.output_demand_unit, other_demand_unit: metadata.input_demand_unit} + defp merge_pad_direction_data(%{direction: :output} = pad_data, metadata, _state) do + pad_data + |> Map.merge(%{ + demand_unit: metadata.output_demand_unit, + other_demand_unit: metadata.input_demand_unit + }) + end - defp init_pad_mode_data( - %{direction: :input, flow_control: :manual} = data, + defp merge_pad_mode_data( + %{direction: :input, flow_control: :manual} = pad_data, props, other_info, %State{} @@ -388,7 +378,7 @@ defmodule Membrane.Core.Element.PadController do ref: ref, demand_unit: this_demand_unit, atomic_demand: atomic_demand - } = data + } = pad_data input_queue = InputQueue.init(%{ @@ -400,20 +390,24 @@ defmodule Membrane.Core.Element.PadController do target_size: props.target_queue_size }) - %{input_queue: input_queue, demand: 0} + pad_data + |> Map.merge(%{ + input_queue: input_queue, + demand: 0 + }) end - defp init_pad_mode_data( - %{direction: :output, flow_control: :manual}, + defp merge_pad_mode_data( + %{direction: :output, flow_control: :manual} = pad_data, _props, _other_info, _state ) do - %{demand: 0} + Map.put(pad_data, :demand, 0) end - defp init_pad_mode_data( - %{flow_control: :auto, direction: direction} = data, + defp merge_pad_mode_data( + %{flow_control: :auto, direction: direction} = pad_data, props, _other_info, %State{} = state @@ -433,7 +427,7 @@ defmodule Membrane.Core.Element.PadController do props.auto_demand_size true -> - demand_unit = data.other_demand_unit || data.demand_unit || :buffers + demand_unit = pad_data.other_demand_unit || pad_data.demand_unit || :buffers metric = Membrane.Buffer.Metric.from_unit(demand_unit) metric.buffer_size_approximation() * @default_auto_demand_size_factor end @@ -443,20 +437,21 @@ defmodule Membrane.Core.Element.PadController do :atomics.new(1, []) |> tap( &Stalker.register_metric_function(:auto_demand_size, fn -> :atomics.get(&1, 1) end, - pad: data.ref + pad: pad_data.ref ) ) end - %{ + pad_data + |> Map.merge(%{ demand: 0, associated_pads: associated_pads, - auto_demand_size: auto_demand_size, - stalker_metrics: %{demand: demand_metric} - } + auto_demand_size: auto_demand_size + }) + |> put_in([:stalker_metrics, :demand], demand_metric) end - defp init_pad_mode_data(_data, _props, _other_info, _state), do: %{} + defp merge_pad_mode_data(pad_data, _props, _other_info, _state), do: pad_data @doc """ Generates end of stream on the given input pad if it hasn't been generated yet diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 263a6a882..9214a2dd9 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -652,21 +652,12 @@ defmodule Membrane.Core.Parent.ChildLifeController do end end - defp do_handle_child_death(child_name, :normal, state) do - state = LinkUtils.unlink_element(child_name, state) - + defp do_handle_child_death(child_name, reason, state) do state = - with {:ok, crash_group} <- CrashGroupUtils.get_child_crash_group(child_name, state) do - CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, :normal, state) - else - :error -> state - end - |> ChildrenModel.delete_child(child_name) + if reason == :normal, + do: LinkUtils.unlink_element(child_name, state), + else: state - {:ok, state} - end - - defp do_handle_child_death(child_name, reason, state) do with {:ok, crash_group} <- CrashGroupUtils.get_child_crash_group(child_name, state) do state = CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, reason, state) @@ -674,6 +665,9 @@ defmodule Membrane.Core.Parent.ChildLifeController do {:ok, state} else + :error when reason == :normal -> + {:ok, ChildrenModel.delete_child(state, child_name)} + :error when reason == {:shutdown, :membrane_crash_group_kill} -> raise Membrane.PipelineError, "Child #{inspect(child_name)} that was not a member of any crash group killed with :membrane_crash_group_kill."