Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky tests on Elixir 1.15 #595

Merged
merged 6 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* Add `:pause_auto_demand` and `:resume_auto_demand` actions. [#586](https://github.com/membraneframework/membrane_core/pull/586)
* Send `:end_of_stream`, even if it is not preceded by `:start_of_stream`. [#557](https://github.com/membraneframework/membrane_core/pull/577)
* Fix process leak in starting clocks. [#594](https://github.com/membraneframework/membrane_core/pull/594)
* Add child exit reason to the supervisor exit reason. [#595](https://github.com/membraneframework/membrane_core/pull/595)

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
Terminating.
""")

{:error, {:membrane_child_crash, child_name}}
{:error, {:membrane_child_crash, child_name, reason}}
end
end

Expand Down
34 changes: 34 additions & 0 deletions lib/membrane/testing/assertions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,40 @@ defmodule Membrane.Testing.Assertions do
end
end

@doc """
Asserts that `Membrane.Testing.Sink` with name `sink_name` entered the playing
playback.
"""
defmacro assert_sink_playing(pipeline, sink_name, timeout \\ @default_timeout) do
do_sink_playing(&assert_receive_from_pipeline/3, pipeline, sink_name, timeout)
end

@doc """
Asserts that `Membrane.Testing.Sink` with name `sink_name` didn't enter the playing
playback.
"""
defmacro refute_sink_playing(pipeline, sink_name, timeout \\ @default_timeout) do
do_sink_playing(&refute_receive_from_pipeline/3, pipeline, sink_name, timeout)
end

defp do_sink_playing(assertion, pipeline, sink_name, timeout) do
quote do
element_name_value = unquote(sink_name)

unquote(
assertion.(
pipeline,
{:handle_child_notification,
{:playing,
quote do
^element_name_value
end}},
timeout
)
)
end
end

@doc """
Asserts that `Membrane.Testing.Pipeline` received or is going to receive start_of_stream
notification from the element with a name `element_name` within the `timeout` period
Expand Down
11 changes: 7 additions & 4 deletions lib/membrane/testing/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ defmodule Membrane.Testing.Sink do
end

@impl true
def handle_playing(_context, %{autodemand: true} = state),
do: {[demand: :input], state}
def handle_playing(_ctx, state) do
actions =
notify(:playing) ++
if(state.autodemand, do: [demand: :input], else: [])

def handle_playing(_context, state), do: {[], state}
{actions, state}
end

@impl true
def handle_event(:input, event, _context, state) do
def handle_event(:input, event, _ctx, state) do
{notify({:event, event}), state}
end

Expand Down
13 changes: 11 additions & 2 deletions test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ defmodule Membrane.Integration.AutoDemandsTest do
]
)

assert_sink_playing(pipeline, :right_sink)

Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000})

Enum.each(1..1000, fn payload ->
Expand Down Expand Up @@ -191,7 +193,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

@impl true
def handle_playing(_ctx, state) do
{[stream_format: {:output, %StreamFormat{}}], state}
{[stream_format: {:output, %StreamFormat{}}, notify_parent: :playing], state}
end
end

Expand All @@ -204,6 +206,8 @@ defmodule Membrane.Integration.AutoDemandsTest do
|> child(:sink, Sink)
)

assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})

Expand Down Expand Up @@ -232,9 +236,14 @@ defmodule Membrane.Integration.AutoDemandsTest do

Process.monitor(pipeline)

assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})
assert_receive({:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink}})

assert_receive(
{:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}}
)
end

defp reduce_link(link, enum, fun) do
Expand Down
13 changes: 8 additions & 5 deletions test/membrane/integration/child_pad_removed_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ defmodule Membrane.Integration.ChildPadRemovedTest do
def_options test_process: [spec: pid()]

@impl true
def handle_init(ctx, opts) do
send(opts.test_process, {:init, ctx.name})
{[], Map.from_struct(opts)}
def handle_init(_ctx, opts), do: {[], Map.from_struct(opts)}

@impl true
def handle_playing(ctx, state) do
send(state.test_process, {:playing, ctx.name})
{[], state}
end
end

Expand Down Expand Up @@ -161,7 +164,7 @@ defmodule Membrane.Integration.ChildPadRemovedTest do
] do
pipeline = start_pipeline!(DynamicBin, StaticSink)

