diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index ada22f144..bce916fb6 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -66,11 +66,21 @@ defmodule Membrane.Core.Element.BufferController do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers) + # we check if pad should be corcked before decrementing :demand field in PadData, + # to avoid situation, when big chunk of data is stored in the queue only because it + # exceeds auto_demand_size sufficiently + hard_corcked? = AutoFlowUtils.hard_corcked?(pad_ref, state) + state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) :atomics.put(stalker_metrics.demand, 1, demand - buf_size) state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) - exec_buffer_callback(pad_ref, buffers, state) + + if hard_corcked? do + AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) + else + exec_buffer_callback(pad_ref, buffers, state) + end end defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex index a76a7441f..8af904e18 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -1,9 +1,16 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do @moduledoc false + alias Membrane.Buffer + alias Membrane.Event + alias Membrane.StreamFormat + alias Membrane.Core.Element.{ AtomicDemand, - State + BufferController, + EventController, + State, + StreamFormatController } require Membrane.Core.Child.PadModel, as: PadModel @@ -59,12 +66,45 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do state end + @spec hard_corcked?(Pad.ref(), State.t()) :: boolean() + def hard_corcked?(pad_ref, state) do + pad_data = PadModel.get_data!(state, pad_ref) + + state.effective_flow_control == :pull and pad_data.direction == :input and + pad_data.flow_control == :auto and pad_data.demand < -1 * pad_data.auto_demand_size / 2 + end + + @spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t() + def store_buffers_in_queue(pad_ref, buffers, state) do + store_in_queue(pad_ref, :buffers, buffers, state) + end + + @spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t() + def store_event_in_queue(pad_ref, event, state) do + store_in_queue(pad_ref, :event, event, state) + end + + @spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t() + def store_stream_format_in_queue(pad_ref, stream_format, state) do + store_in_queue(pad_ref, :stream_format, stream_format, state) + end + + defp store_in_queue(pad_ref, type, item, state) do + PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item})) + end + @spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t() def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do - Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2) + state = Enum.reduce(pad_ref_list, state, &do_auto_adjust_atomic_demand/2) + flush_auto_flow_queues(pad_ref_list, state) end def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do + state = do_auto_adjust_atomic_demand(pad_ref, state) + flush_auto_flow_queues([pad_ref], state) + end + + defp do_auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do PadModel.get_data!(state, pad_ref) |> do_auto_adjust_atomic_demand(state) end @@ -107,4 +147,49 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do atomic_demand_value > 0 end + + defp flush_auto_flow_queues(pad_ref_list, state) do + pad_to_queue_map = + pad_ref_list + |> Enum.filter(&hard_corcked?(&1, state)) + |> Map.new(&PadModel.get_data!(state, &1, [:auto_flow_queue])) + + state = handle_queued_items(pad_to_queue_map, state) + + Enum.reduce(pad_ref_list, state, fn pad_ref, state -> + PadModel.set_data!(state, pad_ref, :auto_flow_queue, Qex.new()) + end) + end + + defp handle_queued_items(pad_to_queue_map, state) when pad_to_queue_map == %{}, do: state + + defp handle_queued_items(pad_to_queue_map, state) do + {pad_ref, queue} = Enum.random(pad_to_queue_map) + + case Qex.pop(queue) do + {{:value, queue_item}, popped_queue} -> + state = do_handle_queue_item(pad_ref, queue_item, state) + + pad_to_queue_map + |> Map.put(pad_ref, popped_queue) + |> handle_queued_items(state) + + {:empty, _empty_queue} -> + pad_to_queue_map + |> Map.pop(pad_ref) + |> handle_queued_items(state) + end + end + + defp do_handle_queue_item(pad_ref, {:buffers, buffers}, state) do + BufferController.exec_buffer_callback(pad_ref, buffers, state) + end + + defp do_handle_queue_item(pad_ref, {:event, event}, state) do + EventController.exec_handle_event(pad_ref, event, state) + end + + defp do_handle_queue_item(pad_ref, {:stream_format, stream_format}, state) do + StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) + end end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 8db26fb79..5a8729846 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -104,8 +104,8 @@ defmodule Membrane.Core.Element.EffectiveFlowController do state.pads_data |> Enum.filter(fn {_ref, %{flow_control: flow_control}} -> flow_control == :auto end) - |> Enum.reduce(state, fn - {_ref, %{direction: :output} = pad_data}, state -> + |> Enum.each(fn + {_ref, %{direction: :output} = pad_data} -> :ok = AtomicDemand.set_sender_status( pad_data.atomic_demand, @@ -120,9 +120,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do [pad_data.other_ref, new_effective_flow_control] ) - state - - {pad_ref, %{direction: :input} = pad_data}, state -> + {pad_ref, %{direction: :input} = pad_data} -> if triggering_pad in [pad_ref, nil] or AtomicDemand.get_receiver_status(pad_data.atomic_demand) != :to_be_resolved do :ok = @@ -131,8 +129,13 @@ defmodule Membrane.Core.Element.EffectiveFlowController do {:resolved, new_effective_flow_control} ) end + end) - AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + state.pads_data + |> Enum.filter(fn + {_pad_ref, %{direction: :input, flow_contorl: :auto}} -> true + _other -> false end) + |> AutoFlowUtils.auto_adjust_atomic_demand(state) end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 377b70a30..a5e455483 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -18,6 +18,8 @@ defmodule Membrane.Core.Element.EventController do State } + alias Membrane.Core.Element.DemandController.AutoFlowUtils + require Membrane.Core.Child.PadModel require Membrane.Core.Message require Membrane.Core.Telemetry @@ -39,15 +41,22 @@ defmodule Membrane.Core.Element.EventController do playback: %State{playback: :playing} <- state do Telemetry.report_metric(:event, 1, inspect(pad_ref)) - if not Event.async?(event) and buffers_before_event_present?(data) do - PadModel.update_data!( - state, - pad_ref, - :input_queue, - &InputQueue.store(&1, :event, event) - ) - else - exec_handle_event(pad_ref, event, state) + cond do + # events goes to the manual flow control input queue + not Event.async?(event) and buffers_before_event_present?(data) -> + PadModel.update_data!( + state, + pad_ref, + :input_queue, + &InputQueue.store(&1, :event, event) + ) + + # event goes to the auto flow control queue + AutoFlowUtils.hard_corcked?(pad_ref, state) -> + AutoFlowUtils.store_event_in_queue(pad_ref, event, state) + + true -> + exec_handle_event(pad_ref, event, state) end else pad: {:error, :unknown_pad} -> diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index 1c03bce9e..06751b552 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -9,6 +9,7 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.Core.{CallbackHandler, Telemetry} alias Membrane.Core.Child.PadModel alias Membrane.Core.Element.{ActionHandler, CallbackContext, InputQueue, PlaybackQueue, State} + alias Membrane.Core.Element.DemandController.AutoFlowUtils require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry @@ -28,15 +29,22 @@ defmodule Membrane.Core.Element.StreamFormatController do queue = data.input_queue - if queue && not InputQueue.empty?(queue) do - PadModel.set_data!( - state, - pad_ref, - :input_queue, - InputQueue.store(queue, :stream_format, stream_format) - ) - else - exec_handle_stream_format(pad_ref, stream_format, state) + cond do + # stream format goes to the manual flow control input queue + queue && not InputQueue.empty?(queue) -> + PadModel.set_data!( + state, + pad_ref, + :input_queue, + InputQueue.store(queue, :stream_format, stream_format) + ) + + # stream format goes to the auto flow control queue + AutoFlowUtils.hard_corcked?(pad_ref, state) -> + AutoFlowUtils.store_stream_format_in_queue(pad_ref, stream_format, state) + + true -> + exec_handle_stream_format(pad_ref, stream_format, state) end else pad: {:error, :unknown_pad} -> diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index f427edfac..ca2ba356f 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -40,6 +40,7 @@ defmodule Membrane.Element.PadData do pid: private_field, other_ref: private_field, input_queue: private_field, + auto_flow_queue: private_field, incoming_demand: integer() | nil, demand_unit: private_field, other_demand_unit: private_field, @@ -80,6 +81,7 @@ defmodule Membrane.Element.PadData do defstruct @enforce_keys ++ [ input_queue: nil, + auto_flow_queue: Qex.new(), demand: 0, incoming_demand: nil, demand_unit: nil,