Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster Cuda Decoder #4811

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/c-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ jobs:
- uses: actions/checkout@v3
- name: Install sox
run: sudo apt-get install -y sox intel-mkl
- name: Install python2
run: sudo apt-get install -y python2
- name: ccache
uses: hendrikmuhs/[email protected]
with:
Expand Down
3 changes: 2 additions & 1 deletion src/cudadecoder/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ OBJFILES = cuda-decoder.o cuda-decoder-kernels.o cuda-fst.o \
batched-threaded-nnet3-cuda-pipeline2.o \
batched-static-nnet3.o batched-static-nnet3-kernels.o \
cuda-online-pipeline-dynamic-batcher.o decodable-cumatrix.o \
cuda-pipeline-common.o lattice-postprocessor.o
cuda-pipeline-common.o lattice-postprocessor.o \
thread-pool-cia.o

LIBNAME = kaldi-cudadecoder

Expand Down
2 changes: 1 addition & 1 deletion src/cudadecoder/batched-static-nnet3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void BatchedStaticNnet3::PresetKernelParams() {
}

void BatchedStaticNnet3::Allocate() {
cudaEventCreate(&batch_slot_assignement_copy_evt_);
cudaEventCreate(&batch_slot_assignement_copy_evt_, cudaEventDisableTiming);
d_all_context_frames_.Resize(nchannels_ * total_nnet_context_, input_dim_);
d_batch_with_context_.Resize(
max_batch_size_ * input_frames_per_chunk_with_context_, input_dim_);
Expand Down
73 changes: 59 additions & 14 deletions src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,56 @@ bool BatchedThreadedNnet3CudaOnlinePipeline::TryInitCorrID(

void BatchedThreadedNnet3CudaOnlinePipeline::CompactWavesToMatrix(
const std::vector<SubVector<BaseFloat>> &wave_samples) {
for (int i = 0; i < wave_samples.size(); ++i) {
const SubVector<BaseFloat> &src = wave_samples[i];
int size = src.Dim();
n_samples_valid_[i] = size;
const BaseFloat *wave_src = src.Data();
BaseFloat *wave_dst = h_all_waveform_.RowData(i);
std::memcpy(wave_dst, wave_src, size * sizeof(BaseFloat));
nvtxRangePushA(__func__);

if (!batching_copy_thread_pool_) {
for (int i = 0; i < wave_samples.size(); ++i) {
const SubVector<BaseFloat> &src = wave_samples[i];
int size = src.Dim();
n_samples_valid_[i] = size;
const BaseFloat *wave_src = src.Data();
BaseFloat *wave_dst = h_all_waveform_.RowData(i);
std::memcpy(wave_dst, wave_src, size * sizeof(BaseFloat));
}
} else {
const size_t batch_size =
KALDI_CUDA_DECODER_DIV_ROUND_UP(wave_samples.size(),
config_.num_batching_copy_threads);

std::mutex m;
std::condition_variable cv;

std::atomic<size_t> tasks_remaining;
std::atomic_init(&tasks_remaining, KALDI_CUDA_DECODER_DIV_ROUND_UP(wave_samples.size(), batch_size));

for (size_t i = 0; i < wave_samples.size(); i += batch_size) {
auto task = [i, this, &wave_samples, &m, &cv, &tasks_remaining, &batch_size]() {
nvtxRangePush("CompactWavesToMatrix task");
for (size_t j = i; j < std::min(i + batch_size, wave_samples.size()); ++j) {
const SubVector<BaseFloat> &src = wave_samples[j];
int size = src.Dim();
n_samples_valid_[j] = size;
const BaseFloat *wave_src = src.Data();
BaseFloat *wave_dst = this->h_all_waveform_.RowData(j);
std::memcpy(wave_dst, wave_src, size * sizeof(BaseFloat));
}
--tasks_remaining;
if (tasks_remaining.load() == 0) {
std::lock_guard<std::mutex> lock(m);
cv.notify_one();
}
nvtxRangePop();
};
batching_copy_thread_pool_->submit(task);
}

// wait for all threads to finish
{
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&tasks_remaining](){ return tasks_remaining == 0; });
}
}
nvtxRangePop();
}

void BatchedThreadedNnet3CudaOnlinePipeline::ComputeGPUFeatureExtraction(
Expand All @@ -258,9 +300,11 @@ void BatchedThreadedNnet3CudaOnlinePipeline::ComputeGPUFeatureExtraction(
// CopyFromMat syncs, avoiding it
KALDI_ASSERT(d_all_waveform_.SizeInBytes() == h_all_waveform.SizeInBytes());
// Note : we could have smaller copies using the actual channels.size()
nvtxRangePushA("ComputeGPUFeatureExtractioncudaMemcpyAsync");
cudaMemcpyAsync(d_all_waveform_.Data(), h_all_waveform.Data(),
h_all_waveform.SizeInBytes(), cudaMemcpyHostToDevice,
cudaStreamPerThread);
nvtxRangePop();

KALDI_ASSERT(channels.size() == is_last_chunk.size());
KALDI_ASSERT(channels.size() == is_first_chunk.size());
Expand All @@ -285,9 +329,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::ComputeCPUFeatureExtraction(
n_compute_features_not_done_.store(channels.size());

for (size_t i = 0; i < channels.size(); ++i) {
thread_pool_->Push(
{&BatchedThreadedNnet3CudaOnlinePipeline::ComputeOneFeatureWrapper,
this, i, 0}); // second argument "0" is not used
thread_pool_->submit(std::bind(&BatchedThreadedNnet3CudaOnlinePipeline::ComputeOneFeature, this, i));
}

while (n_compute_features_not_done_.load(std::memory_order_acquire))
Expand Down Expand Up @@ -348,6 +390,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::DecodeBatch(
ListIChannelsInBatch(corr_ids, &channels_);

// Compact in h_all_waveform_ to use the main DecodeBatch version
// this is slow
CompactWavesToMatrix(wave_samples);

DecodeBatch(corr_ids, h_all_waveform_, n_samples_valid_, is_first_chunk,
Expand Down Expand Up @@ -565,16 +608,15 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunLatticeCallbacks(
// If q is not empty, it means we already have a task in the threadpool
// for that channel it is important to run those task in FIFO order if
// empty, run a new task
thread_pool_->Push(
{&BatchedThreadedNnet3CudaOnlinePipeline::FinalizeDecodingWrapper,
this, ichannel, /* ignored */ nullptr});
thread_pool_->submit(std::bind(&BatchedThreadedNnet3CudaOnlinePipeline::FinalizeDecoding, this, ichannel));
}
}
}

void BatchedThreadedNnet3CudaOnlinePipeline::RunCallbacksAndFinalize(
const std::vector<CorrelationID> &corr_ids,
const std::vector<int> &channels, const std::vector<bool> &is_last_chunk) {
nvtxRangePushA("RunCallbacksAndFinalize");
// Reading endpoints, figuring out is_end_of_segment_
for (size_t i = 0; i < is_last_chunk.size(); ++i) {
bool endpoint_detected = false;
Expand All @@ -589,6 +631,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunCallbacksAndFinalize(
RunBestPathCallbacks(corr_ids, channels);

RunLatticeCallbacks(corr_ids, channels, is_last_chunk);
nvtxRangePop();
}

void BatchedThreadedNnet3CudaOnlinePipeline::ListIChannelsInBatch(
Expand Down Expand Up @@ -646,7 +689,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::InitDecoding(
}

if (should_reset_decoder)
init_decoding_list_channels_.push_back((channels)[i]);
init_decoding_list_channels_.push_back(channels[i]);
}

if (!init_decoding_list_channels_.empty())
Expand All @@ -655,6 +698,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::InitDecoding(

void BatchedThreadedNnet3CudaOnlinePipeline::RunDecoder(
const std::vector<int> &channels, const std::vector<bool> &is_first_chunk) {
nvtxRangePushA("RunDecoder");
if (partial_hypotheses_) {
// We're going to have to generate the partial hypotheses
if (word_syms_ == nullptr) {
Expand Down Expand Up @@ -690,6 +734,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunDecoder(
(*end_points_)[i] = cuda_decoder_->EndpointDetected(ichannel);
}
}
nvtxRangePop();
}

void BatchedThreadedNnet3CudaOnlinePipeline::ReadParametersFromModel() {
Expand Down
42 changes: 24 additions & 18 deletions src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
#include "nnet3/nnet-optimize.h"
#include "online2/online-nnet2-feature-pipeline.h"

#include "cudadecoder/thread-pool-cia.h"

namespace kaldi {
namespace cuda_decoder {

Expand All @@ -66,7 +68,8 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
determinize_lattice(true),
num_decoder_copy_threads(2),
use_gpu_feature_extraction(true),
reset_on_endpoint(false) {}
reset_on_endpoint(false),
num_batching_copy_threads(0) {}
void Register(OptionsItf *po) {
po->Register("max-batch-size", &max_batch_size,
"The maximum execution batch size."
Expand All @@ -88,6 +91,12 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
po->Register(
"reset-on-endpoint", &reset_on_endpoint,
"Reset a decoder channel when endpoint detected. Do not close stream");
po->Register(
"batching-copy-threads", &num_batching_copy_threads,
"Number of threads to use for copying inputs on CPU into single pinned memory matrix. "
"0 means to just use the main thread. Recommend setting this to 8 because the memory "
"copy can starve the GPU of work."
);

feature_opts.Register(po);
decoder_opts.Register(po);
Expand All @@ -101,6 +110,7 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
int num_decoder_copy_threads;
bool use_gpu_feature_extraction;
bool reset_on_endpoint;
int num_batching_copy_threads;

OnlineNnet2FeaturePipelineConfig feature_opts;
CudaDecoderConfig decoder_opts;
Expand All @@ -121,6 +131,8 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
num_worker_threads = (num_worker_threads > 0)
? num_worker_threads
: std::thread::hardware_concurrency();

KALDI_ASSERT(num_batching_copy_threads >= 0);
}
};

Expand Down Expand Up @@ -150,9 +162,15 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
word_syms_(NULL) {
config_.compute_opts.CheckAndFixConfigs(am_nnet_->GetNnet().Modulus());
config_.CheckAndFixConfigs();
Initialize(decode_fst);
int num_worker_threads = config_.num_worker_threads;
thread_pool_ = std::make_unique<ThreadPoolLight>(num_worker_threads);
thread_pool_ = std::make_unique<futures_thread_pool>(num_worker_threads);

int num_batching_copy_threads = config_.num_batching_copy_threads;
if (num_batching_copy_threads > 0) {
batching_copy_thread_pool_ = std::make_unique<futures_thread_pool>(num_batching_copy_threads);
}

Initialize(decode_fst);
}

~BatchedThreadedNnet3CudaOnlinePipeline();
Expand Down Expand Up @@ -304,12 +322,6 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
// Used when features are computed on the host (CPU) on pool threads.
void ComputeOneFeature(int element);

static void ComputeOneFeatureWrapper(void *obj, uint64_t element,
void *ignored) {
static_cast<BatchedThreadedNnet3CudaOnlinePipeline *>(obj)
->ComputeOneFeature(element);
}

void RunNnet3(const std::vector<int> &channels,
const std::vector<BaseFloat *> &d_features,
const int feature_stride,
Expand Down Expand Up @@ -343,14 +355,6 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
// it will call the utterance's callback when done
void FinalizeDecoding(int32 ichannel);

// static wrapper for thread pool
static void FinalizeDecodingWrapper(void *obj, uint64_t ichannel64,
void *ignored) {
int32 ichannel = static_cast<int32>(ichannel64);
static_cast<BatchedThreadedNnet3CudaOnlinePipeline *>(obj)
->FinalizeDecoding(ichannel);
}

//
// Internal structs
//
Expand Down Expand Up @@ -501,7 +505,9 @@ class BatchedThreadedNnet3CudaOnlinePipeline {

// The thread pool receives data from device and post-processes it. This class
// destructor blocks until the thread pool is drained of work items.
std::unique_ptr<ThreadPoolLight> thread_pool_;
std::unique_ptr<futures_thread_pool> thread_pool_;

std::unique_ptr<futures_thread_pool> batching_copy_thread_pool_;

// The decoder owns thread(s) that reconstruct lattices transferred from the
// device in a compacted form as arrays with offsets instead of pointers.
Expand Down
4 changes: 4 additions & 0 deletions src/cudadecoder/batched-threaded-nnet3-cuda-pipeline2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,17 @@ void BatchedThreadedNnet3CudaPipeline2::AcquireTasks() {

void BatchedThreadedNnet3CudaPipeline2::ComputeTasks() {
while (threads_running_) {
nvtxRangePushA("AcquireTasks");
if (current_tasks_.size() < max_batch_size_) AcquireTasks();
nvtxRangePop();
if (current_tasks_.empty()) {
// If we still have nothing to do, let's sleep a bit
Sleep(kSleepForNewTask);
continue;
}
nvtxRangePushA("BuildBatch");
BuildBatchFromCurrentTasks();
nvtxRangePop();

if (use_online_features_)
cuda_online_pipeline_.DecodeBatch(batch_corr_ids_, batch_wave_samples_,
Expand Down
2 changes: 0 additions & 2 deletions src/cudadecoder/cuda-decoder-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@
#define KALDI_CUDA_DECODER_BATCH_KERNEL_LOOP(i, n) \
for (int i = blockIdx.y; i < (n); i += gridDim.y)

#define KALDI_CUDA_DECODER_DIV_ROUND_UP(a, b) ((a + b - 1) / b)

#define KALDI_CUDA_DECODER_1D_BLOCK 256
#define KALDI_CUDA_DECODER_LARGEST_1D_BLOCK 1024
#define KALDI_CUDA_DECODER_ONE_THREAD_BLOCK 1
Expand Down
2 changes: 1 addition & 1 deletion src/cudadecoder/cuda-decoder-kernels.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1538,7 +1538,7 @@ __global__ void emitting_preprocess_and_list_extra_prev_tokens_step1_kernel(
// Token index of one of the token which the lowest token.cost for that
// state
uint32_t state_best_int_cost_argmin;
GetArgFromPackedArgminUInt64(h_val.min_and_argmin_int_cost_u64, &state_best_int_cost_argmin);
GetArgFromPackedArgminUInt64(h_val.min_and_argmin_int_cost_u64, &state_best_int_cost_argmin);

// Checking if we're the representative of that state
representing_state = (main_q_idx == state_best_int_cost_argmin);
Expand Down
Loading