Skip to content

Commit

Permalink
[iron] Bugfix for writer not being able to open again after closing (…
Browse files Browse the repository at this point in the history
…backport #1599) (#1635)

* re-applies fixes from #1590 to rolling. Also removes new message definition in sequential writer test for multiple open operations. Also clears topic_names_to_message_definitions_ and handles message_definitions_s underlying container similarly. Lastly, also avoids reset of factory in the compression writer, adds unit test there too.

Signed-off-by: Yannick Schulz <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>

* removes unused compressor_ member from compresser writer class. Also delegates rest of the closing behavior to the base class in close method, as it is handled in the open and write methods of the compression writer

Signed-off-by: Yannick Schulz <[email protected]>

* Remove unrelated delta

- message_definitions_ was intentionally allocated on the stack and
should persist between writer close() and open() because it represents
cache for message definitions which is not changes.

Signed-off-by: Michael Orlov <[email protected]>

* Don't call virtual methods from destructors

Signed-off-by: Michael Orlov <[email protected]>

* Cleanup 'rosbag2_directory_next' after the test run

Signed-off-by: Michael Orlov <[email protected]>

* Protect Writer::open(..) and Writer::close() with mutex on upper level

- Rationale: To avoid race conditions if open(..) and close() could be
ever be called from different threads.

Signed-off-by: Michael Orlov <[email protected]>

* Bugfix for WRITE_SPLIT callback not called for the last compressed file

Signed-off-by: Michael Orlov <[email protected]>

* Bugfix for lost messages from cache when closing compression writer

Signed-off-by: Michael Orlov <[email protected]>

* Address build failure by using rcpputils::fs instead of std::filesystem

- Note: On Iron we haven't migrated to the std::filesystem and using
rcpputils::fs

Signed-off-by: Michael Orlov <[email protected]>

* Adopt failing 'open_succeeds_twice' test for Iron

Signed-off-by: Michael Orlov <[email protected]>

* Return from writer's open() immediately if storage already open

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Yannick Schulz <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>
Co-authored-by: Yannick Schulz <[email protected]>
  • Loading branch information
MichaelOrlov and yschulz authored May 11, 2024
1 parent 4914ab3 commit a360d9b
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ SequentialCompressionWriter::SequentialCompressionWriter(

SequentialCompressionWriter::~SequentialCompressionWriter()
{
close();
if (storage_) {
SequentialCompressionWriter::close();
}
}

void SequentialCompressionWriter::compression_thread_fn()
Expand Down Expand Up @@ -177,6 +179,10 @@ void SequentialCompressionWriter::open(
const rosbag2_cpp::ConverterOptions & converter_options)
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
// Note. Close and open methods protected with mutex on upper rosbag2_cpp::writer level.
if (storage_) {
return; // The writer already opened.
}
SequentialWriter::open(storage_options, converter_options);
setup_compression();
}
Expand All @@ -192,7 +198,18 @@ void SequentialCompressionWriter::close()
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);
try {
storage_.reset(); // Storage must be closed before it can be compressed.
// Storage must be closed before file can be compressed.
if (use_cache_) {
// destructor will flush message cache
cache_consumer_.reset();
message_cache_.reset();
}
finalize_metadata();
if (storage_) {
storage_->update_metadata(metadata_);
storage_.reset(); // Storage will be closed in storage_ destructor
}

if (!metadata_.relative_file_paths.empty()) {
std::string file = metadata_.relative_file_paths.back();
compressor_file_queue_.push(file);
Expand All @@ -202,22 +219,10 @@ void SequentialCompressionWriter::close()
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what());
}
}

stop_compressor_threads();

finalize_metadata();
if (storage_) {
storage_->update_metadata(metadata_);
}
metadata_io_->write_metadata(base_folder_, metadata_);
}

if (use_cache_) {
cache_consumer_.reset();
message_cache_.reset();
}
storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
storage_factory_.reset();
stop_compressor_threads(); // Note: The metadata_.relative_file_paths will be updated with
// compressed filename when compressor threads will finish.
SequentialWriter::close();
}

void SequentialCompressionWriter::create_topic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,30 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}));
}

TEST_F(SequentialCompressionWriterTest, open_succeeds_twice)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
initializeWriter(compression_options);

auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty";
auto tmp_dir_next = rcpputils::fs::temp_directory_path() / "path_not_empty_next";

auto storage_options = rosbag2_storage::StorageOptions();
auto storage_options_next = rosbag2_storage::StorageOptions();

storage_options.uri = tmp_dir.string();
storage_options_next.uri = tmp_dir_next.string();

EXPECT_NO_THROW(
writer_->open(storage_options, {serialization_format_, serialization_format_}));

