Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify 5s linking timeout constraint #810

Merged
merged 12 commits into from
Jun 3, 2024
8 changes: 6 additions & 2 deletions lib/membrane/bin/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ defmodule Membrane.Bin.PadData do
link_id: private_field,
endpoint: private_field,
linked?: private_field,
response_received?: private_field
response_received?: private_field,
linking_timeout_id: private_field,
linked_in_spec?: private_field
}

@enforce_keys [
Expand All @@ -40,7 +42,9 @@ defmodule Membrane.Bin.PadData do
:endpoint,
:linked?,
:response_received?,
:spec_ref
:spec_ref,
:linking_timeout_id,
:linked_in_spec?
]

defstruct @enforce_keys
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do
{:noreply, state}
end

defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do
PadController.handle_linking_timeout(pad_ref, state)
defp do_handle_info(Message.new(:linking_timeout, [pad_ref, linking_timeout_id]), state) do
:ok = PadController.handle_linking_timeout(pad_ref, linking_timeout_id, state)
{:noreply, state}
end

Expand Down
69 changes: 38 additions & 31 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ defmodule Membrane.Core.Bin.PadController do
alias Membrane.{Core, LinkError, Pad}
alias Membrane.Core.Bin.{ActionHandler, CallbackContext, State}
alias Membrane.Core.{CallbackHandler, Child, Message}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.StreamFormatController
alias Membrane.Core.Parent.{ChildLifeController, Link, SpecificationParser}

require Membrane.Core.Child.PadModel
require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Message
require Membrane.Logger
require Membrane.Pad
Expand Down Expand Up @@ -50,8 +49,7 @@ defmodule Membrane.Core.Bin.PadController do
state =
case PadModel.get_data(state, pad_ref) do
{:error, :unknown_pad} ->
init_pad_data(pad_ref, pad_info, state)
|> Map.update!(:pad_refs, &[pad_ref | &1])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove pad_refs? In case you forgot, it was added by yourself in this PR: #641 :P

Copy link
Member Author

@FelonEkonom FelonEkonom Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I didn't remember it 🙈 There was 1 place, where the pad was removed from pads_data but not from pad_refs, so without this PR value stored under this field may be incorrec. I saw that nothing relies on the value of this field. I guess the reason of this field was to have more readable logs, but has anybody ever looked at it in the logs to debug something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't, but hard to say in general. I have the following idea:

  • call it inspect_pad_refs and assign to it a function that will accept the state and return the pad refs
  • in Membrane.Core.Inspect, if a field starts with inspect_, we'll call the function in it instead of printing it directly

I don't like it much, but should be better than what we have. Otherwise I'd just remove this field, but it should be removed from the element as well

Copy link
Member Author

@FelonEkonom FelonEkonom Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no such field in the element state

init_pad_data(pad_ref, state)

# This case is for pads that were instantiated before the external link request,
# that is in the internal link request (see `handle_internal_link_request/4`).
Expand All @@ -69,17 +67,19 @@ defmodule Membrane.Core.Bin.PadController do
state
end

state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})
state = maybe_handle_pad_added(pad_ref, state)
linking_timeout_id = make_ref()

unless PadModel.get_data!(state, pad_ref, :endpoint) do
# If there's no endpoint associated to the pad, no internal link to the pad
# has been requested in the bin yet
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)
:ok
end
state =
PadModel.update_data!(
state,
pad_ref,
&%{&1 | link_id: link_id, linking_timeout_id: linking_timeout_id, options: pad_options}
)

state
message = Message.new(:linking_timeout, [pad_ref, linking_timeout_id])
_ref = Process.send_after(self(), message, 5000)

maybe_handle_pad_added(pad_ref, state)
end

@spec remove_pad(Pad.ref(), State.t()) :: State.t()
Expand All @@ -102,16 +102,16 @@ defmodule Membrane.Core.Bin.PadController do
end
end

@spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return()
def handle_linking_timeout(pad_ref, state) do
case PadModel.get_data(state, pad_ref) do
{:ok, %{endpoint: nil} = pad_data} ->
raise Membrane.LinkError,
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}"

