Introduce max stream count when scheduling a graph.
liuliu committed Jun 30, 2023
1 parent 2901042 commit ee4b1d1
Showing 13 changed files with 151 additions and 84 deletions.
1 change: 1 addition & 0 deletions lib/nnc/_ccv_cnnp_model.h
@@ -146,6 +146,7 @@ struct ccv_cnnp_model_s {
const ccv_cnnp_model_vtab_t* isa;
int input_size; // This is the best effort number, mostly just for subclass to use.
int output_size;
int max_stream_count;
ccv_array_t* io; // The opaque io that can be nil.
ccv_array_t* parameter_indices; // The indexes for parameters in the final model.
ccv_nnc_symbolic_graph_t* graph;
1 change: 1 addition & 0 deletions lib/nnc/_ccv_nnc_dynamic_graph.h
@@ -84,6 +84,7 @@ KHASH_MAP_INIT_INT(stateful_exec, ccv_nnc_stateful_exec_t*)

struct ccv_nnc_dynamic_graph_s {
int no_grad; // 1 if gradient computation is disabled.
int max_stream_count;
int reuse_var; // -1 if no var can be reused. Otherwise first locate the reuse var without increase array size.
int reuse_stateful_exec; // -1 if no stateful exec can be reused. Otherwise first locate the reuse without increase array size.
ccv_nnc_xpu_alloc_t xpu_alloc; // Allocate memory dynamically.
22 changes: 15 additions & 7 deletions lib/nnc/ccv_cnnp_model.c
@@ -622,6 +622,14 @@ void ccv_cnnp_model_set_data_parallel(ccv_cnnp_model_t* const model, const int p
{ assert(!compiled_data->graph); }
}

void ccv_cnnp_model_set_max_concurrency(ccv_cnnp_model_t* const model, const int max_stream_count)
{
model->max_stream_count = max_stream_count;
ccv_cnnp_compiled_data_t* const compiled_data = model->compiled_data;
if (compiled_data)
{ assert(!compiled_data->graph); }
}

void ccv_cnnp_model_set_memory_compression(ccv_cnnp_model_t* const model, const int memory_compression)
{
model->memory_compression = memory_compression;
@@ -1380,7 +1388,7 @@ static void _ccv_cnnp_model_fit_jit(ccv_cnnp_model_t* const model, ccv_nnc_tenso
if (to.graph)
compiled_data->evaluate.to_ops[compiled_data->evaluate.to_op_size++] = to;
}
ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type);
ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type, model->max_stream_count);
ccv_nnc_graph_autotune(compiled_data->graph, model->workspace_size, 0, TRAVERSE_FULL);
}

@@ -1533,7 +1541,7 @@ static void _ccv_cnnp_model_multistage_no_grad_jit(ccv_cnnp_model_t* const model
.graph_exec_arena = compiled_data->graph_exec_arena,
};
ccv_cnnp_model_set_is_test(model, 1, _ccv_cnnp_cmd_update_for_execs, &update);
ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type);
ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type, model->max_stream_count);
ccv_nnc_graph_autotune(compiled_data->graph, model->workspace_size, 0, TRAVERSE_FULL);
}

@@ -1676,7 +1684,7 @@ static void _ccv_cnnp_model_multistage_jit_0(ccv_cnnp_model_t* const model, cons
.graph = compiled_data->graph,
};
ccv_array_free(backward_from);
ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type);
ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type, model->max_stream_count);
ccv_nnc_graph_autotune(compiled_data->graph, model->workspace_size, 0, TRAVERSE_FULL);
}

@@ -1722,7 +1730,7 @@ void ccv_cnnp_model_evaluate(ccv_cnnp_model_t* const model, const ccv_cnnp_evalu
ccv_nnc_graph_run_with_schedule(compiled_data->graph, 0, 0, tensor_tape, stream_context);
else {
if (!compiled_data->evaluate.schedule)
compiled_data->evaluate.schedule = ccv_nnc_graph_static_schedule_new(compiled_data->graph, compiled_data->stream_type, 0, 0, compiled_data->evaluate.to_ops, compiled_data->evaluate.to_op_size);
compiled_data->evaluate.schedule = ccv_nnc_graph_static_schedule_new(compiled_data->graph, compiled_data->stream_type, model->max_stream_count, 0, 0, compiled_data->evaluate.to_ops, compiled_data->evaluate.to_op_size);
ccv_nnc_graph_run_with_schedule(compiled_data->graph, 0, compiled_data->evaluate.schedule, tensor_tape, stream_context);
}
}
@@ -1761,7 +1769,7 @@ static void _ccv_cnnp_model_multistage_jit_1(ccv_cnnp_model_t* const model)
_ccv_cnnp_model_bind_tensors(accum, compiled_data->backward.updated_accum_gradients, compiled_data->tensors.accum_gradients, parameter_size * parallel_count, 1, tensor_binds);
ccv_nnc_symbolic_graph_compile(accum, compiled_data->compile_params, (ccv_nnc_tensor_bind_t*)ccv_array_get(tensor_binds, 0), tensor_binds->rnum, 0, 0, SYMBOLIC_GRAPH_SOURCES(accum), SYMBOLIC_GRAPH_DESTINATIONS(accum), &compiled_data->backward.accum, &compiled_data->backward.tensor_arena, &compiled_data->backward.graph_exec_arena);
ccv_nnc_symbolic_graph_free(accum);
ccv_nnc_graph_set_default_static_schedule(compiled_data->backward.accum, compiled_data->stream_type);
ccv_nnc_graph_set_default_static_schedule(compiled_data->backward.accum, compiled_data->stream_type, model->max_stream_count);
ccv_array_free(tensor_binds);
}

@@ -1843,7 +1851,7 @@ void ccv_cnnp_model_backward(ccv_cnnp_model_t* const model, ccv_nnc_tensor_t* co
// parameters and internals. The same cannot be said for gradients due to the accum_gradients switching.
_ccv_cnnp_bind_tensors_to_arena(compiled_data->tensor_arena, model->graph, compiled_data->gradients, compiled_data->tensors.gradients, parameter_size, parallel_count);
if (!compiled_data->backward.schedule)
compiled_data->backward.schedule = ccv_nnc_graph_static_schedule_new(compiled_data->graph, compiled_data->stream_type, compiled_data->backward.from_ops, compiled_data->backward.from_op_size, 0, 0);
compiled_data->backward.schedule = ccv_nnc_graph_static_schedule_new(compiled_data->graph, compiled_data->stream_type, model->max_stream_count, compiled_data->backward.from_ops, compiled_data->backward.from_op_size, 0, 0);
// Run the backward pass.
ccv_nnc_graph_run_with_schedule(compiled_data->graph, 0, compiled_data->backward.schedule, tensor_tape, stream_context);
// If we need to run accumulation round, do that now.
@@ -1934,7 +1942,7 @@ static void _ccv_cnnp_model_multistage_jit_2(ccv_cnnp_model_t* const model)
ccv_nnc_cmd_exec(CMD_SET_FORWARD(0), ccv_nnc_no_hint, 0, 0, 0, &copy, 1, 0);
}
}
ccv_nnc_graph_set_default_static_schedule(compiled_data->apply_gradients.graph, compiled_data->stream_type);
ccv_nnc_graph_set_default_static_schedule(compiled_data->apply_gradients.graph, compiled_data->stream_type, model->max_stream_count);
}

void ccv_cnnp_model_apply_gradients(ccv_cnnp_model_t* const model, ccv_nnc_stream_context_t* const stream_context)
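For context, a minimal usage sketch of the new setter (not part of this commit): only ccv_cnnp_model_set_max_concurrency and its call-before-compilation constraint come from this change; the include path and the surrounding model code are assumed.

#include <nnc/ccv_nnc.h>

static void cap_model_concurrency(ccv_cnnp_model_t* const model)
{
	/* Must be called while compiled_data->graph is still nil, i.e. before the
	 * first evaluate / fit / backward jits the concrete graph -- the setter
	 * asserts exactly that. */
	ccv_cnnp_model_set_max_concurrency(model, 2);
	/* Passing 0 removes the cap and lets the static schedule use as many
	 * streams as the dependency structure allows. */
}

Once set, the value is threaded into every ccv_nnc_graph_set_default_static_schedule and ccv_nnc_graph_static_schedule_new call shown above.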
20 changes: 18 additions & 2 deletions lib/nnc/ccv_nnc.h
@@ -1166,21 +1166,23 @@ typedef struct ccv_nnc_graph_static_schedule_s ccv_nnc_graph_static_schedule_t;
* and save the end result to an internal schedule object on this graph.
* @param graph The concrete graph.
* @param stream_type The type of stream context we are going to use.
* @param max_stream_count The maximum number of stream contexts to allocate internally; 0 means no limit.
*/
void ccv_nnc_graph_set_default_static_schedule(ccv_nnc_graph_t* const graph, const int stream_type);
void ccv_nnc_graph_set_default_static_schedule(ccv_nnc_graph_t* const graph, const int stream_type, const int max_stream_count);
/**
* Allocate extra streams to make this graph parallel runnable. Note this requires the graph to be topsorted.
* After this is done, you can schedule a graph either on its default stream, or a new stream with the schedule
* object.
* @param graph The concrete graph.
* @param stream_type The type of stream context we are going to use.
* @param max_stream_count The maximum number of stream contexts to allocate internally; 0 means no limit.
* @param sources The source execution nodes to begin. 0 uses default sources.
* @param source_size The size of source execution nodes.
* @param destinations The destination execution nodes which we end. 0 uses default destinations.
* @param destination_size The size of destination execution nodes.
* @return An opaque schedule object that lets the graph know how to run itself efficiently.
*/
CCV_WARN_UNUSED(ccv_nnc_graph_static_schedule_t*) ccv_nnc_graph_static_schedule_new(ccv_nnc_graph_t* const graph, const int stream_type, const ccv_nnc_graph_exec_t* const sources, const int source_size, const ccv_nnc_graph_exec_t* const destinations, const int destination_size);
CCV_WARN_UNUSED(ccv_nnc_graph_static_schedule_t*) ccv_nnc_graph_static_schedule_new(ccv_nnc_graph_t* const graph, const int stream_type, const int max_stream_count, const ccv_nnc_graph_exec_t* const sources, const int source_size, const ccv_nnc_graph_exec_t* const destinations, const int destination_size);
/**
* Free a schedule object for a graph.
* @param schedule The schedule object returned from ccv_nnc_graph_static_schedule_new.
@@ -2844,6 +2846,13 @@ typedef struct ccv_cnnp_model_s ccv_cnnp_model_t;
* @param stream_context Which stream this computation will be executed upon.
*/
void ccv_nnc_dynamic_graph_evaluate(ccv_nnc_dynamic_graph_t* const dynamic_graph, ccv_cnnp_model_t* const model, const int is_test, const ccv_nnc_tensor_variable_t* const inputs, const int input_size, ccv_nnc_tensor_variable_t* const outputs, const int output_size, ccv_nnc_tensor_tape_t* const tensor_tape, ccv_nnc_stream_context_t* const stream_context);
/**
* Set the maximum operator-level concurrency. This is a soft limit; for example, operations on
* different devices still run concurrently regardless of this setting.
* @param graph The dynamic graph.
* @param max_stream_count The maximum concurrency if the dynamic graph schedules internal streams. 0 is no limit.
*/
void ccv_nnc_dynamic_graph_set_max_concurrency(ccv_nnc_dynamic_graph_t* const graph, const int max_stream_count);
/**
* Enable or disable gradient computation on a dynamic graph.
* @param dynamic_graph The dynamic graph.
@@ -3731,6 +3740,13 @@ int ccv_cnnp_model_read(void* const handle, const char* const name, const ccv_nn
* @param parallel Number of devices we want to run on. 0 will use all devices available. 1 will skip.
*/
void ccv_cnnp_model_set_data_parallel(ccv_cnnp_model_t* const model, const int parallel);
/**
* Set the maximum operator-level concurrency. This is a soft limit; for example, operations on
* different devices still run concurrently regardless of this setting.
* @param model The composed model.
* @param max_stream_count The maximum concurrency if the model schedules internal streams. 0 is no limit.
*/
void ccv_cnnp_model_set_max_concurrency(ccv_cnnp_model_t* const model, const int max_stream_count);
/**
* Apply memory compression to the composed model. The memory compression technique can reduce memory
* usage by up to 75% compared with the raw mixed-precision model during training.
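The two declarations above are the public entry points that actually take the new argument. A sketch of how a caller might use them, assuming an already-built, topsorted concrete graph: the run-with-schedule and schedule-free calls mirror the usage in ccv_cnnp_model.c above and the schedule-free documentation in this header; the function name, stream type, and the cap of 4 are illustrative.

static void schedule_graph(ccv_nnc_graph_t* const graph,
	const ccv_nnc_graph_exec_t* const sources, const int source_size,
	const ccv_nnc_graph_exec_t* const destinations, const int destination_size)
{
	/* Option 1: attach a default schedule to the graph, capped at 4 streams
	 * (0 would mean no cap), then run with that default schedule. */
	ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 4);
	ccv_nnc_graph_run_with_schedule(graph, 0, 0 /* 0 = use the default schedule */, 0, 0);
	/* Option 2: build an explicit schedule object for a sub-range of the graph
	 * and run with it. */
	ccv_nnc_graph_static_schedule_t* const schedule = ccv_nnc_graph_static_schedule_new(graph,
		CCV_STREAM_CONTEXT_GPU, 4, sources, source_size, destinations, destination_size);
	ccv_nnc_graph_run_with_schedule(graph, 0, schedule, 0, 0);
	ccv_nnc_graph_static_schedule_free(schedule);
}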
5 changes: 5 additions & 0 deletions lib/nnc/ccv_nnc_dynamic_graph.c
@@ -448,6 +448,11 @@ ccv_nnc_tensor_variable_t ccv_nnc_tensor_variable_exchange_new(ccv_nnc_dynamic_g
return new_variable;
}

void ccv_nnc_dynamic_graph_set_max_concurrency(ccv_nnc_dynamic_graph_t* const dynamic_graph, const int max_stream_count)
{
dynamic_graph->max_stream_count = max_stream_count;
}

int ccv_nnc_dynamic_graph_set_no_grad(ccv_nnc_dynamic_graph_t* const dynamic_graph, const int no_grad)
{
if (dynamic_graph->no_grad == no_grad)
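The dynamic-graph setter is just a field write; the field is consumed by the scheduling call sites in the files below. A minimal sketch, assuming the usual ccv_nnc_dynamic_graph_new()/ccv_nnc_dynamic_graph_free() lifecycle (neither call is part of this diff):

#include <nnc/ccv_nnc.h>

int main(void)
{
	ccv_nnc_dynamic_graph_t* const dynamic_graph = ccv_nnc_dynamic_graph_new();
	/* Every graph this dynamic graph schedules internally (apply_gradients,
	 * backward, minimize below) will now be capped at 2 streams; 0 = no limit. */
	ccv_nnc_dynamic_graph_set_max_concurrency(dynamic_graph, 2);
	/* ... exec commands, backward, minimize as usual ... */
	ccv_nnc_dynamic_graph_free(dynamic_graph);
	return 0;
}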
4 changes: 2 additions & 2 deletions lib/nnc/ccv_nnc_dynamic_graph_apply_gradients.c
@@ -216,7 +216,7 @@ void ccv_nnc_dynamic_graph_apply_gradients(ccv_nnc_dynamic_graph_t* const dynami
ccv_array_free(symbol_stack);
if (stream_context)
{
ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context));
ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context), dynamic_graph->max_stream_count);
ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, stream_context);
ccv_nnc_tensor_arena_buffer_free(tensor_arena);
ccv_nnc_compilation_artifact_t* const artifact = ccv_nnc_compilation_artifact_new(graph, tensor_arena, exec_arena);
@@ -228,7 +228,7 @@ void ccv_nnc_dynamic_graph_apply_gradients(ccv_nnc_dynamic_graph_t* const dynami
for (i = 0; !flag && i < parameter_size; i++)
flag = (CCV_TENSOR_GET_MEMORY(parameters[i]->info.type) == CCV_TENSOR_GPU_MEMORY);
const int stream_type = flag ? CCV_STREAM_CONTEXT_GPU : CCV_STREAM_CONTEXT_CPU;
ccv_nnc_graph_set_default_static_schedule(graph, stream_type);
ccv_nnc_graph_set_default_static_schedule(graph, stream_type, dynamic_graph->max_stream_count);
ccv_nnc_stream_context_t* const default_stream = ccv_nnc_graph_default_stream(graph);
ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, default_stream);
ccv_nnc_stream_context_wait(default_stream);
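The same scheduling pattern recurs in the backward and minimize files below. A condensed paraphrase of it, not a verbatim excerpt: the names are local to this sketch, but the calls are the ones visible in the diff hunks.

static void run_scheduled(ccv_nnc_graph_t* const graph, const int any_gpu_tensor,
	const int max_stream_count, ccv_nnc_stream_context_t* const stream_context)
{
	if (stream_context)
	{
		/* Schedule for the caller's stream type and run asynchronously on that stream. */
		ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context), max_stream_count);
		ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, stream_context);
	} else {
		/* No caller stream: pick CPU or GPU from the tensors, run on the graph's
		 * own default stream and block until it finishes. */
		const int stream_type = any_gpu_tensor ? CCV_STREAM_CONTEXT_GPU : CCV_STREAM_CONTEXT_CPU;
		ccv_nnc_graph_set_default_static_schedule(graph, stream_type, max_stream_count);
		ccv_nnc_stream_context_t* const default_stream = ccv_nnc_graph_default_stream(graph);
		ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, default_stream);
		ccv_nnc_stream_context_wait(default_stream);
	}
}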
4 changes: 2 additions & 2 deletions lib/nnc/ccv_nnc_dynamic_graph_backward.c
@@ -526,7 +526,7 @@ void ccv_nnc_dynamic_graph_backward(ccv_nnc_dynamic_graph_t* const dynamic_graph
multi_device = (CCV_TENSOR_GET_DEVICE(inputs[i - 1]->info.type) != CCV_TENSOR_GET_DEVICE(inputs[i]->info.type));
if (stream_context)
{
ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context));
ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context), dynamic_graph->max_stream_count);
ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, stream_context);
ccv_nnc_tensor_arena_buffer_free(tensor_arena);
ccv_nnc_compilation_artifact_t* const artifact = ccv_nnc_compilation_artifact_new(graph, tensor_arena, exec_arena);
@@ -538,7 +538,7 @@ void ccv_nnc_dynamic_graph_backward(ccv_nnc_dynamic_graph_t* const dynamic_graph
for (i = 0; !flag && i < input_size; i++)
flag = (CCV_TENSOR_GET_MEMORY(inputs[i]->info.type) == CCV_TENSOR_GPU_MEMORY);
const int stream_type = flag ? CCV_STREAM_CONTEXT_GPU : CCV_STREAM_CONTEXT_CPU;
ccv_nnc_graph_set_default_static_schedule(graph, stream_type);
ccv_nnc_graph_set_default_static_schedule(graph, stream_type, dynamic_graph->max_stream_count);
ccv_nnc_stream_context_t* const default_stream = ccv_nnc_graph_default_stream(graph);
ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, default_stream);
ccv_nnc_stream_context_wait(default_stream);
4 changes: 2 additions & 2 deletions lib/nnc/ccv_nnc_dynamic_graph_minimize.c
@@ -454,7 +454,7 @@ void ccv_nnc_dynamic_graph_minimize(ccv_nnc_dynamic_graph_t* const dynamic_graph
ccv_array_free(symbol_stack);
if (stream_context)
{
ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context));
ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context), dynamic_graph->max_stream_count);
ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, stream_context);
ccv_nnc_tensor_arena_buffer_free(tensor_arena);
ccv_nnc_compilation_artifact_t* const artifact = ccv_nnc_compilation_artifact_new(graph, tensor_arena, exec_arena);
@@ -466,7 +466,7 @@ void ccv_nnc_dynamic_graph_minimize(ccv_nnc_dynamic_graph_t* const dynamic_graph
for (i = 0; !flag && i < parameter_size; i++)
flag = (CCV_TENSOR_GET_MEMORY(parameters[i]->info.type) == CCV_TENSOR_GPU_MEMORY);
const int stream_type = flag ? CCV_STREAM_CONTEXT_GPU : CCV_STREAM_CONTEXT_CPU;
ccv_nnc_graph_set_default_static_schedule(graph, stream_type);
ccv_nnc_graph_set_default_static_schedule(graph, stream_type, dynamic_graph->max_stream_count);
ccv_nnc_stream_context_t* const default_stream = ccv_nnc_graph_default_stream(graph);
ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, default_stream);
ccv_nnc_stream_context_wait(default_stream);
(The remaining changed files were not loaded in this view.)