writer_->close();
EXPECT_NO_THROW(
writer_->open(storage_options_next, {serialization_format_, serialization_format_}));
}

TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
{
rosbag2_compression::CompressionOptions compression_options{
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter

// Used to track topic -> message count. If cache is present, it is updated by CacheConsumer
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info_;
// Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly
// adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from
// CacheConsumer callback i.e., write_messages(..)
std::mutex topics_info_mutex_;

LocalMessageDefinitionSource message_definitions_;
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ void Writer::open(
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->open(storage_options, converter_options);
}

void Writer::close()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->close();
}

Expand Down
29 changes: 22 additions & 7 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ SequentialWriter::SequentialWriter(
metadata_io_(std::move(metadata_io)),
converter_(nullptr),
topics_names_to_info_(),
message_definitions_(),
topic_names_to_message_definitions_(),
metadata_()
{}

Expand All @@ -65,7 +65,9 @@ SequentialWriter::~SequentialWriter()
// Callbacks likely was created after SequentialWriter object and may point to the already
// destructed objects.
callback_manager_.delete_all_callbacks();
close();
if (storage_) {
SequentialWriter::close();
}
}

void SequentialWriter::init_metadata()
Expand All @@ -88,8 +90,13 @@ void SequentialWriter::open(
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
// Note. Close and open methods protected with mutex on upper rosbag2_cpp::writer level.
if (storage_) {
return; // The writer already opened.
}
base_folder_ = storage_options.uri;
storage_options_ = storage_options;

if (storage_options_.storage_id.empty()) {
storage_options_.storage_id = rosbag2_storage::get_default_storage_id();
}
Expand Down Expand Up @@ -171,13 +178,22 @@ void SequentialWriter::close()
}

if (storage_) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = storage_->get_relative_file_path();
storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that
// bag file was closed before callback call.
}
if (!metadata_.relative_file_paths.empty()) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
// Take the latest file name from metadata in case if it was updated after compression in
// derived class
info->closed_file =
(rcpputils::fs::path(base_folder_) / metadata_.relative_file_paths.back()).string();
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
}
storage_factory_.reset();

topics_names_to_info_.clear();
topic_names_to_message_definitions_.clear();

converter_.reset();
}

void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
Expand Down Expand Up @@ -290,14 +306,13 @@ void SequentialWriter::switch_to_next_storage()
base_folder_,
metadata_.relative_file_paths.size());
storage_ = storage_factory_->open_read_write(storage_options_);
storage_->update_metadata(metadata_);

if (!storage_) {
std::stringstream errmsg;
errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!";

throw std::runtime_error(errmsg.str());
}
storage_->update_metadata(metadata_);
// Re-register all topics since we rolled-over to a new bagfile.
for (const auto & topic : topics_names_to_info_) {
auto const & md = topic_names_to_message_definitions_[topic.first];
Expand Down
34 changes: 33 additions & 1 deletion rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
serialization.serialize_message(&test_msg, &serialized_msg);

auto rosbag_directory = rcpputils::fs::path("test_rosbag2_writer_api_bag");
auto rosbag_directory_next = rcpputils::fs::path("test_rosbag2_writer_api_bag_next");
// in case the bag was previously not cleaned up
rcpputils::fs::remove_all(rosbag_directory);
rcpputils::fs::remove_all(rosbag_directory_next);

{
rosbag2_cpp::Writer writer;
Expand Down Expand Up @@ -97,7 +99,18 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
// writing a non-serialized message
writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now());

// close on scope exit
// close as prompted
writer.close();

// open a new bag with the same writer
writer.open(rosbag_directory_next.string());

// write same topic to different bag
writer.write(
serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes",
rclcpp::Clock().now());

// close by scope
}

{
Expand All @@ -124,6 +137,24 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
// close on scope exit
}

{
rosbag2_cpp::Reader reader;
std::string topic;
reader.open(rosbag_directory_next.string());
ASSERT_TRUE(reader.has_next());

auto bag_message = reader.read_next();
topic = bag_message->topic_name;

TestMsgT extracted_test_msg;
rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data);
serialization.deserialize_message(
&extracted_serialized_msg, &extracted_test_msg);

EXPECT_EQ(test_msg, extracted_test_msg);
EXPECT_EQ("/yet/another/topic", topic);
}

// alternative reader
{
rosbag2_cpp::Reader reader;
Expand All @@ -138,6 +169,7 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)

// remove the rosbag again after the test
EXPECT_TRUE(rcpputils::fs::remove_all(rosbag_directory));
EXPECT_TRUE(rcpputils::fs::remove_all(rosbag_directory_next));
}

INSTANTIATE_TEST_SUITE_P(
Expand Down

0 comments on commit a360d9b

Please sign in to comment.