_other ->
:ok
@spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return()
def handle_linking_timeout(pad_ref, linking_timeout_id, state) do
with {:ok, pad_data} <- PadModel.get_data(state, pad_ref),
%{linking_timeout_id: ^linking_timeout_id, linked_in_spec?: false} <- pad_data do
raise Membrane.LinkError, """
Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}
"""
end

:ok
end

@doc """
Expand Down Expand Up @@ -139,7 +139,7 @@ defmodule Membrane.Core.Bin.PadController do

# Static pads can be linked internally before the external link request
pad_info.availability == :always ->
init_pad_data(pad_ref, pad_info, state)
init_pad_data(pad_ref, state)

true ->
raise LinkError, "Dynamic pads must be firstly linked externally, then internally"
Expand Down Expand Up @@ -284,7 +284,6 @@ defmodule Membrane.Core.Bin.PadController do
with {:ok, %{availability: :on_request}} <- PadModel.get_data(state, pad_ref) do
{pad_data, state} =
maybe_handle_pad_removed(pad_ref, state)
|> Map.update!(:pad_refs, &List.delete(&1, pad_ref))
|> PadModel.pop_data!(pad_ref)

if pad_data.endpoint do
Expand Down Expand Up @@ -316,8 +315,8 @@ defmodule Membrane.Core.Bin.PadController do
end

@spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t()
defp maybe_handle_pad_added(ref, state) do
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref)
defp maybe_handle_pad_added(pad_ref, state) do
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref)

if Pad.availability_mode(availability) == :dynamic do
context = &CallbackContext.from_state(&1, pad_options: pad_opts)
Expand All @@ -326,7 +325,7 @@ defmodule Membrane.Core.Bin.PadController do
:handle_pad_added,
ActionHandler,
%{context: context},
[ref],
[pad_ref],
state
)
else
Expand All @@ -351,9 +350,15 @@ defmodule Membrane.Core.Bin.PadController do
end
end

defp init_pad_data(pad_ref, pad_info, state) do
@spec init_pad_data(Pad.ref(), State.t()) :: State.t()
def init_pad_data(pad_ref, state) do
if PadModel.assert_instance(state, pad_ref) == :ok do
raise "Cannot init pad data for pad #{inspect(pad_ref)}, because it already exists"
end

pad_data =
pad_info
state.pads_info
|> Map.get(Pad.name_by_ref(pad_ref))
|> Map.delete(:accepted_formats_str)
|> Map.merge(%{
ref: pad_ref,
Expand All @@ -362,10 +367,12 @@ defmodule Membrane.Core.Bin.PadController do
linked?: false,
response_received?: false,
spec_ref: nil,
options: nil
options: nil,
linking_timeout_id: nil,
linked_in_spec?: false
})
|> then(&struct!(Membrane.Bin.PadData, &1))

put_in(state, [:pads_data, pad_ref], pad_data)
put_in(state.pads_data[pad_ref], pad_data)
end
end
4 changes: 1 addition & 3 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Core.Bin.State do
use Bunch
use Bunch.Access

alias Membrane.{Child, Clock, Pad, Sync}
alias Membrane.{Child, Clock, Sync}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Parent.ChildLifeController
alias Membrane.Core.Parent.{ChildrenModel, CrashGroup, Link}
Expand All @@ -20,7 +20,6 @@ defmodule Membrane.Core.Bin.State do
children: ChildrenModel.children(),
subprocess_supervisor: pid(),
name: Membrane.Bin.name() | nil,
pad_refs: [Pad.ref()],
pads_info: PadModel.pads_info() | nil,
pads_data: PadModel.pads_data() | nil,
parent_pid: pid,
Expand Down Expand Up @@ -62,7 +61,6 @@ defmodule Membrane.Core.Bin.State do
parent_pid: nil,
playback: :stopped,
internal_state: nil,
pad_refs: [],
pads_info: nil,
children: %{},
links: %{},
Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/child/pad_spec_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ defmodule Membrane.Core.Child.PadSpecHandler do
| pads_info:
get_pads(state)
|> Map.new(),
pads_data: %{},
pad_refs: []
pads_data: %{}
}
end

Expand Down
8 changes: 2 additions & 6 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ defmodule Membrane.Core.Element.PadController do
state = generate_eos_if_needed(pad_ref, state)
state = maybe_handle_pad_removed(pad_ref, state)

{pad_data, state} =
Map.update!(state, :pad_refs, &List.delete(&1, pad_ref))
|> PadModel.pop_data!(pad_ref)
{pad_data, state} = PadModel.pop_data!(state, pad_ref)

with %{direction: :input, flow_control: :auto, other_effective_flow_control: :pull} <-
pad_data do
Expand Down Expand Up @@ -321,9 +319,7 @@ defmodule Membrane.Core.Element.PadController do
|> then(&struct!(Membrane.Element.PadData, &1))

state =
state
|> put_in([:pads_data, endpoint.pad_ref], pad_data)
|> Map.update!(:pad_refs, &[endpoint.pad_ref | &1])
put_in(state, [:pads_data, endpoint.pad_ref], pad_data)

:ok =
AtomicDemand.set_sender_status(
Expand Down
2 changes: 0 additions & 2 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Membrane.Core.Element.State do
type: Element.type(),
name: Element.name(),
internal_state: Element.state() | nil,
pad_refs: [Pad.ref()] | nil,
pads_info: PadModel.pads_info() | nil,
pads_data: PadModel.pads_data() | nil,
parent_pid: pid,
Expand Down Expand Up @@ -65,7 +64,6 @@ defmodule Membrane.Core.Element.State do
playback: :stopped,
type: nil,
internal_state: nil,
pad_refs: [],
pads_info: %{},
synchronization: nil,
delayed_demands: MapSet.new(),
Expand Down
36 changes: 31 additions & 5 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
alias __MODULE__.{CrashGroupUtils, LinkUtils, StartupUtils}
alias Membrane.{Child, ChildrenSpec}
alias Membrane.Core.{Bin, CallbackHandler, Component, Parent, Pipeline}
alias Membrane.Core.Bin.PadController

alias Membrane.Core.Parent.{
ChildEntryParser,
Expand All @@ -17,6 +18,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
alias Membrane.Pad
alias Membrane.ParentError

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Component
require Membrane.Core.Message, as: Message
require Membrane.Logger
Expand Down Expand Up @@ -154,7 +156,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do

state =
put_in(state, [:pending_specs, spec_ref], %{
status: :initializing,
status: :created,
children_names: MapSet.new(all_children_names),
links_ids: Enum.map(links, & &1.id),
dependent_specs: dependent_specs,
Expand Down Expand Up @@ -309,14 +311,38 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end
end

defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do
state =
with %Bin.State{} <- state do
bin_pads_linked_in_spec =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
|> Enum.flat_map(&[&1.from, &1.to])
|> Enum.flat_map(fn
%{child: {Membrane.Bin, :itself}, pad_ref: pad_ref} -> [pad_ref]
_other -> []
end)

bin_pads_linked_in_spec
|> Enum.reduce(state, fn pad_ref, state ->
with {:error, :unknown_pad} <- PadModel.assert_instance(state, pad_ref) do
PadController.init_pad_data(pad_ref, state)
else
:ok -> state
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a case would look better IMO

|> PadModel.set_data!(pad_ref, :linked_in_spec?, true)
end)
end

do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state)
end

defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do
Membrane.Logger.debug(
"Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}"
)

%{children: children} = state

if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and
if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and
Enum.empty?(spec_data.dependent_specs) do
Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized")
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state)
Expand Down Expand Up @@ -351,7 +377,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end

defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do
if Enum.empty?(spec_data.awaiting_responses) do
if MapSet.size(spec_data.awaiting_responses) == 0 do
state =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
Expand Down
4 changes: 2 additions & 2 deletions test/membrane/core/element/pad_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ defmodule Membrane.Core.Element.PadControllerTest do
state
)

assert Map.drop(new_state, [:pads_data, :pad_refs]) ==
Map.drop(state, [:pads_data, :pad_refs])
assert Map.delete(new_state, :pads_data) ==
Map.delete(state, :pads_data)

assert PadModel.assert_instance(new_state, :input) == :ok
end
Expand Down