Skip to content

Commit

Permalink
ref: Get rid of concurrent async
Browse files Browse the repository at this point in the history
We can easily workaround async with queue and a thread and btw simplify the subscriber. Swallowing errors is not good, closes #396
  • Loading branch information
route committed Jan 4, 2024
1 parent 36babe6 commit 0d02fcb
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 72 deletions.
9 changes: 0 additions & 9 deletions .github/gemfiles/websocket-driver-0.6.x.gemfile

This file was deleted.

9 changes: 0 additions & 9 deletions .github/gemfiles/websocket-driver-0.7.x.gemfile

This file was deleted.

2 changes: 0 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ jobs:
strategy:
fail-fast: false
matrix:
gemfile: [websocket-driver-0.6.x, websocket-driver-0.7.x]
ruby: [2.7, "3.0", 3.1, 3.2, 3.3]
runs-on: ubuntu-latest
env:
FERRUM_PROCESS_TIMEOUT: 25
FERRUM_DEFAULT_TIMEOUT: 15
BUNDLE_GEMFILE: .github/gemfiles/${{ matrix.gemfile }}.gemfile
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion ferrum.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "addressable", "~> 2.5"
s.add_runtime_dependency "concurrent-ruby", "~> 1.1"
s.add_runtime_dependency "webrick", "~> 1.7"
s.add_runtime_dependency "websocket-driver", ">= 0.6", "< 0.8"
s.add_runtime_dependency "websocket-driver", "~> 0.7"
end
1 change: 1 addition & 0 deletions lib/ferrum.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "ferrum/utils/thread"
require "ferrum/utils/platform"
require "ferrum/utils/elapsed_time"
require "ferrum/utils/attempt"
Expand Down
47 changes: 20 additions & 27 deletions lib/ferrum/browser/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
module Ferrum
class Browser
class Client
INTERRUPTIONS = %w[Fetch.requestPaused Fetch.authRequired].freeze

extend Forwardable
delegate %i[timeout timeout=] => :options

Expand All @@ -19,25 +17,9 @@ def initialize(ws_url, options)
@options = options
@pendings = Concurrent::Hash.new
@ws = WebSocket.new(ws_url, options.ws_max_receive_size, options.logger)
@subscriber, @interrupter = Subscriber.build(2)

@thread = Thread.new do
Thread.current.abort_on_exception = true
Thread.current.report_on_exception = true if Thread.current.respond_to?(:report_on_exception=)

loop do
message = @ws.messages.pop
break unless message
@subscriber = Subscriber.new

if INTERRUPTIONS.include?(message["method"])
@interrupter.async.call(message)
elsif message.key?("method")
@subscriber.async.call(message)
else
@pendings[message["id"]]&.set(message)
end
end
end
start
end

def command(method, params = {})
Expand All @@ -57,23 +39,19 @@ def command(method, params = {})
end

def on(event, &block)
case event
when *INTERRUPTIONS
@interrupter.on(event, &block)
else
@subscriber.on(event, &block)
end
@subscriber.on(event, &block)
end

def subscribed?(event)
[@interrupter, @subscriber].any? { |s| s.subscribed?(event) }
@subscriber.subscribed?(event)
end

def close
@ws.close
# Give a thread some time to handle a tail of messages
@pendings.clear
@thread.kill unless @thread.join(1)
@subscriber.close
end

def inspect
Expand All @@ -85,6 +63,21 @@ def inspect

private

def start
@thread = Utils::Thread.spawn do
loop do
message = @ws.messages.pop
break unless message

if message.key?("method")
@subscriber << message
else
@pendings[message["id"]]&.set(message)
end
end
end
end

def build_message(method, params)
{ method: method, params: params }.merge(id: next_command_id)
end
Expand Down
46 changes: 40 additions & 6 deletions lib/ferrum/browser/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@
module Ferrum
class Browser
class Subscriber
include Concurrent::Async

def self.build(size)
(0..size).map { new }
end
INTERRUPTIONS = %w[Fetch.requestPaused Fetch.authRequired].freeze

def initialize
super
@regular = Queue.new
@priority = Queue.new
@on = Concurrent::Hash.new { |h, k| h[k] = Concurrent::Array.new }

start
end

def <<(message)
if INTERRUPTIONS.include?(message["method"])
@priority.push(message)
else
@regular.push(message)
end
end

def on(event, &block)
Expand All @@ -23,6 +30,33 @@ def subscribed?(event)
@on.key?(event)
end

def close
@regular_thread&.kill
@priority_thread&.kill
end

private

def start
@regular_thread = Utils::Thread.spawn do
loop do
message = @regular.pop
break unless message

call(message)
end
end

@priority_thread = Utils::Thread.spawn do
loop do
message = @priority.pop
break unless message

call(message)
end
end
end

def call(message)
method, params = message.values_at("method", "params")
total = @on[method].size
Expand Down
31 changes: 16 additions & 15 deletions lib/ferrum/browser/web_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,7 @@ def initialize(url, max_receive_size, logger)
@driver.on(:message, &method(:on_message))
@driver.on(:close, &method(:on_close))

@thread = Thread.new do
Thread.current.abort_on_exception = true
Thread.current.report_on_exception = true if Thread.current.respond_to?(:report_on_exception=)

begin
loop do
data = @sock.readpartial(512)
break unless data

@driver.parse(data)
end
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
@messages.close
end
end
start

@driver.start
end
Expand Down Expand Up @@ -86,6 +72,21 @@ def write(data)
def close
@driver.close
end

private

def start
@thread = Utils::Thread.spawn do
loop do
data = @sock.readpartial(512)
break unless data

@driver.parse(data)
end
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
@messages.close
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/ferrum/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def add_target(params)
end

def update_target(target_id, params)
@targets[target_id].update(params)
@targets[target_id]&.update(params)
end

def delete_target(target_id)
Expand Down
2 changes: 1 addition & 1 deletion lib/ferrum/page/frames.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def subscribe_frame_stopped_loading
end

frame = @frames[params["frameId"]]
frame.state = :stopped_loading
frame&.state = :stopped_loading

@event.set if idling?
end
Expand Down
18 changes: 18 additions & 0 deletions lib/ferrum/utils/thread.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module Ferrum
module Utils
module Thread
module_function

def spawn
::Thread.new do
::Thread.current.abort_on_exception = true
::Thread.current.report_on_exception = true if ::Thread.current.respond_to?(:report_on_exception=)

yield
end
end
end
end
end
2 changes: 1 addition & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
end

config.after(:all) do
@browser.quit
@browser&.quit
end

config.before(:each) do
Expand Down

0 comments on commit 0d02fcb

Please sign in to comment.