Skip to content

Commit

Permalink
Implement auto flow queue
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Oct 20, 2023
1 parent 9269612 commit 00794c3
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 27 deletions.
12 changes: 11 additions & 1 deletion lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,21 @@ defmodule Membrane.Core.Element.BufferController do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

# we check if pad should be corcked before decrementing :demand field in PadData,
# to avoid situation, when big chunk of data is stored in the queue only because it
# exceeds auto_demand_size sufficiently
hard_corcked? = AutoFlowUtils.hard_corcked?(pad_ref, state)

state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)

if hard_corcked? do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
else
exec_buffer_callback(pad_ref, buffers, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
Expand Down
89 changes: 87 additions & 2 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
@moduledoc false

alias Membrane.Buffer
alias Membrane.Event
alias Membrane.StreamFormat

alias Membrane.Core.Element.{
AtomicDemand,
State
BufferController,
EventController,
State,
StreamFormatController
}

require Membrane.Core.Child.PadModel, as: PadModel
Expand Down Expand Up @@ -59,12 +66,45 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
state
end

@spec hard_corcked?(Pad.ref(), State.t()) :: boolean()
def hard_corcked?(pad_ref, state) do
pad_data = PadModel.get_data!(state, pad_ref)

state.effective_flow_control == :pull and pad_data.direction == :input and
pad_data.flow_control == :auto and pad_data.demand < -1 * pad_data.auto_demand_size / 2
end

@spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def store_buffers_in_queue(pad_ref, buffers, state) do
store_in_queue(pad_ref, :buffers, buffers, state)
end

@spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t()
def store_event_in_queue(pad_ref, event, state) do
store_in_queue(pad_ref, :event, event, state)
end

@spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t()
def store_stream_format_in_queue(pad_ref, stream_format, state) do
store_in_queue(pad_ref, :stream_format, stream_format, state)
end

defp store_in_queue(pad_ref, type, item, state) do
PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item}))
end

@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do
Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2)
state = Enum.reduce(pad_ref_list, state, &do_auto_adjust_atomic_demand/2)
flush_auto_flow_queues(pad_ref_list, state)
end

def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do
state = do_auto_adjust_atomic_demand(pad_ref, state)
flush_auto_flow_queues([pad_ref], state)
end

defp do_auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
end
Expand Down Expand Up @@ -107,4 +147,49 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

atomic_demand_value > 0
end

defp flush_auto_flow_queues(pad_ref_list, state) do
pad_to_queue_map =
pad_ref_list
|> Enum.filter(&hard_corcked?(&1, state))
|> Map.new(&PadModel.get_data!(state, &1, [:auto_flow_queue]))

state = handle_queued_items(pad_to_queue_map, state)

Enum.reduce(pad_ref_list, state, fn pad_ref, state ->
PadModel.set_data!(state, pad_ref, :auto_flow_queue, Qex.new())
end)
end

defp handle_queued_items(pad_to_queue_map, state) when pad_to_queue_map == %{}, do: state

defp handle_queued_items(pad_to_queue_map, state) do
{pad_ref, queue} = Enum.random(pad_to_queue_map)

case Qex.pop(queue) do
{{:value, queue_item}, popped_queue} ->
state = do_handle_queue_item(pad_ref, queue_item, state)

pad_to_queue_map
|> Map.put(pad_ref, popped_queue)
|> handle_queued_items(state)

{:empty, _empty_queue} ->
pad_to_queue_map
|> Map.pop(pad_ref)
|> handle_queued_items(state)
end
end

defp do_handle_queue_item(pad_ref, {:buffers, buffers}, state) do
BufferController.exec_buffer_callback(pad_ref, buffers, state)
end

defp do_handle_queue_item(pad_ref, {:event, event}, state) do
EventController.exec_handle_event(pad_ref, event, state)
end

defp do_handle_queue_item(pad_ref, {:stream_format, stream_format}, state) do
StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state)
end
end
15 changes: 9 additions & 6 deletions lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ defmodule Membrane.Core.Element.EffectiveFlowController do