assert_receive {:init, :sink}
assert_receive {:playing, :sink}
sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink)
monitor_ref = Process.monitor(sink_pid)

Expand All @@ -183,7 +186,7 @@ defmodule Membrane.Integration.ChildPadRemovedTest do
] do
pipeline = start_link_pipeline!(DynamicBin, StaticSink, remove_children: :sink)

assert_receive {:init, :sink}
assert_receive {:playing, :sink}
sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink)
monitor_ref = Process.monitor(sink_pid)

Expand Down
4 changes: 3 additions & 1 deletion test/membrane/integration/defer_setup_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ defmodule Membrane.Integration.DeferSetupTest do

monitor_ref = Process.monitor(pipeline)
complete_child_setup(pipeline, :bin_2)
assert_receive {:DOWN, ^monitor_ref, :process, ^pipeline, {:membrane_child_crash, :bin_2}}

assert_receive {:DOWN, ^monitor_ref, :process, ^pipeline,
{:membrane_child_crash, :bin_2, _bin_exit_reason}}
end

defp complete_child_setup(pipeline, child) do
Expand Down
92 changes: 54 additions & 38 deletions test/membrane/integration/demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ defmodule Membrane.Integration.DemandsTest do
end

defp test_pipeline(pid) do
assert_sink_playing(pid, :sink)

demand = 500
Pipeline.message_child(pid, :sink, {:make_demand, demand})

Expand Down Expand Up @@ -172,6 +174,11 @@ defmodule Membrane.Integration.DemandsTest do
@impl true
def handle_init(_ctx, _opts), do: {[], %{counter: 0}}

@impl true
def handle_playing(_ctx, state) do
{[notify_parent: :playing], state}
end

@impl true
def handle_buffer(:input, _buffer, _ctx, state) do
{[], Map.update!(state, :counter, &(&1 + 1))}
Expand All @@ -189,44 +196,6 @@ defmodule Membrane.Integration.DemandsTest do
end
end

test "actions :pause_auto_demand and :resume_auto_demand" do
pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(RedemandingSource)
|> via_in(:input, auto_demand_size: 10)
|> child(:sink, PausingSink)
)

# time for pipeline to start playing
Process.sleep(500)

for _i <- 1..10 do
# during sleep below source should send around 100 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should receive around 100 buffers, but the boundary is set to 70, in case of eg.
# slowdown of the source when running all tests in the project asynchronously
assert buff_no > 70

# during sleep below source should send up to about auto_demand_size = 10 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should probably receive between 5 and 15 buffers, but the boundary is set to 25,
# to handle the case when eg. there is a delay in receiving the notification from the
# pipeline by the :sink
assert buff_no < 25
end

Testing.Pipeline.terminate(pipeline)
end

defmodule Funnel do
use Membrane.Filter

Expand Down Expand Up @@ -293,4 +262,51 @@ defmodule Membrane.Integration.DemandsTest do

Testing.Pipeline.terminate(pipeline)
end

defmodule Sync do
use Bunch
use ExUnit.Case, async: false

alias Membrane.Integration.DemandsTest.{PausingSink, RedemandingSource}

test "actions :pause_auto_demand and :resume_auto_demand" do
pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(RedemandingSource)
|> via_in(:input, auto_demand_size: 10)
|> child(:sink, PausingSink)
)

assert_sink_playing(pipeline, :sink)

# time for pipeline to start playing
Process.sleep(1000)

for i <- 1..10 do
# during sleep below source should send around 100 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should receive around 100 buffers, but the boundary is set to 65, in case of eg.
# slowdown of the source when running all tests in the project asynchronously
if i != 1, do: assert(buff_no > 65)

# during sleep below source should send up to about auto_demand_size = 10 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should probably receive between 5 and 15 buffers, but the boundary is set to 25,
# to handle the case when eg. there is a delay in receiving the notification from the
# pipeline by the :sink
assert buff_no < 25
end

Testing.Pipeline.terminate(pipeline)
end
end
end
3 changes: 2 additions & 1 deletion test/membrane/integration/links_validation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ defmodule Membrane.LinksValidationTest do

Pipeline.execute_actions(pipeline, spec: spec)

