Skip to content

Commit

Permalink
Fix alter cleanup after restart.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed Oct 8, 2024
1 parent 1442369 commit a7072e0
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 60 deletions.
10 changes: 7 additions & 3 deletions python/restart_test/test_alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def part2(infinity_obj):
("test/data/config/restart_test/test_alter/2.toml", 2, True),
],
)
def test_alter_2(
def test_alter_complex(
self, infinity_runner: InfinityRunner, config: str, sleep: int, flush_mid: bool
):
table_name = "test_alter2"
Expand All @@ -97,7 +97,7 @@ def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
db_obj.drop_table(table_name, ConflictType.Ignore)
table_obj = db_obj.create_table(
table_name,
table_name, buffer_mgr_->RemoveClean();
{
"c1": {"type": "int"},
"c2": {"type": "int"},
Expand Down Expand Up @@ -158,11 +158,15 @@ def part2(infinity_obj):
}
),
)
dropped_column_dirs = pathlib.Path("/var/infinity/data").rglob("1.col")
assert len(list(dropped_column_dirs)) == 0

db_obj.drop_table(table_name)

part1()
part2()

def test_alter_cleanup(self, infinity_runner: InfinityRunner):
def test_alter_cleanup_simple(self, infinity_runner: InfinityRunner):
table_name = "test_alter3"
config = "test/data/config/restart_test/test_alter/3.toml"

Expand Down
6 changes: 5 additions & 1 deletion src/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,11 @@ void BufferObj::SubObjRc() {
if (obj_rc_ == 0) {
UnrecoverableError(fmt::format("SubObjRc: obj_rc_ is 0, buffer: {}", GetFilename()));
}
obj_rc_--;
--obj_rc_;
if (obj_rc_ == 0) {
status_ = BufferStatus::kClean;
buffer_mgr_->AddToCleanList(this, false/*do_free*/);
}
}

void BufferObj::CheckState() const {
Expand Down
8 changes: 1 addition & 7 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -793,13 +793,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
auto *segment_entry = table_entry->segment_map_.at(segment_id).get();
auto *block_entry = segment_entry->GetBlockEntryByID(block_id).get();
if (merge_flag == MergeFlag::kDelete) {
block_entry->AddColumnReplay(BlockColumnEntry::ReplayDropBlockColumnEntry(block_entry,
column_id,
buffer_mgr,
next_outline_idx,
last_chunk_offset,
commit_ts),
column_id);
block_entry->DropColumnReplay(column_id);
} else if (merge_flag == MergeFlag::kNew) {
block_entry->AddColumnReplay(BlockColumnEntry::NewReplayBlockColumnEntry(block_entry,
column_id,
Expand Down
37 changes: 0 additions & 37 deletions src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,43 +150,6 @@ UniquePtr<BlockColumnEntry> BlockColumnEntry::NewReplayBlockColumnEntry(const Bl
return column_entry;
}

UniquePtr<BlockColumnEntry> BlockColumnEntry::ReplayDropBlockColumnEntry(const BlockEntry *block_entry,
ColumnID column_id,
BufferManager *buffer_manager,
const u32 next_outline_idx,
const u64 last_chunk_offset,
const TxnTimeStamp commit_ts) {
UniquePtr<BlockColumnEntry> column_entry = MakeUnique<BlockColumnEntry>(block_entry, column_id);
column_entry->file_name_ = MakeShared<String>(std::to_string(column_id) + ".col");
column_entry->column_type_ = nullptr;
column_entry->commit_ts_ = commit_ts;
column_entry->deleted_ = true;

auto file_worker = MakeUnique<DataFileWorker>(MakeShared<String>(InfinityContext::instance().config()->DataDir()),
MakeShared<String>(InfinityContext::instance().config()->TempDir()),
block_entry->block_dir(),
column_entry->file_name_,
0, /*total_data_size*/
buffer_manager->persistence_manager());

column_entry->buffer_ = buffer_manager->GetBufferObject(std::move(file_worker), true /*restart*/);

if (next_outline_idx > 0) {
SizeT buffer_size = last_chunk_offset;
auto outline_buffer_file_worker = MakeUnique<VarFileWorker>(MakeShared<String>(InfinityContext::instance().config()->DataDir()),
MakeShared<String>(InfinityContext::instance().config()->TempDir()),
block_entry->block_dir(),
column_entry->OutlineFilename(0),
buffer_size,
buffer_manager->persistence_manager());
auto *buffer_obj = buffer_manager->GetBufferObject(std::move(outline_buffer_file_worker), true /*restart*/);
column_entry->outline_buffers_.push_back(buffer_obj);
}
column_entry->last_chunk_offset_ = last_chunk_offset;

return column_entry;
}

String BlockColumnEntry::FilePath() const { return Path(*block_entry_->block_dir()) / *file_name_; }

SharedPtr<String> BlockColumnEntry::FileDir() const { return block_entry_->block_dir(); }
Expand Down
7 changes: 0 additions & 7 deletions src/storage/meta/entry/block_column_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ public:
const u64 last_chunk_offset,
const TxnTimeStamp commit_ts);

static UniquePtr<BlockColumnEntry> ReplayDropBlockColumnEntry(const BlockEntry *block_entry,
ColumnID column_id,
BufferManager *buffer_manager,
const u32 next_outline_idx,
const u64 last_chunk_offset,
const TxnTimeStamp commit_ts);

nlohmann::json Serialize();

static UniquePtr<BlockColumnEntry> Deserialize(const nlohmann::json &column_data_json, BlockEntry *block_entry, BufferManager *buffer_mgr);
Expand Down
17 changes: 12 additions & 5 deletions src/storage/meta/entry/block_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,12 +590,19 @@ void BlockEntry::AddColumnReplay(UniquePtr<BlockColumnEntry> column_entry, Colum
if (iter == columns_.end()) {
columns_.emplace_back(std::move(column_entry));
} else {
if (!column_entry->Deleted()) {
UnrecoverableError(fmt::format("BlockEntry::AddColumnReplay: column {} already exists", column_id));
}
dropped_columns_.emplace_back(std::move(column_entry));
columns_.erase(iter);
*iter = std::move(column_entry);
}
}

void BlockEntry::DropColumnReplay(ColumnID column_id) {
auto iter = std::find_if(columns_.begin(), columns_.end(), [&](const auto &column) { return column->column_id() == column_id; });
if (iter == columns_.end()) {
String error_message = fmt::format("BlockEntry::AddColumnReplay: column_id {} not found", column_id);
UnrecoverableError(error_message);
}
BlockColumnEntry *entry = iter->get();
entry->DropColumn();
columns_.erase(iter);
}

void BlockEntry::AppendBlock(const Vector<ColumnVector> &column_vectors, SizeT row_begin, SizeT read_size, BufferManager *buffer_mgr) {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/meta/entry/block_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public:

void AddColumnReplay(UniquePtr<BlockColumnEntry> column_entry, ColumnID column_id);

void DropColumnReplay(ColumnID column_id);

void AppendBlock(const Vector<ColumnVector> &column_vectors, SizeT row_begin, SizeT read_size, BufferManager *buffer_mgr);

void Cleanup(CleanupInfoTracer *info_tracer = nullptr, bool dropped = true) override;
Expand Down

0 comments on commit a7072e0

Please sign in to comment.