Skip to content

Commit

Permalink
Implement suggestions from CR wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jul 19, 2023
1 parent 809f17f commit cfa54e8
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 67 deletions.
103 changes: 49 additions & 54 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -300,25 +300,23 @@ defmodule Membrane.Core.Element.PadController do
pad_data =
pad_info
|> Map.delete(:accepted_formats_str)
|> merge_pad_data(
&%{
pid: other_endpoint.pid,
other_ref: other_endpoint.pad_ref,
options:
Child.PadController.parse_pad_options!(&1.name, endpoint.pad_props.options, state),
ref: endpoint.pad_ref,
stream_format_validation_params: stream_format_validation_params,
other_effective_flow_control: other_effective_flow_control,
stream_format: nil,
start_of_stream?: false,
end_of_stream?: false,
associated_pads: [],
atomic_demand: metadata.atomic_demand,
stalker_metrics: %{
total_buffers: total_buffers_metric
}
|> Map.merge(%{
pid: other_endpoint.pid,
other_ref: other_endpoint.pad_ref,
options:
Child.PadController.parse_pad_options!(pad_info.name, endpoint.pad_props.options, state),
ref: endpoint.pad_ref,
stream_format_validation_params: stream_format_validation_params,
other_effective_flow_control: other_effective_flow_control,
stream_format: nil,
start_of_stream?: false,
end_of_stream?: false,
associated_pads: [],
atomic_demand: metadata.atomic_demand,
stalker_metrics: %{
total_buffers: total_buffers_metric
}
)
})
|> merge_pad_direction_data(metadata, state)
|> merge_pad_mode_data(endpoint.pad_props, other_info, state)
|> then(&struct!(Membrane.Element.PadData, &1))
Expand Down Expand Up @@ -353,33 +351,25 @@ defmodule Membrane.Core.Element.PadController do
end
end

defp merge_pad_data(pad_data, fun) do
Map.merge(pad_data, fun.(pad_data), fn
:stalker_metrics, m1, m2 -> Map.merge(m1, m2)
_key, _v1, v2 -> v2
end)
end

defp merge_pad_direction_data(pad_data, metadata, state) do
merge_pad_data(pad_data, &init_pad_direction_data(&1, metadata, state))
end

defp merge_pad_mode_data(pad_data, props, other_info, state) do
merge_pad_data(pad_data, &init_pad_mode_data(&1, props, other_info, state))
end

defp init_pad_direction_data(%{direction: :input}, metadata, _state),
do: %{
defp merge_pad_direction_data(%{direction: :input} = pad_data, metadata, _state) do
pad_data
|> Map.merge(%{
sticky_messages: [],
demand_unit: metadata.input_demand_unit,
other_demand_unit: metadata.output_demand_unit
}
})
end

defp init_pad_direction_data(%{direction: :output}, metadata, _state),
do: %{demand_unit: metadata.output_demand_unit, other_demand_unit: metadata.input_demand_unit}
defp merge_pad_direction_data(%{direction: :output} = pad_data, metadata, _state) do
pad_data
|> Map.merge(%{
demand_unit: metadata.output_demand_unit,
other_demand_unit: metadata.input_demand_unit
})
end

defp init_pad_mode_data(
%{direction: :input, flow_control: :manual} = data,
defp merge_pad_mode_data(
%{direction: :input, flow_control: :manual} = pad_data,
props,
other_info,
%State{}
Expand All @@ -388,7 +378,7 @@ defmodule Membrane.Core.Element.PadController do
ref: ref,
demand_unit: this_demand_unit,
atomic_demand: atomic_demand
} = data
} = pad_data

input_queue =
InputQueue.init(%{
Expand All @@ -400,20 +390,24 @@ defmodule Membrane.Core.Element.PadController do
target_size: props.target_queue_size
})

%{input_queue: input_queue, demand: 0}
pad_data
|> Map.merge(%{
input_queue: input_queue,
demand: 0
})
end

defp init_pad_mode_data(
%{direction: :output, flow_control: :manual},
defp merge_pad_mode_data(
%{direction: :output, flow_control: :manual} = pad_data,
_props,
_other_info,
_state
) do
%{demand: 0}
Map.put(pad_data, :demand, 0)
end

defp init_pad_mode_data(
%{flow_control: :auto, direction: direction} = data,
defp merge_pad_mode_data(
%{flow_control: :auto, direction: direction} = pad_data,
props,
_other_info,
%State{} = state
Expand All @@ -433,7 +427,7 @@ defmodule Membrane.Core.Element.PadController do
props.auto_demand_size

true ->
demand_unit = data.other_demand_unit || data.demand_unit || :buffers
demand_unit = pad_data.other_demand_unit || pad_data.demand_unit || :buffers
metric = Membrane.Buffer.Metric.from_unit(demand_unit)
metric.buffer_size_approximation() * @default_auto_demand_size_factor
end
Expand All @@ -443,20 +437,21 @@ defmodule Membrane.Core.Element.PadController do
:atomics.new(1, [])
|> tap(
&Stalker.register_metric_function(:auto_demand_size, fn -> :atomics.get(&1, 1) end,
pad: data.ref
pad: pad_data.ref
)
)
end

%{
pad_data
|> Map.merge(%{
demand: 0,
associated_pads: associated_pads,
auto_demand_size: auto_demand_size,
stalker_metrics: %{demand: demand_metric}
}
auto_demand_size: auto_demand_size
})
|> put_in([:stalker_metrics, :demand], demand_metric)
end

defp init_pad_mode_data(_data, _props, _other_info, _state), do: %{}
defp merge_pad_mode_data(pad_data, _props, _other_info, _state), do: pad_data

@doc """
Generates end of stream on the given input pad if it hasn't been generated yet
Expand Down
20 changes: 7 additions & 13 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -652,28 +652,22 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end
end

defp do_handle_child_death(child_name, :normal, state) do
state = LinkUtils.unlink_element(child_name, state)

defp do_handle_child_death(child_name, reason, state) do
state =
with {:ok, crash_group} <- CrashGroupUtils.get_child_crash_group(child_name, state) do
CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, :normal, state)
else
:error -> state
end
|> ChildrenModel.delete_child(child_name)
if reason == :normal,
do: LinkUtils.unlink_element(child_name, state),
else: state

{:ok, state}
end

defp do_handle_child_death(child_name, reason, state) do
with {:ok, crash_group} <- CrashGroupUtils.get_child_crash_group(child_name, state) do
state =
CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, reason, state)
|> ChildrenModel.delete_child(child_name)

{:ok, state}
else
:error when reason == :normal ->
{:ok, ChildrenModel.delete_child(state, child_name)}

:error when reason == {:shutdown, :membrane_crash_group_kill} ->
raise Membrane.PipelineError,
"Child #{inspect(child_name)} that was not a member of any crash group killed with :membrane_crash_group_kill."
Expand Down

0 comments on commit cfa54e8

Please sign in to comment.