assert_receive({:DOWN, ^ref, :process, ^pipeline, {%Membrane.LinkError{}, _stacktrace}})
assert_receive {:DOWN, ^ref, :process, ^pipeline, {%Membrane.LinkError{}, _stacktrace}},
1000
end
end
end
2 changes: 2 additions & 0 deletions test/membrane/integration/toilet_forwarding_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

assert_sink_playing(pipeline, :sink)

buffers =
1..4000
|> Enum.map(fn i -> %Membrane.Buffer{payload: <<i::64>>} end)
Expand Down
4 changes: 2 additions & 2 deletions test/membrane/pipeline_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Membrane.PipelineSupervisorTest do
assert_receive {:DOWN, ^supervisor_monitor_ref, _process, _pid, ^exit_reason}
end

test "Pipeline supervisor exits with {:membrane_child_crash, child_name} when pipeline's child crashes" do
test "Pipeline supervisor exits with {:membrane_child_crash, child_name, child_exit_reason} when pipeline's child crashes" do
defmodule MyElement do
use Membrane.Endpoint

Expand All @@ -44,7 +44,7 @@ defmodule Membrane.PipelineSupervisorTest do
element_exit_reason = :custom_exit_reason
Process.exit(element, element_exit_reason)

pipeline_exit_reason = {:membrane_child_crash, :element}
pipeline_exit_reason = {:membrane_child_crash, :element, element_exit_reason}

assert_receive {:DOWN, ^element_monitor_ref, _process, _pid, ^element_exit_reason}
assert_receive {:DOWN, ^pipeline_monitor_ref, _process, _pid, ^pipeline_exit_reason}
Expand Down
13 changes: 8 additions & 5 deletions test/membrane/remote_controlled/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defmodule Membrane.RCPipelineTest do
use Membrane.Filter
alias Membrane.Buffer

require Membrane.Logger

def_output_pad :output, flow_control: :manual, accepted_format: _any, availability: :always

def_input_pad :input,
Expand Down Expand Up @@ -73,10 +75,11 @@ defmodule Membrane.RCPipelineTest do

# TEST
assert_receive %RCMessage.Notification{
from: ^pipeline,
element: :b,
data: %Membrane.Buffer{payload: "test"}
}
from: ^pipeline,
element: :b,
data: %Membrane.Buffer{payload: "test"}
},
2000

assert_receive %RCMessage.StartOfStream{from: ^pipeline, element: :b, pad: :input}

Expand All @@ -91,7 +94,7 @@ defmodule Membrane.RCPipelineTest do
RCPipeline.exec_actions(pipeline, spec: @pipeline_spec)

# TEST
assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :b, pad: :input}
assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :b, pad: :input}, 2000

assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :c, pad: :input}

Expand Down
2 changes: 1 addition & 1 deletion test/membrane/sync_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Membrane.SyncTest do
use Bunch
use ExUnit.Case, async: true
use ExUnit.Case, async: false

@module Membrane.Sync

Expand Down
11 changes: 8 additions & 3 deletions test/membrane/utility_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Membrane.UtilitySupervisorTest do
alias Membrane.Testing

test "Utility supervisor terminates utility when element exits" do
Process.register(self(), :utility_supervisor_test_process)

defmodule TestFilter do
use Membrane.Filter

Expand All @@ -16,7 +18,7 @@ defmodule Membrane.UtilitySupervisorTest do
ctx.utility_supervisor,
{Task,
fn ->
Process.register(self(), :utility_supervisor_test_task)
send(:utility_supervisor_test_process, {:task_pid, self()})
Process.sleep(:infinity)
end}
)
Expand All @@ -28,8 +30,11 @@ defmodule Membrane.UtilitySupervisorTest do
pipeline = Testing.Pipeline.start_supervised!(spec: [child(:filter, TestFilter)])

assert_pipeline_notified(pipeline, :filter, :setup)
monitor = Process.monitor(:utility_supervisor_test_task)
assert_receive {:task_pid, task_pid}

monitor_ref = Process.monitor(task_pid)

Testing.Pipeline.terminate(pipeline)
assert_receive {:DOWN, ^monitor, :process, _pid, :shutdown}
assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown}
end
end
Loading