diff --git a/lib/membrane/bin/pad_data.ex b/lib/membrane/bin/pad_data.ex index 4db74e343..6b6c3dd2a 100644 --- a/lib/membrane/bin/pad_data.ex +++ b/lib/membrane/bin/pad_data.ex @@ -27,7 +27,9 @@ defmodule Membrane.Bin.PadData do link_id: private_field, endpoint: private_field, linked?: private_field, - response_received?: private_field + response_received?: private_field, + linking_timeout_id: private_field, + linked_in_spec?: private_field } @enforce_keys [ @@ -40,7 +42,9 @@ defmodule Membrane.Bin.PadData do :endpoint, :linked?, :response_received?, - :spec_ref + :spec_ref, + :linking_timeout_id, + :linked_in_spec? ] defstruct @enforce_keys diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 37b09b22a..157cf4cf2 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do {:noreply, state} end - defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do - PadController.handle_linking_timeout(pad_ref, state) + defp do_handle_info(Message.new(:linking_timeout, [pad_ref, linking_timeout_id]), state) do + :ok = PadController.handle_linking_timeout(pad_ref, linking_timeout_id, state) {:noreply, state} end diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index af6c448d7..2ca31bb96 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -8,11 +8,10 @@ defmodule Membrane.Core.Bin.PadController do alias Membrane.{Core, LinkError, Pad} alias Membrane.Core.Bin.{ActionHandler, CallbackContext, State} alias Membrane.Core.{CallbackHandler, Child, Message} - alias Membrane.Core.Child.PadModel alias Membrane.Core.Element.StreamFormatController alias Membrane.Core.Parent.{ChildLifeController, Link, SpecificationParser} - require Membrane.Core.Child.PadModel + require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Message require Membrane.Logger require Membrane.Pad @@ -50,8 +49,7 @@ defmodule Membrane.Core.Bin.PadController do state = case PadModel.get_data(state, pad_ref) do {:error, :unknown_pad} -> - init_pad_data(pad_ref, pad_info, state) - |> Map.update!(:pad_refs, &[pad_ref | &1]) + init_pad_data(pad_ref, state) # This case is for pads that were instantiated before the external link request, # that is in the internal link request (see `handle_internal_link_request/4`). @@ -69,17 +67,19 @@ defmodule Membrane.Core.Bin.PadController do state end - state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options}) - state = maybe_handle_pad_added(pad_ref, state) + linking_timeout_id = make_ref() - unless PadModel.get_data!(state, pad_ref, :endpoint) do - # If there's no endpoint associated to the pad, no internal link to the pad - # has been requested in the bin yet - _ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000) - :ok - end + state = + PadModel.update_data!( + state, + pad_ref, + &%{&1 | link_id: link_id, linking_timeout_id: linking_timeout_id, options: pad_options} + ) - state + message = Message.new(:linking_timeout, [pad_ref, linking_timeout_id]) + _ref = Process.send_after(self(), message, 5000) + + maybe_handle_pad_added(pad_ref, state) end @spec remove_pad(Pad.ref(), State.t()) :: State.t() @@ -102,16 +102,16 @@ defmodule Membrane.Core.Bin.PadController do end end - @spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return() - def handle_linking_timeout(pad_ref, state) do - case PadModel.get_data(state, pad_ref) do - {:ok, %{endpoint: nil} = pad_data} -> - raise Membrane.LinkError, - "Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}" - - _other -> - :ok + @spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return() + def handle_linking_timeout(pad_ref, linking_timeout_id, state) do + with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + %{linking_timeout_id: ^linking_timeout_id, linked_in_spec?: false} <- pad_data do + raise Membrane.LinkError, """ + Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)} + """ end + + :ok end @doc """ @@ -139,7 +139,7 @@ defmodule Membrane.Core.Bin.PadController do # Static pads can be linked internally before the external link request pad_info.availability == :always -> - init_pad_data(pad_ref, pad_info, state) + init_pad_data(pad_ref, state) true -> raise LinkError, "Dynamic pads must be firstly linked externally, then internally" @@ -284,7 +284,6 @@ defmodule Membrane.Core.Bin.PadController do with {:ok, %{availability: :on_request}} <- PadModel.get_data(state, pad_ref) do {pad_data, state} = maybe_handle_pad_removed(pad_ref, state) - |> Map.update!(:pad_refs, &List.delete(&1, pad_ref)) |> PadModel.pop_data!(pad_ref) if pad_data.endpoint do @@ -316,8 +315,8 @@ defmodule Membrane.Core.Bin.PadController do end @spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t() - defp maybe_handle_pad_added(ref, state) do - %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) + defp maybe_handle_pad_added(pad_ref, state) do + %{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref) if Pad.availability_mode(availability) == :dynamic do context = &CallbackContext.from_state(&1, pad_options: pad_opts) @@ -326,7 +325,7 @@ defmodule Membrane.Core.Bin.PadController do :handle_pad_added, ActionHandler, %{context: context}, - [ref], + [pad_ref], state ) else @@ -351,9 +350,15 @@ defmodule Membrane.Core.Bin.PadController do end end - defp init_pad_data(pad_ref, pad_info, state) do + @spec init_pad_data(Pad.ref(), State.t()) :: State.t() + def init_pad_data(pad_ref, state) do + if PadModel.assert_instance(state, pad_ref) == :ok do + raise "Cannot init pad data for pad #{inspect(pad_ref)}, because it already exists" + end + pad_data = - pad_info + state.pads_info + |> Map.get(Pad.name_by_ref(pad_ref)) |> Map.delete(:accepted_formats_str) |> Map.merge(%{ ref: pad_ref, @@ -362,10 +367,12 @@ defmodule Membrane.Core.Bin.PadController do linked?: false, response_received?: false, spec_ref: nil, - options: nil + options: nil, + linking_timeout_id: nil, + linked_in_spec?: false }) |> then(&struct!(Membrane.Bin.PadData, &1)) - put_in(state, [:pads_data, pad_ref], pad_data) + put_in(state.pads_data[pad_ref], pad_data) end end diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index e9605a1cf..9355018a8 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -8,7 +8,7 @@ defmodule Membrane.Core.Bin.State do use Bunch use Bunch.Access - alias Membrane.{Child, Clock, Pad, Sync} + alias Membrane.{Child, Clock, Sync} alias Membrane.Core.Child.PadModel alias Membrane.Core.Parent.ChildLifeController alias Membrane.Core.Parent.{ChildrenModel, CrashGroup, Link} @@ -20,7 +20,6 @@ defmodule Membrane.Core.Bin.State do children: ChildrenModel.children(), subprocess_supervisor: pid(), name: Membrane.Bin.name() | nil, - pad_refs: [Pad.ref()], pads_info: PadModel.pads_info() | nil, pads_data: PadModel.pads_data() | nil, parent_pid: pid, @@ -62,7 +61,6 @@ defmodule Membrane.Core.Bin.State do parent_pid: nil, playback: :stopped, internal_state: nil, - pad_refs: [], pads_info: nil, children: %{}, links: %{}, diff --git a/lib/membrane/core/child/pad_spec_handler.ex b/lib/membrane/core/child/pad_spec_handler.ex index 70e53d075..e4921055e 100644 --- a/lib/membrane/core/child/pad_spec_handler.ex +++ b/lib/membrane/core/child/pad_spec_handler.ex @@ -21,8 +21,7 @@ defmodule Membrane.Core.Child.PadSpecHandler do | pads_info: get_pads(state) |> Map.new(), - pads_data: %{}, - pad_refs: [] + pads_data: %{} } end diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 633d26d9c..6aa3b55fe 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -229,9 +229,7 @@ defmodule Membrane.Core.Element.PadController do state = generate_eos_if_needed(pad_ref, state) state = maybe_handle_pad_removed(pad_ref, state) - {pad_data, state} = - Map.update!(state, :pad_refs, &List.delete(&1, pad_ref)) - |> PadModel.pop_data!(pad_ref) + {pad_data, state} = PadModel.pop_data!(state, pad_ref) with %{direction: :input, flow_control: :auto, other_effective_flow_control: :pull} <- pad_data do @@ -321,9 +319,7 @@ defmodule Membrane.Core.Element.PadController do |> then(&struct!(Membrane.Element.PadData, &1)) state = - state - |> put_in([:pads_data, endpoint.pad_ref], pad_data) - |> Map.update!(:pad_refs, &[endpoint.pad_ref | &1]) + put_in(state, [:pads_data, endpoint.pad_ref], pad_data) :ok = AtomicDemand.set_sender_status( diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index d73c29335..e7e6392f4 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -19,7 +19,6 @@ defmodule Membrane.Core.Element.State do type: Element.type(), name: Element.name(), internal_state: Element.state() | nil, - pad_refs: [Pad.ref()] | nil, pads_info: PadModel.pads_info() | nil, pads_data: PadModel.pads_data() | nil, parent_pid: pid, @@ -65,7 +64,6 @@ defmodule Membrane.Core.Element.State do playback: :stopped, type: nil, internal_state: nil, - pad_refs: [], pads_info: %{}, synchronization: nil, delayed_demands: MapSet.new(), diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index ba63112c1..dd585bf97 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -5,6 +5,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do alias __MODULE__.{CrashGroupUtils, LinkUtils, StartupUtils} alias Membrane.{Child, ChildrenSpec} alias Membrane.Core.{Bin, CallbackHandler, Component, Parent, Pipeline} + alias Membrane.Core.Bin.PadController alias Membrane.Core.Parent.{ ChildEntryParser, @@ -17,6 +18,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do alias Membrane.Pad alias Membrane.ParentError + require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Component require Membrane.Core.Message, as: Message require Membrane.Logger @@ -154,7 +156,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do state = put_in(state, [:pending_specs, spec_ref], %{ - status: :initializing, + status: :created, children_names: MapSet.new(all_children_names), links_ids: Enum.map(links, & &1.id), dependent_specs: dependent_specs, @@ -309,14 +311,37 @@ defmodule Membrane.Core.Parent.ChildLifeController do end end + defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do + state = + with %Bin.State{} <- state do + bin_pads_linked_in_spec = + spec_data.links_ids + |> Enum.map(&Map.fetch!(state.links, &1)) + |> Enum.flat_map(&[&1.from, &1.to]) + |> Enum.flat_map(fn + %{child: {Membrane.Bin, :itself}, pad_ref: pad_ref} -> [pad_ref] + _other -> [] + end) + + bin_pads_linked_in_spec + |> Enum.reduce(state, fn pad_ref, state -> + case PadModel.assert_instance(state, pad_ref) do + :ok -> state + {:error, :unknown_pad} -> PadController.init_pad_data(pad_ref, state) + end + |> PadModel.set_data!(pad_ref, :linked_in_spec?, true) + end) + end + + do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state) + end + defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do Membrane.Logger.debug( "Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}" ) - %{children: children} = state - - if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and + if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and Enum.empty?(spec_data.dependent_specs) do Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized") @@ -353,7 +378,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do - if Enum.empty?(spec_data.awaiting_responses) do + if MapSet.size(spec_data.awaiting_responses) == 0 do state = spec_data.links_ids |> Enum.map(&Map.fetch!(state.links, &1)) diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index 4537a21bf..e8c607b19 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -65,8 +65,8 @@ defmodule Membrane.Core.Element.PadControllerTest do state ) - assert Map.drop(new_state, [:pads_data, :pad_refs]) == - Map.drop(state, [:pads_data, :pad_refs]) + assert Map.delete(new_state, :pads_data) == + Map.delete(state, :pads_data) assert PadModel.assert_instance(new_state, :input) == :ok end