From c3bb2fb0e3ad2100aeac732b1903d397965d4c23 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 23 May 2024 16:16:47 +0200 Subject: [PATCH 1/9] Modify 5s linking timeout constraint --- lib/membrane/bin/pad_data.ex | 5 ++- lib/membrane/core/bin.ex | 4 +- lib/membrane/core/bin/pad_controller.ex | 44 +++++++++++-------- lib/membrane/core/bin/state.ex | 2 + .../core/parent/child_life_controller.ex | 33 +++++++++++--- 5 files changed, 60 insertions(+), 28 deletions(-) diff --git a/lib/membrane/bin/pad_data.ex b/lib/membrane/bin/pad_data.ex index 4db74e343..8fce6efdf 100644 --- a/lib/membrane/bin/pad_data.ex +++ b/lib/membrane/bin/pad_data.ex @@ -27,7 +27,8 @@ defmodule Membrane.Bin.PadData do link_id: private_field, endpoint: private_field, linked?: private_field, - response_received?: private_field + response_received?: private_field, + linking_timeout_ref: private_field } @enforce_keys [ @@ -43,5 +44,5 @@ defmodule Membrane.Bin.PadData do :spec_ref ] - defstruct @enforce_keys + defstruct @enforce_keys ++ [:linking_timeout_ref] end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 37b09b22a..42e3c2e63 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do {:noreply, state} end - defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do - PadController.handle_linking_timeout(pad_ref, state) + defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do + PadController.handle_linking_timeout(pad_ref, timeout_ref, state) {:noreply, state} end diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index af6c448d7..dc1010d94 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -70,16 +70,22 @@ defmodule Membrane.Core.Bin.PadController do end state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options}) - state = maybe_handle_pad_added(pad_ref, state) - unless PadModel.get_data!(state, pad_ref, :endpoint) do - # If there's no endpoint associated to the pad, no internal link to the pad - # has been requested in the bin yet - _ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000) - :ok - end + state = + if PadModel.get_data!(state, pad_ref, :endpoint) == nil do + # If there's no endpoint associated to the pad, no internal link to the pad + # has been requested in the bin yet - state + linking_timeout_ref = make_ref() + message = Message.new(:linking_timeout, [pad_ref, linking_timeout_ref]) + Process.send_after(self(), message, 5000) + + PadModel.set_data!(state, pad_ref, :linking_timeout_ref, linking_timeout_ref) + else + state + end + + maybe_handle_pad_added(pad_ref, state) end @spec remove_pad(Pad.ref(), State.t()) :: State.t() @@ -102,15 +108,15 @@ defmodule Membrane.Core.Bin.PadController do end end - @spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return() - def handle_linking_timeout(pad_ref, state) do - case PadModel.get_data(state, pad_ref) do - {:ok, %{endpoint: nil} = pad_data} -> - raise Membrane.LinkError, - "Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}" + @spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return() + def handle_linking_timeout(pad_ref, timeout_ref, state) do + map_set_item = {pad_ref, timeout_ref} - _other -> - :ok + if MapSet.member?(state.initialized_internal_pads, map_set_item) do + Map.update!(state, :initialized_internal_pads, &MapSet.delete(&1, map_set_item)) + else + raise Membrane.LinkError, + "Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}" end end @@ -316,8 +322,8 @@ defmodule Membrane.Core.Bin.PadController do end @spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t() - defp maybe_handle_pad_added(ref, state) do - %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) + defp maybe_handle_pad_added(pad_ref, state) do + %{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref) if Pad.availability_mode(availability) == :dynamic do context = &CallbackContext.from_state(&1, pad_options: pad_opts) @@ -326,7 +332,7 @@ defmodule Membrane.Core.Bin.PadController do :handle_pad_added, ActionHandler, %{context: context}, - [ref], + [pad_ref], state ) else diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index e9605a1cf..662538149 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -45,6 +45,7 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), + initialized_internal_pads: MapSet.t(Pad.ref()), stalker: Membrane.Core.Stalker.t() } @@ -76,5 +77,6 @@ defmodule Membrane.Core.Bin.State do resource_guard: nil, subprocess_supervisor: nil, children_log_metadata: [], + initialized_internal_pads: MapSet.new(), pads_data: nil end diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 794c5ea64..04dc9d173 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -17,6 +17,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do alias Membrane.Pad alias Membrane.ParentError + require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Component require Membrane.Core.Message, as: Message require Membrane.Logger @@ -154,7 +155,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do state = put_in(state, [:pending_specs, spec_ref], %{ - status: :initializing, + status: :created, children_names: MapSet.new(all_children_names), links_ids: Enum.map(links, & &1.id), dependent_specs: dependent_specs, @@ -309,14 +310,36 @@ defmodule Membrane.Core.Parent.ChildLifeController do end end + defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do + state = + with %Bin.State{} <- state do + initialized_internal_pads = + spec_data.links_ids + |> Enum.map(&Map.fetch!(state.links, &1)) + |> Enum.flat_map(&[&1.from, &1.to]) + |> Enum.flat_map(fn %{child: child, pad_ref: pad_ref} -> + with {Membrane.Bin, :itself} <- child, + {:ok, timeout_ref} when timeout_ref != nil <- + PadModel.get_data(state, pad_ref, :linking_timeout_ref) do + [{pad_ref, timeout_ref}] + else + _other -> [] + end + end) + |> Enum.reduce(state.initialized_internal_pads, &MapSet.put(&2, &1)) + + %{state | initialized_internal_pads: initialized_internal_pads} + end + + do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state) + end + defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do Membrane.Logger.debug( "Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}" ) - %{children: children} = state - - if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and + if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and Enum.empty?(spec_data.dependent_specs) do Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized") do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state) @@ -351,7 +374,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do - if Enum.empty?(spec_data.awaiting_responses) do + if MapSet.size(spec_data.awaiting_responses) == 0 do state = spec_data.links_ids |> Enum.map(&Map.fetch!(state.links, &1)) From b200a4ccb4b603eb046a48d7c8d7e990f53448fb Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 23 May 2024 16:27:26 +0200 Subject: [PATCH 2/9] Make dialyzer happy --- lib/membrane/core/bin.ex | 2 +- lib/membrane/core/bin/pad_controller.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 42e3c2e63..e16acb297 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -210,7 +210,7 @@ defmodule Membrane.Core.Bin do end defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do - PadController.handle_linking_timeout(pad_ref, timeout_ref, state) + state = PadController.handle_linking_timeout(pad_ref, timeout_ref, state) {:noreply, state} end diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index dc1010d94..8152f7a3f 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -108,7 +108,7 @@ defmodule Membrane.Core.Bin.PadController do end end - @spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return() + @spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: State.t() def handle_linking_timeout(pad_ref, timeout_ref, state) do map_set_item = {pad_ref, timeout_ref} From e4cfff849e87a2a9fc74b8d93a47c95dbd75783e Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 23 May 2024 17:17:19 +0200 Subject: [PATCH 3/9] Maybe fix bug occuring in FishJam --- .../core/parent/child_life_controller/crash_group_utils.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex b/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex index 8537f7d5c..5cf3b2ebd 100644 --- a/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/crash_group_utils.ex @@ -44,7 +44,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do # and we will not want to have it in :crash_group_members in the callback context in handle_crash_group_down/3, # so this child is removed from :members in crash group struct members = List.delete(group.members, child_name) - state = put_in(state, [:crash_groups, group.name, :members], members) + state = put_in(state.crash_groups[group.name].members, members) if group.detonating? and Enum.all?(members, &(not Map.has_key?(state.children, &1))) do cleanup_crash_group(group.name, state) From 814f10b9806983a6d539a4362f986649e31135f1 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 24 May 2024 11:42:14 +0200 Subject: [PATCH 4/9] Fix linking timeout mechanism --- lib/membrane/bin/pad_data.ex | 5 ++- lib/membrane/core/bin.ex | 4 +-- lib/membrane/core/bin/pad_controller.ex | 34 +++++++------------ lib/membrane/core/bin/state.ex | 4 +-- .../core/parent/child_life_controller.ex | 22 +++++------- test/membrane/integration/linking_test.exs | 1 + 6 files changed, 28 insertions(+), 42 deletions(-) diff --git a/lib/membrane/bin/pad_data.ex b/lib/membrane/bin/pad_data.ex index 8fce6efdf..4db74e343 100644 --- a/lib/membrane/bin/pad_data.ex +++ b/lib/membrane/bin/pad_data.ex @@ -27,8 +27,7 @@ defmodule Membrane.Bin.PadData do link_id: private_field, endpoint: private_field, linked?: private_field, - response_received?: private_field, - linking_timeout_ref: private_field + response_received?: private_field } @enforce_keys [ @@ -44,5 +43,5 @@ defmodule Membrane.Bin.PadData do :spec_ref ] - defstruct @enforce_keys ++ [:linking_timeout_ref] + defstruct @enforce_keys end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index e16acb297..2d324c99a 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do {:noreply, state} end - defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do - state = PadController.handle_linking_timeout(pad_ref, timeout_ref, state) + defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do + state = PadController.handle_linking_timeout(pad_ref, state) {:noreply, state} end diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index 8152f7a3f..453e92463 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -71,19 +71,7 @@ defmodule Membrane.Core.Bin.PadController do state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options}) - state = - if PadModel.get_data!(state, pad_ref, :endpoint) == nil do - # If there's no endpoint associated to the pad, no internal link to the pad - # has been requested in the bin yet - - linking_timeout_ref = make_ref() - message = Message.new(:linking_timeout, [pad_ref, linking_timeout_ref]) - Process.send_after(self(), message, 5000) - - PadModel.set_data!(state, pad_ref, :linking_timeout_ref, linking_timeout_ref) - else - state - end + _ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000) maybe_handle_pad_added(pad_ref, state) end @@ -108,15 +96,19 @@ defmodule Membrane.Core.Bin.PadController do end end - @spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: State.t() - def handle_linking_timeout(pad_ref, timeout_ref, state) do - map_set_item = {pad_ref, timeout_ref} + @spec handle_linking_timeout(Pad.ref(), State.t()) :: State.t() | no_return() + def handle_linking_timeout(pad_ref, state) do + case Map.fetch(state.linking_timeout_counters, pad_ref) do + {:ok, 1} -> + Map.update!(state, :linking_timeout_counters, &Map.delete(&1, pad_ref)) - if MapSet.member?(state.initialized_internal_pads, map_set_item) do - Map.update!(state, :initialized_internal_pads, &MapSet.delete(&1, map_set_item)) - else - raise Membrane.LinkError, - "Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}" + {:ok, counter} when counter > 1 -> + put_in(state.linking_timeout_counters[pad_ref], counter - 1) + + _else -> + raise Membrane.LinkError, """ + Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)} + """ end end diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index 662538149..68421d016 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -45,7 +45,7 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - initialized_internal_pads: MapSet.t(Pad.ref()), + linking_timeout_counters: %{optional(Pad.ref()) => integer()}, stalker: Membrane.Core.Stalker.t() } @@ -77,6 +77,6 @@ defmodule Membrane.Core.Bin.State do resource_guard: nil, subprocess_supervisor: nil, children_log_metadata: [], - initialized_internal_pads: MapSet.new(), + linking_timeout_counters: %{}, pads_data: nil end diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 04dc9d173..e972b44b5 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -17,7 +17,6 @@ defmodule Membrane.Core.Parent.ChildLifeController do alias Membrane.Pad alias Membrane.ParentError - require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Component require Membrane.Core.Message, as: Message require Membrane.Logger @@ -313,22 +312,17 @@ defmodule Membrane.Core.Parent.ChildLifeController do defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do state = with %Bin.State{} <- state do - initialized_internal_pads = + linking_timeout_counters = spec_data.links_ids |> Enum.map(&Map.fetch!(state.links, &1)) |> Enum.flat_map(&[&1.from, &1.to]) - |> Enum.flat_map(fn %{child: child, pad_ref: pad_ref} -> - with {Membrane.Bin, :itself} <- child, - {:ok, timeout_ref} when timeout_ref != nil <- - PadModel.get_data(state, pad_ref, :linking_timeout_ref) do - [{pad_ref, timeout_ref}] - else - _other -> [] - end - end) - |> Enum.reduce(state.initialized_internal_pads, &MapSet.put(&2, &1)) - - %{state | initialized_internal_pads: initialized_internal_pads} + |> Enum.filter(&(&1.child == {Membrane.Bin, :itself})) + |> Enum.reduce( + state.linking_timeout_counters, + &Map.update(&2, &1.pad_ref, 1, fn i -> i + 1 end) + ) + + %{state | linking_timeout_counters: linking_timeout_counters} end do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state) diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index a2a9e5081..ec732a40a 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -353,6 +353,7 @@ defmodule Membrane.Integration.LinkingTest do Membrane.Pipeline.terminate(pipeline) end + @tag :xd test "Bin should crash if it doesn't link internally within timeout" do defmodule NoInternalLinkBin do use Membrane.Bin From 7e06e7056cd8fc5a92358c65cd1ad1aaf0ba1664 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 28 May 2024 10:35:27 +0200 Subject: [PATCH 5/9] Remove leftover --- test/membrane/integration/linking_test.exs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index ec732a40a..a2a9e5081 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -353,7 +353,6 @@ defmodule Membrane.Integration.LinkingTest do Membrane.Pipeline.terminate(pipeline) end - @tag :xd test "Bin should crash if it doesn't link internally within timeout" do defmodule NoInternalLinkBin do use Membrane.Bin From 67ba8d2451d738bfea9d94d23900acb09c74a92f Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 31 May 2024 14:37:51 +0200 Subject: [PATCH 6/9] Implement suggestions from CR & delete unnecessary state field --- lib/membrane/bin/pad_data.ex | 8 ++- lib/membrane/core/bin.ex | 4 +- lib/membrane/core/bin/pad_controller.ex | 59 +++++++++++-------- lib/membrane/core/bin/state.ex | 4 -- lib/membrane/core/child/pad_spec_handler.ex | 3 +- lib/membrane/core/element/pad_controller.ex | 8 +-- lib/membrane/core/element/state.ex | 2 - .../core/parent/child_life_controller.ex | 26 +++++--- .../core/element/pad_controller_test.exs | 4 +- 9 files changed, 65 insertions(+), 53 deletions(-) diff --git a/lib/membrane/bin/pad_data.ex b/lib/membrane/bin/pad_data.ex index 4db74e343..6b6c3dd2a 100644 --- a/lib/membrane/bin/pad_data.ex +++ b/lib/membrane/bin/pad_data.ex @@ -27,7 +27,9 @@ defmodule Membrane.Bin.PadData do link_id: private_field, endpoint: private_field, linked?: private_field, - response_received?: private_field + response_received?: private_field, + linking_timeout_id: private_field, + linked_in_spec?: private_field } @enforce_keys [ @@ -40,7 +42,9 @@ defmodule Membrane.Bin.PadData do :endpoint, :linked?, :response_received?, - :spec_ref + :spec_ref, + :linking_timeout_id, + :linked_in_spec? ] defstruct @enforce_keys diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 2d324c99a..157cf4cf2 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do {:noreply, state} end - defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do - state = PadController.handle_linking_timeout(pad_ref, state) + defp do_handle_info(Message.new(:linking_timeout, [pad_ref, linking_timeout_id]), state) do + :ok = PadController.handle_linking_timeout(pad_ref, linking_timeout_id, state) {:noreply, state} end diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index 453e92463..2ca31bb96 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -8,11 +8,10 @@ defmodule Membrane.Core.Bin.PadController do alias Membrane.{Core, LinkError, Pad} alias Membrane.Core.Bin.{ActionHandler, CallbackContext, State} alias Membrane.Core.{CallbackHandler, Child, Message} - alias Membrane.Core.Child.PadModel alias Membrane.Core.Element.StreamFormatController alias Membrane.Core.Parent.{ChildLifeController, Link, SpecificationParser} - require Membrane.Core.Child.PadModel + require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Message require Membrane.Logger require Membrane.Pad @@ -50,8 +49,7 @@ defmodule Membrane.Core.Bin.PadController do state = case PadModel.get_data(state, pad_ref) do {:error, :unknown_pad} -> - init_pad_data(pad_ref, pad_info, state) - |> Map.update!(:pad_refs, &[pad_ref | &1]) + init_pad_data(pad_ref, state) # This case is for pads that were instantiated before the external link request, # that is in the internal link request (see `handle_internal_link_request/4`). @@ -69,9 +67,17 @@ defmodule Membrane.Core.Bin.PadController do state end - state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options}) + linking_timeout_id = make_ref() - _ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000) + state = + PadModel.update_data!( + state, + pad_ref, + &%{&1 | link_id: link_id, linking_timeout_id: linking_timeout_id, options: pad_options} + ) + + message = Message.new(:linking_timeout, [pad_ref, linking_timeout_id]) + _ref = Process.send_after(self(), message, 5000) maybe_handle_pad_added(pad_ref, state) end @@ -96,20 +102,16 @@ defmodule Membrane.Core.Bin.PadController do end end - @spec handle_linking_timeout(Pad.ref(), State.t()) :: State.t() | no_return() - def handle_linking_timeout(pad_ref, state) do - case Map.fetch(state.linking_timeout_counters, pad_ref) do - {:ok, 1} -> - Map.update!(state, :linking_timeout_counters, &Map.delete(&1, pad_ref)) - - {:ok, counter} when counter > 1 -> - put_in(state.linking_timeout_counters[pad_ref], counter - 1) - - _else -> - raise Membrane.LinkError, """ - Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)} - """ + @spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return() + def handle_linking_timeout(pad_ref, linking_timeout_id, state) do + with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + %{linking_timeout_id: ^linking_timeout_id, linked_in_spec?: false} <- pad_data do + raise Membrane.LinkError, """ + Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)} + """ end + + :ok end @doc """ @@ -137,7 +139,7 @@ defmodule Membrane.Core.Bin.PadController do # Static pads can be linked internally before the external link request pad_info.availability == :always -> - init_pad_data(pad_ref, pad_info, state) + init_pad_data(pad_ref, state) true -> raise LinkError, "Dynamic pads must be firstly linked externally, then internally" @@ -282,7 +284,6 @@ defmodule Membrane.Core.Bin.PadController do with {:ok, %{availability: :on_request}} <- PadModel.get_data(state, pad_ref) do {pad_data, state} = maybe_handle_pad_removed(pad_ref, state) - |> Map.update!(:pad_refs, &List.delete(&1, pad_ref)) |> PadModel.pop_data!(pad_ref) if pad_data.endpoint do @@ -349,9 +350,15 @@ defmodule Membrane.Core.Bin.PadController do end end - defp init_pad_data(pad_ref, pad_info, state) do + @spec init_pad_data(Pad.ref(), State.t()) :: State.t() + def init_pad_data(pad_ref, state) do + if PadModel.assert_instance(state, pad_ref) == :ok do + raise "Cannot init pad data for pad #{inspect(pad_ref)}, because it already exists" + end + pad_data = - pad_info + state.pads_info + |> Map.get(Pad.name_by_ref(pad_ref)) |> Map.delete(:accepted_formats_str) |> Map.merge(%{ ref: pad_ref, @@ -360,10 +367,12 @@ defmodule Membrane.Core.Bin.PadController do linked?: false, response_received?: false, spec_ref: nil, - options: nil + options: nil, + linking_timeout_id: nil, + linked_in_spec?: false }) |> then(&struct!(Membrane.Bin.PadData, &1)) - put_in(state, [:pads_data, pad_ref], pad_data) + put_in(state.pads_data[pad_ref], pad_data) end end diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index 68421d016..e058e8a03 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -20,7 +20,6 @@ defmodule Membrane.Core.Bin.State do children: ChildrenModel.children(), subprocess_supervisor: pid(), name: Membrane.Bin.name() | nil, - pad_refs: [Pad.ref()], pads_info: PadModel.pads_info() | nil, pads_data: PadModel.pads_data() | nil, parent_pid: pid, @@ -45,7 +44,6 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - linking_timeout_counters: %{optional(Pad.ref()) => integer()}, stalker: Membrane.Core.Stalker.t() } @@ -63,7 +61,6 @@ defmodule Membrane.Core.Bin.State do parent_pid: nil, playback: :stopped, internal_state: nil, - pad_refs: [], pads_info: nil, children: %{}, links: %{}, @@ -77,6 +74,5 @@ defmodule Membrane.Core.Bin.State do resource_guard: nil, subprocess_supervisor: nil, children_log_metadata: [], - linking_timeout_counters: %{}, pads_data: nil end diff --git a/lib/membrane/core/child/pad_spec_handler.ex b/lib/membrane/core/child/pad_spec_handler.ex index 70e53d075..e4921055e 100644 --- a/lib/membrane/core/child/pad_spec_handler.ex +++ b/lib/membrane/core/child/pad_spec_handler.ex @@ -21,8 +21,7 @@ defmodule Membrane.Core.Child.PadSpecHandler do | pads_info: get_pads(state) |> Map.new(), - pads_data: %{}, - pad_refs: [] + pads_data: %{} } end diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 633d26d9c..6aa3b55fe 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -229,9 +229,7 @@ defmodule Membrane.Core.Element.PadController do state = generate_eos_if_needed(pad_ref, state) state = maybe_handle_pad_removed(pad_ref, state) - {pad_data, state} = - Map.update!(state, :pad_refs, &List.delete(&1, pad_ref)) - |> PadModel.pop_data!(pad_ref) + {pad_data, state} = PadModel.pop_data!(state, pad_ref) with %{direction: :input, flow_control: :auto, other_effective_flow_control: :pull} <- pad_data do @@ -321,9 +319,7 @@ defmodule Membrane.Core.Element.PadController do |> then(&struct!(Membrane.Element.PadData, &1)) state = - state - |> put_in([:pads_data, endpoint.pad_ref], pad_data) - |> Map.update!(:pad_refs, &[endpoint.pad_ref | &1]) + put_in(state, [:pads_data, endpoint.pad_ref], pad_data) :ok = AtomicDemand.set_sender_status( diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index d73c29335..e7e6392f4 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -19,7 +19,6 @@ defmodule Membrane.Core.Element.State do type: Element.type(), name: Element.name(), internal_state: Element.state() | nil, - pad_refs: [Pad.ref()] | nil, pads_info: PadModel.pads_info() | nil, pads_data: PadModel.pads_data() | nil, parent_pid: pid, @@ -65,7 +64,6 @@ defmodule Membrane.Core.Element.State do playback: :stopped, type: nil, internal_state: nil, - pad_refs: [], pads_info: %{}, synchronization: nil, delayed_demands: MapSet.new(), diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index e972b44b5..8b12ba54d 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -5,6 +5,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do alias __MODULE__.{CrashGroupUtils, LinkUtils, StartupUtils} alias Membrane.{Child, ChildrenSpec} alias Membrane.Core.{Bin, CallbackHandler, Component, Parent, Pipeline} + alias Membrane.Core.Bin.PadController alias Membrane.Core.Parent.{ ChildEntryParser, @@ -17,6 +18,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do alias Membrane.Pad alias Membrane.ParentError + require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Component require Membrane.Core.Message, as: Message require Membrane.Logger @@ -312,17 +314,25 @@ defmodule Membrane.Core.Parent.ChildLifeController do defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do state = with %Bin.State{} <- state do - linking_timeout_counters = + bin_pads_linked_in_spec = spec_data.links_ids |> Enum.map(&Map.fetch!(state.links, &1)) |> Enum.flat_map(&[&1.from, &1.to]) - |> Enum.filter(&(&1.child == {Membrane.Bin, :itself})) - |> Enum.reduce( - state.linking_timeout_counters, - &Map.update(&2, &1.pad_ref, 1, fn i -> i + 1 end) - ) - - %{state | linking_timeout_counters: linking_timeout_counters} + |> Enum.flat_map(fn + %{child: {Membrane.Bin, :itself}, pad_ref: pad_ref} -> [pad_ref] + _other -> [] + end) + + bin_pads_linked_in_spec + |> Enum.reduce(state, fn pad_ref, state -> + with {:error, :unknown_pad} <- PadModel.assert_instance(state, pad_ref) do + PadController.init_pad_data(pad_ref, state) + # |> Map.update!(:pad_refs, &[pad_ref | &1]) + else + :ok -> state + end + |> PadModel.set_data!(pad_ref, :linked_in_spec?, true) + end) end do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state) diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index 4537a21bf..e8c607b19 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -65,8 +65,8 @@ defmodule Membrane.Core.Element.PadControllerTest do state ) - assert Map.drop(new_state, [:pads_data, :pad_refs]) == - Map.drop(state, [:pads_data, :pad_refs]) + assert Map.delete(new_state, :pads_data) == + Map.delete(state, :pads_data) assert PadModel.assert_instance(new_state, :input) == :ok end From d9ba12826bde94f927d2dc36b3929681ca0d481e Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 31 May 2024 14:53:22 +0200 Subject: [PATCH 7/9] Remove leftover --- lib/membrane/core/parent/child_life_controller.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 8b12ba54d..a67f38eef 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -327,7 +327,6 @@ defmodule Membrane.Core.Parent.ChildLifeController do |> Enum.reduce(state, fn pad_ref, state -> with {:error, :unknown_pad} <- PadModel.assert_instance(state, pad_ref) do PadController.init_pad_data(pad_ref, state) - # |> Map.update!(:pad_refs, &[pad_ref | &1]) else :ok -> state end From 2ca56a7b612593d5510a5612d380b76f349a553c Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 31 May 2024 14:59:37 +0200 Subject: [PATCH 8/9] Satisfy lint --- lib/membrane/core/bin/state.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index e058e8a03..9355018a8 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -8,7 +8,7 @@ defmodule Membrane.Core.Bin.State do use Bunch use Bunch.Access - alias Membrane.{Child, Clock, Pad, Sync} + alias Membrane.{Child, Clock, Sync} alias Membrane.Core.Child.PadModel alias Membrane.Core.Parent.ChildLifeController alias Membrane.Core.Parent.{ChildrenModel, CrashGroup, Link} From 8dac29a2f95f67bf8285b566b1de34384a5630e8 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 3 Jun 2024 14:30:51 +0200 Subject: [PATCH 9/9] with -> case --- lib/membrane/core/parent/child_life_controller.ex | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index a67f38eef..16de0df04 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -325,10 +325,9 @@ defmodule Membrane.Core.Parent.ChildLifeController do bin_pads_linked_in_spec |> Enum.reduce(state, fn pad_ref, state -> - with {:error, :unknown_pad} <- PadModel.assert_instance(state, pad_ref) do - PadController.init_pad_data(pad_ref, state) - else + case PadModel.assert_instance(state, pad_ref) do :ok -> state + {:error, :unknown_pad} -> PadController.init_pad_data(pad_ref, state) end |> PadModel.set_data!(pad_ref, :linked_in_spec?, true) end)