state.pads_data
|> Enum.filter(fn {_ref, %{flow_control: flow_control}} -> flow_control == :auto end)
|> Enum.reduce(state, fn
{_ref, %{direction: :output} = pad_data}, state ->
|> Enum.each(fn
{_ref, %{direction: :output} = pad_data} ->
:ok =
AtomicDemand.set_sender_status(
pad_data.atomic_demand,
Expand All @@ -120,9 +120,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
[pad_data.other_ref, new_effective_flow_control]
)

state

{pad_ref, %{direction: :input} = pad_data}, state ->
{pad_ref, %{direction: :input} = pad_data} ->
if triggering_pad in [pad_ref, nil] or
AtomicDemand.get_receiver_status(pad_data.atomic_demand) != :to_be_resolved do
:ok =
Expand All @@ -131,8 +129,13 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
{:resolved, new_effective_flow_control}
)
end
end)

AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
state.pads_data
|> Enum.filter(fn
{_pad_ref, %{direction: :input, flow_contorl: :auto}} -> true
_other -> false
end)
|> AutoFlowUtils.auto_adjust_atomic_demand(state)
end
end
27 changes: 18 additions & 9 deletions lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule Membrane.Core.Element.EventController do
State
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils

require Membrane.Core.Child.PadModel
require Membrane.Core.Message
require Membrane.Core.Telemetry
Expand All @@ -39,15 +41,22 @@ defmodule Membrane.Core.Element.EventController do
playback: %State{playback: :playing} <- state do
Telemetry.report_metric(:event, 1, inspect(pad_ref))

if not Event.async?(event) and buffers_before_event_present?(data) do
PadModel.update_data!(
state,
pad_ref,
:input_queue,
&InputQueue.store(&1, :event, event)
)
else
exec_handle_event(pad_ref, event, state)
cond do
# events goes to the manual flow control input queue
not Event.async?(event) and buffers_before_event_present?(data) ->
PadModel.update_data!(
state,
pad_ref,
:input_queue,
&InputQueue.store(&1, :event, event)
)

# event goes to the auto flow control queue
AutoFlowUtils.hard_corcked?(pad_ref, state) ->
AutoFlowUtils.store_event_in_queue(pad_ref, event, state)

true ->
exec_handle_event(pad_ref, event, state)
end
else
pad: {:error, :unknown_pad} ->
Expand Down
26 changes: 17 additions & 9 deletions lib/membrane/core/element/stream_format_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Membrane.Core.Element.StreamFormatController do
alias Membrane.Core.{CallbackHandler, Telemetry}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.{ActionHandler, CallbackContext, InputQueue, PlaybackQueue, State}
alias Membrane.Core.Element.DemandController.AutoFlowUtils

require Membrane.Core.Child.PadModel
require Membrane.Core.Telemetry
Expand All @@ -28,15 +29,22 @@ defmodule Membrane.Core.Element.StreamFormatController do

queue = data.input_queue

if queue && not InputQueue.empty?(queue) do
PadModel.set_data!(
state,
pad_ref,
:input_queue,
InputQueue.store(queue, :stream_format, stream_format)
)
else
exec_handle_stream_format(pad_ref, stream_format, state)
cond do
# stream format goes to the manual flow control input queue
queue && not InputQueue.empty?(queue) ->
PadModel.set_data!(
state,
pad_ref,
:input_queue,
InputQueue.store(queue, :stream_format, stream_format)
)

# stream format goes to the auto flow control queue
AutoFlowUtils.hard_corcked?(pad_ref, state) ->
AutoFlowUtils.store_stream_format_in_queue(pad_ref, stream_format, state)

true ->
exec_handle_stream_format(pad_ref, stream_format, state)
end
else
pad: {:error, :unknown_pad} ->
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/element/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ defmodule Membrane.Element.PadData do
pid: private_field,
other_ref: private_field,
input_queue: private_field,
auto_flow_queue: private_field,
incoming_demand: integer() | nil,
demand_unit: private_field,
other_demand_unit: private_field,
Expand Down Expand Up @@ -80,6 +81,7 @@ defmodule Membrane.Element.PadData do
defstruct @enforce_keys ++
[
input_queue: nil,
auto_flow_queue: Qex.new(),
demand: 0,
incoming_demand: nil,
demand_unit: nil,
Expand Down

0 comments on commit 00794c3

Please sign in to comment.