From ee4b1d1dcf23d4067c631d8405be907670e07fc6 Mon Sep 17 00:00:00 2001 From: Liu Liu Date: Thu, 29 Jun 2023 23:51:12 -0400 Subject: [PATCH] Introduce max stream count when scheduling a graph. --- lib/nnc/_ccv_cnnp_model.h | 1 + lib/nnc/_ccv_nnc_dynamic_graph.h | 1 + lib/nnc/ccv_cnnp_model.c | 22 ++- lib/nnc/ccv_nnc.h | 20 ++- lib/nnc/ccv_nnc_dynamic_graph.c | 5 + .../ccv_nnc_dynamic_graph_apply_gradients.c | 4 +- lib/nnc/ccv_nnc_dynamic_graph_backward.c | 4 +- lib/nnc/ccv_nnc_dynamic_graph_minimize.c | 4 +- lib/nnc/ccv_nnc_graph.c | 144 +++++++++++------- test/int/nnc/parallel.tests.c | 4 +- test/int/nnc/schedule.tests.c | 22 +-- test/unit/nnc/autograd.tests.c | 2 +- test/unit/nnc/parallel.tests.c | 2 +- 13 files changed, 151 insertions(+), 84 deletions(-) diff --git a/lib/nnc/_ccv_cnnp_model.h b/lib/nnc/_ccv_cnnp_model.h index 704761ef6..5a99a7a15 100644 --- a/lib/nnc/_ccv_cnnp_model.h +++ b/lib/nnc/_ccv_cnnp_model.h @@ -146,6 +146,7 @@ struct ccv_cnnp_model_s { const ccv_cnnp_model_vtab_t* isa; int input_size; // This is the best effort number, mostly just for subclass to use. int output_size; + int max_stream_count; ccv_array_t* io; // The opaque io that can be nil. ccv_array_t* parameter_indices; // The indexes for parameters in the final model. ccv_nnc_symbolic_graph_t* graph; diff --git a/lib/nnc/_ccv_nnc_dynamic_graph.h b/lib/nnc/_ccv_nnc_dynamic_graph.h index 08ea6a7e8..d514e2baf 100644 --- a/lib/nnc/_ccv_nnc_dynamic_graph.h +++ b/lib/nnc/_ccv_nnc_dynamic_graph.h @@ -84,6 +84,7 @@ KHASH_MAP_INIT_INT(stateful_exec, ccv_nnc_stateful_exec_t*) struct ccv_nnc_dynamic_graph_s { int no_grad; // 1 if gradient computation is disabled. + int max_stream_count; int reuse_var; // -1 if no var can be reused. Otherwise first locate the reuse var without increase array size. int reuse_stateful_exec; // -1 if no stateful exec can be reused. Otherwise first locate the reuse without increase array size. ccv_nnc_xpu_alloc_t xpu_alloc; // Allocate memory dynamically.
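For context, the two header changes above only add storage for the configured cap; the scheduler consults it later in ccv_nnc_graph.c. The intended semantics are: a value of 0 disables the limit (this is what the existing call sites in the tests below pass), and any positive value is a soft upper bound on how many streams a single schedule may allocate. A minimal sketch of that check, using a hypothetical helper name (the real code compares stream_data->rnum against the cap inline):

	// Hypothetical helper, not part of this patch: what max_stream_count means to the scheduler.
	static int stream_cap_reached(const int allocated_streams, const int max_stream_count)
	{
		// 0 (or a negative value) disables the cap; otherwise the cap is reached once this
		// schedule already owns max_stream_count streams.
		return max_stream_count > 0 && allocated_streams >= max_stream_count;
	}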
diff --git a/lib/nnc/ccv_cnnp_model.c b/lib/nnc/ccv_cnnp_model.c index 99c558b40..74666c36f 100644 --- a/lib/nnc/ccv_cnnp_model.c +++ b/lib/nnc/ccv_cnnp_model.c @@ -622,6 +622,14 @@ void ccv_cnnp_model_set_data_parallel(ccv_cnnp_model_t* const model, const int p { assert(!compiled_data->graph); } } +void ccv_cnnp_model_set_max_concurrency(ccv_cnnp_model_t* const model, const int max_stream_count) +{ + model->max_stream_count = max_stream_count; + ccv_cnnp_compiled_data_t* const compiled_data = model->compiled_data; + if (compiled_data) + { assert(!compiled_data->graph); } +} + void ccv_cnnp_model_set_memory_compression(ccv_cnnp_model_t* const model, const int memory_compression) { model->memory_compression = memory_compression; @@ -1380,7 +1388,7 @@ static void _ccv_cnnp_model_fit_jit(ccv_cnnp_model_t* const model, ccv_nnc_tenso if (to.graph) compiled_data->evaluate.to_ops[compiled_data->evaluate.to_op_size++] = to; } - ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type); + ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type, model->max_stream_count); ccv_nnc_graph_autotune(compiled_data->graph, model->workspace_size, 0, TRAVERSE_FULL); } @@ -1533,7 +1541,7 @@ static void _ccv_cnnp_model_multistage_no_grad_jit(ccv_cnnp_model_t* const model .graph_exec_arena = compiled_data->graph_exec_arena, }; ccv_cnnp_model_set_is_test(model, 1, _ccv_cnnp_cmd_update_for_execs, &update); - ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type); + ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type, model->max_stream_count); ccv_nnc_graph_autotune(compiled_data->graph, model->workspace_size, 0, TRAVERSE_FULL); } @@ -1676,7 +1684,7 @@ static void _ccv_cnnp_model_multistage_jit_0(ccv_cnnp_model_t* const model, cons .graph = compiled_data->graph, }; ccv_array_free(backward_from); - ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type); + ccv_nnc_graph_set_default_static_schedule(compiled_data->graph, compiled_data->stream_type, model->max_stream_count); ccv_nnc_graph_autotune(compiled_data->graph, model->workspace_size, 0, TRAVERSE_FULL); } @@ -1722,7 +1730,7 @@ void ccv_cnnp_model_evaluate(ccv_cnnp_model_t* const model, const ccv_cnnp_evalu ccv_nnc_graph_run_with_schedule(compiled_data->graph, 0, 0, tensor_tape, stream_context); else { if (!compiled_data->evaluate.schedule) - compiled_data->evaluate.schedule = ccv_nnc_graph_static_schedule_new(compiled_data->graph, compiled_data->stream_type, 0, 0, compiled_data->evaluate.to_ops, compiled_data->evaluate.to_op_size); + compiled_data->evaluate.schedule = ccv_nnc_graph_static_schedule_new(compiled_data->graph, compiled_data->stream_type, model->max_stream_count, 0, 0, compiled_data->evaluate.to_ops, compiled_data->evaluate.to_op_size); ccv_nnc_graph_run_with_schedule(compiled_data->graph, 0, compiled_data->evaluate.schedule, tensor_tape, stream_context); } } @@ -1761,7 +1769,7 @@ static void _ccv_cnnp_model_multistage_jit_1(ccv_cnnp_model_t* const model) _ccv_cnnp_model_bind_tensors(accum, compiled_data->backward.updated_accum_gradients, compiled_data->tensors.accum_gradients, parameter_size * parallel_count, 1, tensor_binds); ccv_nnc_symbolic_graph_compile(accum, compiled_data->compile_params, (ccv_nnc_tensor_bind_t*)ccv_array_get(tensor_binds, 0), tensor_binds->rnum, 0, 0, SYMBOLIC_GRAPH_SOURCES(accum), SYMBOLIC_GRAPH_DESTINATIONS(accum), &compiled_data->backward.accum, 
&compiled_data->backward.tensor_arena, &compiled_data->backward.graph_exec_arena); ccv_nnc_symbolic_graph_free(accum); - ccv_nnc_graph_set_default_static_schedule(compiled_data->backward.accum, compiled_data->stream_type); + ccv_nnc_graph_set_default_static_schedule(compiled_data->backward.accum, compiled_data->stream_type, model->max_stream_count); ccv_array_free(tensor_binds); } @@ -1843,7 +1851,7 @@ void ccv_cnnp_model_backward(ccv_cnnp_model_t* const model, ccv_nnc_tensor_t* co // parameters and internals. The same cannot be said for gradients due to the accum_gradients switching. _ccv_cnnp_bind_tensors_to_arena(compiled_data->tensor_arena, model->graph, compiled_data->gradients, compiled_data->tensors.gradients, parameter_size, parallel_count); if (!compiled_data->backward.schedule) - compiled_data->backward.schedule = ccv_nnc_graph_static_schedule_new(compiled_data->graph, compiled_data->stream_type, compiled_data->backward.from_ops, compiled_data->backward.from_op_size, 0, 0); + compiled_data->backward.schedule = ccv_nnc_graph_static_schedule_new(compiled_data->graph, compiled_data->stream_type, model->max_stream_count, compiled_data->backward.from_ops, compiled_data->backward.from_op_size, 0, 0); // Run the backward pass. ccv_nnc_graph_run_with_schedule(compiled_data->graph, 0, compiled_data->backward.schedule, tensor_tape, stream_context); // If we need to run accumulation round, do that now. @@ -1934,7 +1942,7 @@ static void _ccv_cnnp_model_multistage_jit_2(ccv_cnnp_model_t* const model) ccv_nnc_cmd_exec(CMD_SET_FORWARD(0), ccv_nnc_no_hint, 0, 0, 0, &copy, 1, 0); } } - ccv_nnc_graph_set_default_static_schedule(compiled_data->apply_gradients.graph, compiled_data->stream_type); + ccv_nnc_graph_set_default_static_schedule(compiled_data->apply_gradients.graph, compiled_data->stream_type, model->max_stream_count); } void ccv_cnnp_model_apply_gradients(ccv_cnnp_model_t* const model, ccv_nnc_stream_context_t* const stream_context) diff --git a/lib/nnc/ccv_nnc.h b/lib/nnc/ccv_nnc.h index 1286061ae..6d72e836a 100644 --- a/lib/nnc/ccv_nnc.h +++ b/lib/nnc/ccv_nnc.h @@ -1166,21 +1166,23 @@ typedef struct ccv_nnc_graph_static_schedule_s ccv_nnc_graph_static_schedule_t; * and save the end result to a internal schedule object to this graph. * @param graph The concrete graph. * @param stream_type The type of stream context we are going to use. + * @param max_stream_count The maximum number of stream contexts to allocate internally. 0 means no limit. */ -void ccv_nnc_graph_set_default_static_schedule(ccv_nnc_graph_t* const graph, const int stream_type); +void ccv_nnc_graph_set_default_static_schedule(ccv_nnc_graph_t* const graph, const int stream_type, const int max_stream_count); /** * Allocate extra streams to make this graph parallel runnable. Note this requires the graph to be topsorted. * After this is done, you can schedule a graph either on its default stream, or a new stream with the schedule * object. * @param graph The concrete graph. * @param stream_type The type of stream context we are going to use. + * @param max_stream_count The maximum number of stream contexts to allocate internally. 0 means no limit. * @param sources The source execution nodes to begin. 0 uses default sources. * @param source_size The size of source execution nodes. * @param destinations The destination execution nodes which we end. 0 uses default destinations. * @param destination_size The size of destination execution nodes. * @return An opaque schedule object that let the graph knows how to run itself efficiently.
*/ -CCV_WARN_UNUSED(ccv_nnc_graph_static_schedule_t*) ccv_nnc_graph_static_schedule_new(ccv_nnc_graph_t* const graph, const int stream_type, const ccv_nnc_graph_exec_t* const sources, const int source_size, const ccv_nnc_graph_exec_t* const destinations, const int destination_size); +CCV_WARN_UNUSED(ccv_nnc_graph_static_schedule_t*) ccv_nnc_graph_static_schedule_new(ccv_nnc_graph_t* const graph, const int stream_type, const int max_stream_count, const ccv_nnc_graph_exec_t* const sources, const int source_size, const ccv_nnc_graph_exec_t* const destinations, const int destination_size); /** * Free a schedule object for a graph. * @param schedule The schedule object returned from ccv_nnc_graph_static_schedule_new. @@ -2844,6 +2846,13 @@ typedef struct ccv_cnnp_model_s ccv_cnnp_model_t; * @param stream_context Which stream this computation will be executed upon. */ void ccv_nnc_dynamic_graph_evaluate(ccv_nnc_dynamic_graph_t* const dynamic_graph, ccv_cnnp_model_t* const model, const int is_test, const ccv_nnc_tensor_variable_t* const inputs, const int input_size, ccv_nnc_tensor_variable_t* const outputs, const int output_size, ccv_nnc_tensor_tape_t* const tensor_tape, ccv_nnc_stream_context_t* const stream_context); +/** + * Set the maximum operator-level concurrency. This is a soft limit; for example, operations on + * different devices still run concurrently regardless of this setting. + * @param graph The dynamic graph. + * @param max_stream_count The maximum concurrency if the dynamic graph schedules internal streams. 0 means no limit. + */ +void ccv_nnc_dynamic_graph_set_max_concurrency(ccv_nnc_dynamic_graph_t* const graph, const int max_stream_count); /** * Enable or disable gradient computation on a dynamic graph. * @param dynamic_graph The dynamic graph. @@ -3731,6 +3740,13 @@ int ccv_cnnp_model_read(void* const handle, const char* const name, const ccv_nn * @param parallel Number of devices we want to run on. 0 will use all devices available. 1 will skip. */ void ccv_cnnp_model_set_data_parallel(ccv_cnnp_model_t* const model, const int parallel); +/** + * Set the maximum operator-level concurrency. This is a soft limit; for example, operations on + * different devices still run concurrently regardless of this setting. + * @param model The composed model. + * @param max_stream_count The maximum concurrency if the model schedules internal streams. 0 means no limit. + */ +void ccv_cnnp_model_set_max_concurrency(ccv_cnnp_model_t* const model, const int max_stream_count); /** * Apply memory compression to the composed model. The memory compression technique can reduce memory * usage up to 75% comparing with raw mix-precision model during training time.
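Taken together, the public API above is used like the following sketch (illustrative only, not part of the patch: the objects are assumed to be created elsewhere, the cap values are arbitrary, and the model/dynamic graph caps should be set before the concrete execution graph is jitted, per the asserts in the setters):

	#include <nnc/ccv_nnc.h> // include path may differ depending on how ccv is installed

	static void cap_concurrency_sketch(ccv_cnnp_model_t* const model, ccv_nnc_dynamic_graph_t* const dynamic_graph, ccv_nnc_graph_t* const graph)
	{
		// Soft-cap the number of streams a compiled model may schedule internally.
		ccv_cnnp_model_set_max_concurrency(model, 2);
		// Same cap for graphs the dynamic graph compiles on the fly.
		ccv_nnc_dynamic_graph_set_max_concurrency(dynamic_graph, 2);
		// For a concrete graph, the cap is now an explicit argument when building a schedule.
		ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 4);
		// Or build a standalone schedule; passing 0 for sources / destinations uses the graph defaults.
		ccv_nnc_graph_static_schedule_t* const schedule = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, 4, 0, 0, 0, 0);
		ccv_nnc_graph_run_with_schedule(graph, 0, schedule, 0, 0);
		ccv_nnc_graph_static_schedule_free(schedule);
	}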
diff --git a/lib/nnc/ccv_nnc_dynamic_graph.c b/lib/nnc/ccv_nnc_dynamic_graph.c index ba04962d7..07c85f4e3 100644 --- a/lib/nnc/ccv_nnc_dynamic_graph.c +++ b/lib/nnc/ccv_nnc_dynamic_graph.c @@ -448,6 +448,11 @@ ccv_nnc_tensor_variable_t ccv_nnc_tensor_variable_exchange_new(ccv_nnc_dynamic_g return new_variable; } +void ccv_nnc_dynamic_graph_set_max_concurrency(ccv_nnc_dynamic_graph_t* const dynamic_graph, const int max_stream_count) +{ + dynamic_graph->max_stream_count = max_stream_count; +} + int ccv_nnc_dynamic_graph_set_no_grad(ccv_nnc_dynamic_graph_t* const dynamic_graph, const int no_grad) { if (dynamic_graph->no_grad == no_grad) diff --git a/lib/nnc/ccv_nnc_dynamic_graph_apply_gradients.c b/lib/nnc/ccv_nnc_dynamic_graph_apply_gradients.c index c24d99ad4..a14e406e0 100644 --- a/lib/nnc/ccv_nnc_dynamic_graph_apply_gradients.c +++ b/lib/nnc/ccv_nnc_dynamic_graph_apply_gradients.c @@ -216,7 +216,7 @@ void ccv_nnc_dynamic_graph_apply_gradients(ccv_nnc_dynamic_graph_t* const dynami ccv_array_free(symbol_stack); if (stream_context) { - ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context)); + ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context), dynamic_graph->max_stream_count); ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, stream_context); ccv_nnc_tensor_arena_buffer_free(tensor_arena); ccv_nnc_compilation_artifact_t* const artifact = ccv_nnc_compilation_artifact_new(graph, tensor_arena, exec_arena); @@ -228,7 +228,7 @@ void ccv_nnc_dynamic_graph_apply_gradients(ccv_nnc_dynamic_graph_t* const dynami for (i = 0; !flag && i < parameter_size; i++) flag = (CCV_TENSOR_GET_MEMORY(parameters[i]->info.type) == CCV_TENSOR_GPU_MEMORY); const int stream_type = flag ? CCV_STREAM_CONTEXT_GPU : CCV_STREAM_CONTEXT_CPU; - ccv_nnc_graph_set_default_static_schedule(graph, stream_type); + ccv_nnc_graph_set_default_static_schedule(graph, stream_type, dynamic_graph->max_stream_count); ccv_nnc_stream_context_t* const default_stream = ccv_nnc_graph_default_stream(graph); ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, default_stream); ccv_nnc_stream_context_wait(default_stream); diff --git a/lib/nnc/ccv_nnc_dynamic_graph_backward.c b/lib/nnc/ccv_nnc_dynamic_graph_backward.c index 34ee13e9d..179dbc8cc 100644 --- a/lib/nnc/ccv_nnc_dynamic_graph_backward.c +++ b/lib/nnc/ccv_nnc_dynamic_graph_backward.c @@ -526,7 +526,7 @@ void ccv_nnc_dynamic_graph_backward(ccv_nnc_dynamic_graph_t* const dynamic_graph multi_device = (CCV_TENSOR_GET_DEVICE(inputs[i - 1]->info.type) != CCV_TENSOR_GET_DEVICE(inputs[i]->info.type)); if (stream_context) { - ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context)); + ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context), dynamic_graph->max_stream_count); ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, stream_context); ccv_nnc_tensor_arena_buffer_free(tensor_arena); ccv_nnc_compilation_artifact_t* const artifact = ccv_nnc_compilation_artifact_new(graph, tensor_arena, exec_arena); @@ -538,7 +538,7 @@ void ccv_nnc_dynamic_graph_backward(ccv_nnc_dynamic_graph_t* const dynamic_graph for (i = 0; !flag && i < input_size; i++) flag = (CCV_TENSOR_GET_MEMORY(inputs[i]->info.type) == CCV_TENSOR_GPU_MEMORY); const int stream_type = flag ? 
CCV_STREAM_CONTEXT_GPU : CCV_STREAM_CONTEXT_CPU; - ccv_nnc_graph_set_default_static_schedule(graph, stream_type); + ccv_nnc_graph_set_default_static_schedule(graph, stream_type, dynamic_graph->max_stream_count); ccv_nnc_stream_context_t* const default_stream = ccv_nnc_graph_default_stream(graph); ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, default_stream); ccv_nnc_stream_context_wait(default_stream); diff --git a/lib/nnc/ccv_nnc_dynamic_graph_minimize.c b/lib/nnc/ccv_nnc_dynamic_graph_minimize.c index bea2ab6b3..e4c6d3cb5 100644 --- a/lib/nnc/ccv_nnc_dynamic_graph_minimize.c +++ b/lib/nnc/ccv_nnc_dynamic_graph_minimize.c @@ -454,7 +454,7 @@ void ccv_nnc_dynamic_graph_minimize(ccv_nnc_dynamic_graph_t* const dynamic_graph ccv_array_free(symbol_stack); if (stream_context) { - ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context)); + ccv_nnc_graph_set_default_static_schedule(graph, ccv_nnc_stream_context_type(stream_context), dynamic_graph->max_stream_count); ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, stream_context); ccv_nnc_tensor_arena_buffer_free(tensor_arena); ccv_nnc_compilation_artifact_t* const artifact = ccv_nnc_compilation_artifact_new(graph, tensor_arena, exec_arena); @@ -466,7 +466,7 @@ void ccv_nnc_dynamic_graph_minimize(ccv_nnc_dynamic_graph_t* const dynamic_graph for (i = 0; !flag && i < parameter_size; i++) flag = (CCV_TENSOR_GET_MEMORY(parameters[i]->info.type) == CCV_TENSOR_GPU_MEMORY); const int stream_type = flag ? CCV_STREAM_CONTEXT_GPU : CCV_STREAM_CONTEXT_CPU; - ccv_nnc_graph_set_default_static_schedule(graph, stream_type); + ccv_nnc_graph_set_default_static_schedule(graph, stream_type, dynamic_graph->max_stream_count); ccv_nnc_stream_context_t* const default_stream = ccv_nnc_graph_default_stream(graph); ccv_nnc_graph_run(graph, 0, TRAVERSE_FULL, 0, default_stream); ccv_nnc_stream_context_wait(default_stream); diff --git a/lib/nnc/ccv_nnc_graph.c b/lib/nnc/ccv_nnc_graph.c index 00389bb16..dd3679a8e 100644 --- a/lib/nnc/ccv_nnc_graph.c +++ b/lib/nnc/ccv_nnc_graph.c @@ -732,7 +732,7 @@ void ccv_nnc_graph_static_schedule_free(ccv_nnc_graph_static_schedule_t* const s ccfree(schedule); } -static ccv_nnc_graph_static_schedule_t* _ccv_nnc_graph_static_schedule_new(ccv_nnc_graph_t* const graph, const int stream_type, const int device_id, ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_graph_exec_t* const _sources, const int _source_size, const ccv_nnc_graph_exec_t* const _destinations, const int _destination_size) +static ccv_nnc_graph_static_schedule_t* _ccv_nnc_graph_static_schedule_new(ccv_nnc_graph_t* const graph, const int stream_type, const int device_id, const int max_stream_count, ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_graph_exec_t* const _sources, const int _source_size, const ccv_nnc_graph_exec_t* const _destinations, const int _destination_size) { assert(graph->sources && graph->sources->rnum); assert(graph->destinations && graph->destinations->rnum); @@ -907,11 +907,43 @@ static ccv_nnc_graph_static_schedule_t* _ccv_nnc_graph_static_schedule_new(ccv_n } if (stream_idx < 0) { - stream_idx = stream_data->rnum; - const ccv_nnc_stream_data_t data = { - .device_id = device_ids[i], - }; - ccv_array_push(stream_data, &data); + // Note that the max stream count is a "soft" limit. Even with the limit in place, work on different devices still has to be allocated to different streams.
+ if (stream_data->rnum >= max_stream_count && max_stream_count > 0) + { + // If we are already at our limit, go through the streams again to see if one is available, whether it has run this command, and also that its exec_idx does not precede this execution. + for (j = 0; (stream_idx < 0 || !stream_has_command) && j < stream_data->rnum; j++) + { + ccv_nnc_stream_data_t* const data = (ccv_nnc_stream_data_t*)ccv_array_get(stream_data, j); + if (data->device_id == device_ids[i]) + { + const ccv_numeric_data_t cell = ccv_get_sparse_matrix_cell(exec_dep, data->exec_idx, idx); + // There must be no path from idx to exec_idx, otherwise we would already have a stream_idx. Now we just need to verify + // there is no path from exec_idx to idx as well. + if (!cell.i32 || cell.i32[0] == 0) + { + if (ccv_array_find_uint(data->command_set, node->cmd.cmd)) + stream_idx = j, stream_has_command = 1; + else if (stream_idx < 0) // Otherwise, only assign the stream idx if it is not assigned yet. + stream_idx = j; + } + } + } + if (stream_idx >= 0) + { + // Now we need to mark that exec_idx comes after idx, so we can avoid an A -> B -> A deadlock. + ccv_nnc_stream_data_t* const data = (ccv_nnc_stream_data_t*)ccv_array_get(stream_data, stream_idx); + const int32_t one = 1; + ccv_set_sparse_matrix_cell(exec_dep, idx, data->exec_idx, &one); + } + } + if (stream_idx < 0) + { + stream_idx = stream_data->rnum; + const ccv_nnc_stream_data_t data = { + .device_id = device_ids[i], + }; + ccv_array_push(stream_data, &data); + } } assert(stream_idx >= 0); ccv_nnc_stream_data_t* const data = (ccv_nnc_stream_data_t*)ccv_array_get(stream_data, stream_idx); @@ -921,56 +953,60 @@ static ccv_nnc_graph_static_schedule_t* _ccv_nnc_graph_static_schedule_new(ccv_n ccv_array_add_unique_uint(data->command_set, node->cmd.cmd); // Assign all subsequent node to use this stream. int outgoing_idx = idx; - while (outgoings[outgoing_idx] && outgoings[outgoing_idx]->rnum) - { - int highest_rank = -1; - int highest_idx = -1; - int stream_n = -1; - int stream_has_command = 0; - for (j = 0; j < outgoings[outgoing_idx]->rnum; j++) + // If we want to enforce a stream count of only 1, we certainly don't want to use the greedy approach. + // With the greedy approach, the current stream will go all the way down and certainly conflict with + // other streams. We'd prefer to interleave the execution instead in this case. + if (max_stream_count != 1) + while (outgoings[outgoing_idx] && outgoings[outgoing_idx]->rnum) { - const int d = *(int*)ccv_array_get(outgoings[outgoing_idx], j); - // This is not outside of our scope at this point.
+ assert(schd_info[d].stream_size >= 0); + ccv_nnc_graph_exec_info_t* const outgoing_node = exec_info + d; + const int outgoing_device_id_size = _ccv_nnc_device_ids_for_stream_data(outgoing_node, device_id, stream_data, outgoing_device_ids, max_device_id_size); + if (schd_info[d].stream_size == 0) { - schd_info[d]._heap_streams = (int*)ccmalloc(sizeof(int) * outgoing_device_id_size * 2); - schd_info[d]._heap_signals = (schd_info[d]._heap_streams + outgoing_device_id_size); + schd_info[d].stream_size = outgoing_device_id_size; // At least at the same size as the device_id_size. + if (outgoing_device_id_size > 1) + { + schd_info[d]._heap_streams = (int*)ccmalloc(sizeof(int) * outgoing_device_id_size * 2); + schd_info[d]._heap_signals = (schd_info[d]._heap_streams + outgoing_device_id_size); + } + for (k = 0; k < outgoing_device_id_size; k++) + SCHEDULE_STREAMS(schd_info[d])[k] = -1, SCHEDULE_SIGNALS(schd_info[d])[k] = -1; } + assert(schd_info[d].stream_size == outgoing_device_id_size); for (k = 0; k < outgoing_device_id_size; k++) - SCHEDULE_STREAMS(schd_info[d])[k] = -1, SCHEDULE_SIGNALS(schd_info[d])[k] = -1; + // If it should be on the same device and the stream is not assign, potentially. + if (outgoing_device_ids[k] == device_ids[i] && + SCHEDULE_STREAMS(schd_info[d])[k] < 0 && + (incomings[d].rank > highest_rank || + (incomings[d].rank == highest_rank && + !stream_has_command && ccv_array_find_uint(data->command_set, outgoing_node->cmd.cmd)))) + { + highest_rank = incomings[d].rank; + highest_idx = d; + stream_n = k; + // This is 1 if rank is the same (thus, I must break the tie already), if the rank is not the same, we need to compute this. + stream_has_command = (incomings[d].rank == highest_rank || ccv_array_find_uint(data->command_set, outgoing_node->cmd.cmd)); + } } - assert(schd_info[d].stream_size == outgoing_device_id_size); - for (k = 0; k < outgoing_device_id_size; k++) - // If it should be on the same device and the stream is not assign, potentially. - if (outgoing_device_ids[k] == device_ids[i] && - SCHEDULE_STREAMS(schd_info[d])[k] < 0 && - (incomings[d].rank > highest_rank || - (incomings[d].rank == highest_rank && - !stream_has_command && ccv_array_find_uint(data->command_set, outgoing_node->cmd.cmd)))) - { - highest_rank = incomings[d].rank; - highest_idx = d; - stream_n = k; - // This is 1 if rank is the same (thus, I must break the tie already), if the rank is not the same, we need to compute this. 
- stream_has_command = (incomings[d].rank == highest_rank || ccv_array_find_uint(data->command_set, outgoing_node->cmd.cmd)); - } + if (highest_idx >= 0) + { + outgoing_idx = highest_idx; + ccv_nnc_graph_exec_info_t* const outgoing_node = exec_info + outgoing_idx; + assert(stream_n >= 0); + SCHEDULE_STREAMS(schd_info[outgoing_idx])[stream_n] = stream_idx; + ccv_array_add_unique_uint(data->command_set, outgoing_node->cmd.cmd); + } else + break; } - if (highest_idx >= 0) - { - outgoing_idx = highest_idx; - ccv_nnc_graph_exec_info_t* const outgoing_node = exec_info + outgoing_idx; - assert(stream_n >= 0); - SCHEDULE_STREAMS(schd_info[outgoing_idx])[stream_n] = stream_idx; - ccv_array_add_unique_uint(data->command_set, outgoing_node->cmd.cmd); - } else - break; - } data->exec_idx = outgoing_idx; } } ccv_nnc_graph_visit_endfor @@ -1248,24 +1284,24 @@ static ccv_nnc_graph_static_schedule_t* _ccv_nnc_graph_static_schedule_new(ccv_n assert(schd_info[exec_idx].stream_size == 1); const int stream_idx = SCHEDULE_STREAMS(schd_info[exec_idx])[0]; const int device_id = ((ccv_nnc_stream_data_t*)ccv_array_get(stream_data, stream_idx))->device_id; - sub_graph->default_schedule = _ccv_nnc_graph_static_schedule_new(sub_graph, stream_type, device_id, graph->streams[stream_idx], 0, 0, 0, 0); + sub_graph->default_schedule = _ccv_nnc_graph_static_schedule_new(sub_graph, stream_type, device_id, max_stream_count, graph->streams[stream_idx], 0, 0, 0, 0); } } ccv_array_free(stream_data); return schedule; } -void ccv_nnc_graph_set_default_static_schedule(ccv_nnc_graph_t* const graph, const int stream_type) +void ccv_nnc_graph_set_default_static_schedule(ccv_nnc_graph_t* const graph, const int stream_type, const int max_stream_count) { assert(graph->p == 0); if (graph->default_schedule) ccv_nnc_graph_static_schedule_free(graph->default_schedule); - graph->default_schedule = _ccv_nnc_graph_static_schedule_new(graph, stream_type, -1, 0, 0, 0, 0, 0); + graph->default_schedule = _ccv_nnc_graph_static_schedule_new(graph, stream_type, -1, max_stream_count, 0, 0, 0, 0, 0); } -ccv_nnc_graph_static_schedule_t* ccv_nnc_graph_static_schedule_new(ccv_nnc_graph_t* const graph, const int stream_type, const ccv_nnc_graph_exec_t* const sources, const int source_size, const ccv_nnc_graph_exec_t* const destinations, const int destination_size) +ccv_nnc_graph_static_schedule_t* ccv_nnc_graph_static_schedule_new(ccv_nnc_graph_t* const graph, const int stream_type, const int max_stream_count, const ccv_nnc_graph_exec_t* const sources, const int source_size, const ccv_nnc_graph_exec_t* const destinations, const int destination_size) { assert(graph->p == 0); - return _ccv_nnc_graph_static_schedule_new(graph, stream_type, -1, 0, sources, source_size, destinations, destination_size); + return _ccv_nnc_graph_static_schedule_new(graph, stream_type, -1, max_stream_count, 0, sources, source_size, destinations, destination_size); } ccv_nnc_stream_context_t* ccv_nnc_graph_default_stream(const ccv_nnc_graph_t* const graph) diff --git a/test/int/nnc/parallel.tests.c b/test/int/nnc/parallel.tests.c index 69389e938..ae49d605b 100644 --- a/test/int/nnc/parallel.tests.c +++ b/test/int/nnc/parallel.tests.c @@ -67,7 +67,7 @@ TEST_CASE("schedule symbolic graph to data parallel with broadcast and reduce") updated_params, 4, SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + 
ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); cpu_inputs[0] = ccv_nnc_tensor_new(0, CPU_TENSOR_NCHW(32F, 16, 3, 32, 32), 0); cpu_inputs[1] = ccv_nnc_tensor_new(0, CPU_TENSOR_NCHW(32F, 16, 3, 32, 32), 0); @@ -246,7 +246,7 @@ TEST_CASE("schedule symbolic graph to data parallel with allreduce") updated_params, 4, SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); cpu_inputs[0] = ccv_nnc_tensor_new(0, CPU_TENSOR_NCHW(32F, 16, 3, 32, 32), 0); cpu_inputs[1] = ccv_nnc_tensor_new(0, CPU_TENSOR_NCHW(32F, 16, 3, 32, 32), 0); diff --git a/test/int/nnc/schedule.tests.c b/test/int/nnc/schedule.tests.c index c6c0f0c45..316d5f766 100644 --- a/test/int/nnc/schedule.tests.c +++ b/test/int/nnc/schedule.tests.c @@ -30,7 +30,7 @@ TEST_CASE("schedule GPU work on one stream") SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); SYMBOLIC_GRAPH_GEN(symbolic_graph, CCV_NNC_LONG_DOT_GRAPH); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* const ha = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); ccv_nnc_tensor_t* const hw = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); @@ -99,7 +99,7 @@ TEST_CASE("schedule GPU work on multiple streams") SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); SYMBOLIC_GRAPH_GEN(symbolic_graph, CCV_NNC_LONG_DOT_GRAPH); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* const ha = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); ccv_nnc_tensor_t* const hw1 = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); @@ -215,7 +215,7 @@ TEST_CASE("schedule GPU work with while loop") SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); SYMBOLIC_GRAPH_GEN(symbolic_graph, CCV_NNC_LONG_DOT_GRAPH); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* const ha = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); ccv_nnc_tensor_t* const hw1 = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 2, 2), 0); @@ -323,7 +323,7 @@ TEST_CASE("schedule GPU work with case..of") SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); SYMBOLIC_GRAPH_GEN(symbolic_graph, CCV_NNC_LONG_DOT_GRAPH); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* const ha = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); ccv_nnc_tensor_t* const hw = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 2, 2), 0); @@ -411,7 +411,7 @@ 
TEST_CASE("schedule GPU work with both while loop and case..of") SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); SYMBOLIC_GRAPH_GEN(symbolic_graph, CCV_NNC_LONG_DOT_GRAPH); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* const ha = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); ccv_nnc_tensor_t* const hw1 = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 2, 2), 0); @@ -540,7 +540,7 @@ TEST_CASE("partial schedule work, one for device 0 and one for device 1") SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); SYMBOLIC_GRAPH_GEN(symbolic_graph, CCV_NNC_LONG_DOT_GRAPH); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* const ha0 = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); ccv_nnc_tensor_t* const ha1 = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); @@ -592,7 +592,7 @@ TEST_CASE("partial schedule work, one for device 0 and one for device 1") TENSOR_LIST(ccv_nnc_tensor_from_symbol(tensor_arena, d1)), 0); // schedule device 0 - ccv_nnc_graph_static_schedule_t* const schedule0 = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, + ccv_nnc_graph_static_schedule_t* const schedule0 = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, 0, GRAPH_EXEC_LIST(ccv_nnc_graph_exec_from_symbol(graph_exec_arena, src0)), GRAPH_EXEC_LIST(ccv_nnc_graph_exec_from_symbol(graph_exec_arena, dest0))); ccv_nnc_graph_run_with_schedule(graph, 0, schedule0, 0, stream); @@ -603,7 +603,7 @@ TEST_CASE("partial schedule work, one for device 0 and one for device 1") 0); REQUIRE_EQ_WITH_TOLERANCE(hd0->data.f32[0], (0.4 * 3 + 1.2 * 2 - 1) * 1.1 * 0.9, 1e-5, "result should be equal"); // schedule device 1 - ccv_nnc_graph_static_schedule_t* const schedule1 = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, + ccv_nnc_graph_static_schedule_t* const schedule1 = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, 0, GRAPH_EXEC_LIST(ccv_nnc_graph_exec_from_symbol(graph_exec_arena, src1)), GRAPH_EXEC_LIST(ccv_nnc_graph_exec_from_symbol(graph_exec_arena, dest1))); ccv_nnc_graph_run_with_schedule(graph, 0, schedule1, 0, stream); @@ -624,7 +624,7 @@ TEST_CASE("partial schedule work, one for device 0 and one for device 1") TENSOR_LIST(ccv_nnc_tensor_from_symbol(tensor_arena, d1)), 0); // custom schedule again with both device 0 and device 1. 
- ccv_nnc_graph_static_schedule_t* const schedule01 = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, + ccv_nnc_graph_static_schedule_t* const schedule01 = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, 0, GRAPH_EXEC_LIST(ccv_nnc_graph_exec_from_symbol(graph_exec_arena, src0), ccv_nnc_graph_exec_from_symbol(graph_exec_arena, src1)), GRAPH_EXEC_LIST(ccv_nnc_graph_exec_from_symbol(graph_exec_arena, dest0), ccv_nnc_graph_exec_from_symbol(graph_exec_arena, dest1))); ccv_nnc_graph_run_with_schedule(graph, 0, schedule01, 0, stream); @@ -688,7 +688,7 @@ TEST_CASE("partial schedule on both device 0 and then join device 1") SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); SYMBOLIC_GRAPH_GEN(symbolic_graph, CCV_NNC_LONG_DOT_GRAPH); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_GPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* const ha0 = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); ccv_nnc_tensor_t* const ha1 = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 1, 2), 0); @@ -720,7 +720,7 @@ TEST_CASE("partial schedule on both device 0 and then join device 1") ccv_nnc_stream_signal_t* const signal0 = ccv_nnc_stream_signal_new(CCV_STREAM_CONTEXT_GPU | CCV_COMPUTE_DEVICE_000); ccv_nnc_stream_context_t* const stream1 = ccv_nnc_stream_context_new(CCV_STREAM_CONTEXT_GPU | CCV_COMPUTE_DEVICE_001); // custom schedule again with both device 0 and device 1. - ccv_nnc_graph_static_schedule_t* const schedule01 = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, + ccv_nnc_graph_static_schedule_t* const schedule01 = ccv_nnc_graph_static_schedule_new(graph, CCV_STREAM_CONTEXT_GPU, 0, GRAPH_EXEC_LIST(ccv_nnc_graph_exec_from_symbol(graph_exec_arena, src0), ccv_nnc_graph_exec_from_symbol(graph_exec_arena, src1)), GRAPH_EXEC_LIST(ccv_nnc_graph_exec_from_symbol(graph_exec_arena, dest0), ccv_nnc_graph_exec_from_symbol(graph_exec_arena, dest1))); ccv_nnc_graph_run_with_schedule(graph, 0, schedule01, 0, stream0); diff --git a/test/unit/nnc/autograd.tests.c b/test/unit/nnc/autograd.tests.c index a9dafd843..9a450c45d 100644 --- a/test/unit/nnc/autograd.tests.c +++ b/test/unit/nnc/autograd.tests.c @@ -171,7 +171,7 @@ TEST_CASE("partial autograd with D[y * x + Log[1 / x], y] when x = 0.84 and y = ccv_nnc_graph_exec_symbol_t dyc = ccv_nnc_graph_exec_symbol_for_backward(symbolic_graph, dy); ccv_nnc_symbolic_graph_compile(symbolic_graph, ccv_nnc_default_compile_params, 0, 0, 0, 0, GRAPH_EXEC_SYMBOL_LIST(prod, inv), GRAPH_EXEC_SYMBOL_LIST(dyc, sum), &graph, &tensor_arena, &graph_exec_arena); SYMBOLIC_GRAPH_GEN(symbolic_graph, CCV_NNC_LONG_DOT_GRAPH); - ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_CPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_CPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* tone = ccv_nnc_tensor_from_symbol(tensor_arena, one); tone->data.f32[0] = 1; diff --git a/test/unit/nnc/parallel.tests.c b/test/unit/nnc/parallel.tests.c index 9532a765b..99ef54ca5 100644 --- a/test/unit/nnc/parallel.tests.c +++ b/test/unit/nnc/parallel.tests.c @@ -39,7 +39,7 @@ TEST_CASE("schedule a simple graph for parallel execution") TENSOR_SYMBOL_LIST(d2), SYMBOLIC_GRAPH_SOURCES(symbolic_graph), SYMBOLIC_GRAPH_DESTINATIONS(symbolic_graph), &graph, &tensor_arena, &graph_exec_arena); - 
ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_CPU); + ccv_nnc_graph_set_default_static_schedule(graph, CCV_STREAM_CONTEXT_CPU, 0); GRAPH_GEN(graph, CCV_NNC_LONG_DOT_GRAPH); ccv_nnc_tensor_t* const x_tensor = ccv_nnc_tensor_from_symbol(tensor_arena, x); x_tensor->data.f32[0] = 2;
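To summarize the scheduler change in ccv_nnc_graph.c above: max_stream_count is a soft cap. Once the stream pool for a schedule is full, a node first tries to reuse an existing stream on its own device, preferring one that has already run the same command and skipping any stream whose recorded work would create a dependency cycle; only when no safe candidate exists does the pool still grow past the cap. With max_stream_count of 1 the greedy follow-the-chain assignment is also skipped so execution interleaves on the single stream. The toy function below restates just that selection policy outside the real data structures (all names are invented for illustration; the actual code walks a sparse dependency matrix and ccv_array_t stream records):

	typedef struct {
		int device_id;     // device this stream is pinned to
		int ran_same_cmd;  // nonzero if this stream has executed the same command before
		int creates_cycle; // nonzero if reusing it would introduce an ordering cycle
	} toy_stream_t;

	// Pick a stream for a node on device_id once the pool has hit the cap. Mirrors the
	// preference order in the patch: same device, no cycle, and ideally a stream that has
	// already seen this command. Returns -1 if nothing is safe to reuse, in which case the
	// scheduler still allocates a new stream -- hence "soft" limit.
	static int toy_pick_stream(const toy_stream_t* const streams, const int count, const int device_id)
	{
		int candidate = -1;
		int i;
		for (i = 0; i < count; i++)
		{
			if (streams[i].device_id != device_id || streams[i].creates_cycle)
				continue;
			if (streams[i].ran_same_cmd)
				return i; // best case: reuse a stream that already ran this command
			if (candidate < 0)
				candidate = i; // otherwise remember the first safe stream on this device
		}
		return candidate;
	}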